Skip to content

Commit

Permalink
Feature: Add an API that exposes programs that respond to Aleph messages
Browse files Browse the repository at this point in the history
Fixes #133
  • Loading branch information
hoh authored and odesenfans committed May 31, 2022
1 parent f8f6c2f commit deaa4c3
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/aleph/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
import asyncio
from copy import copy
from hashlib import sha256
from typing import Union
from typing import Union, Dict

from aleph_message.models import ItemType

from aleph.exceptions import UnknownHashError
from aleph.settings import settings


def trim_mongo_id(message: Dict, inplace: bool = True):
"""Remove the MongoDB id of a MongoDB record"""
if '_id' in message:
if inplace is False:
message = copy(message)
message.pop('_id')
return message


async def run_in_executor(executor, func, *args):
if settings.use_executors:
loop = asyncio.get_running_loop()
Expand Down
44 changes: 44 additions & 0 deletions src/aleph/web/controllers/programs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import json

from aiohttp import web
from aleph_message.models import MessageType
from bson import json_util
from pydantic import BaseModel, ValidationError

from aleph.model.messages import Message
from aleph.utils import trim_mongo_id


class GetProgramQueryFields(BaseModel):
sort_order: int = -1

class Config:
extra = "forbid"


async def get_programs_on_message(request: web.Request) -> web.Response:
try:
query = GetProgramQueryFields(**request.query)
except ValidationError as error:
return web.json_response(
data=error.json(), status=web.HTTPBadRequest.status_code
)

messages = [
trim_mongo_id(msg)
async for msg in Message.collection.find(
filter={
"type": MessageType.program,
"content.on.message": {"$exists": True, "$not": {"$size": 0}},
},
sort=[("time", query.sort_order)],
projection={
"item_hash": 1,
"content.on.message": 1,
},
)
]

response = web.json_response(data=messages)
response.enable_compression()
return response
3 changes: 3 additions & 0 deletions src/aleph/web/controllers/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
storage,
version,
)
from aleph.web.controllers.programs import get_programs_on_message


def register_routes(app: web.Application):
Expand Down Expand Up @@ -60,3 +61,5 @@ def register_routes(app: web.Application):

app.router.add_get("/version", version.version)
app.router.add_get("/api/v0/version", version.version)

app.router.add_get("/api/v0/programs/on/message", get_programs_on_message)
111 changes: 111 additions & 0 deletions tests/web/controllers/fixtures/messages/program.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
[
{
"chain": "ETH",
"sender": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba",
"type": "PROGRAM",
"channel": "Fun-dApps",
"confirmed": true,
"content": {
"type": "vm-function",
"address": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba",
"allow_amend": false,
"code": {
"encoding": "zip",
"entrypoint": "example_fastapi_2:app",
"ref": "7eb2eca2378ea8855336ed76c8b26219f1cb90234d04441de9cf8cb1c649d003",
"use_latest": false
},
"variables": {
"VM_CUSTOM_VARIABLE": "SOMETHING",
"VM_CUSTOM_VARIABLE_2": "32"
},
"on": {
"http": true,
"message": [
{
"sender": "0xB31B787AdA86c6067701d4C0A250c89C7f1f29A5",
"channel": "TEST"
},
{
"content": {
"ref": "4d4db19afca380fdf06ba7f916153d0f740db9de9eee23ad26ba96a90d8a2920"
}
}
]
},
"environment": {
"reproducible": true,
"internet": false,
"aleph_api": false,
"shared_cache": false
},
"resources": {
"vcpus": 1,
"memory": 128,
"seconds": 30
},
"runtime": {
"ref": "5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51",
"use_latest": false,
"comment": "Aleph Alpine Linux with Python 3.8"
},
"volumes": [
{
"comment": "Python libraries. Read-only since a 'ref' is specified.",
"mount": "/opt/venv",
"ref": "5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51",
"use_latest": false
},
{
"comment": "Ephemeral storage, read-write but will not persist after the VM stops",
"mount": "/var/cache",
"ephemeral": true,
"size_mib": 5
},
{
"comment": "Working data persisted on the VM supervisor, not available on other nodes",
"mount": "/var/lib/sqlite",
"name": "sqlite-data",
"persistence": "host",
"size_mib": 10
},
{
"comment": "Working data persisted on the Aleph network. New VMs will try to use the latest version of this volume, with no guarantee against conflicts",
"mount": "/var/lib/statistics",
"name": "statistics",
"persistence": "store",
"size_mib": 10
},
{
"comment": "Raw drive to use by a process, do not mount it",
"name": "raw-data",
"persistence": "host",
"size_mib": 10
}
],
"data": {
"encoding": "zip",
"mount": "/data",
"ref": "7eb2eca2378ea8855336ed76c8b26219f1cb90234d04441de9cf8cb1c649d003",
"use_latest": false
},
"export": {
"encoding": "zip",
"mount": "/data"
},
"replaces": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba",
"time": 1619017773.8950517
},
"item_type": "inline",
"signature": "0x372da8230552b8c3e65c05b31a0ff3a24666d66c575f8e11019f62579bf48c2b7fe2f0bbe907a2a5bf8050989cdaf8a59ff8a1cbcafcdef0656c54279b4aa0c71b",
"size": 749,
"time": 1619017773.8950577,
"confirmations": [
{
"chain": "ETH",
"height": 12284734,
"hash": "0x67f2f3cde5e94e70615c92629c70d22dc959a118f46e9411b29659c2fce87cdc"
}
]
}
]
40 changes: 40 additions & 0 deletions tests/web/controllers/test_programs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import json
from hashlib import sha256
from pathlib import Path

import pytest
import pytest_asyncio

from aleph.model.messages import Message


@pytest_asyncio.fixture
async def fixture_program_message(test_db):
fixtures_file = Path(__file__).parent / "fixtures/messages/program.json"

with fixtures_file.open() as f:
messages = json.load(f)

# Add item_content and item_hash to messages, modify in place:
for message in messages:
if 'item_content' not in message:
message['item_content'] = json.dumps(message['content'])
if 'item_hash' not in message:
message['item_hash'] = sha256(message['item_content'].encode()).hexdigest()

await Message.collection.insert_many(messages)
return messages


@pytest.mark.asyncio
async def test_get_programs_on_message(fixture_program_message, ccn_api_client):
response = await ccn_api_client.get("/api/v0/programs/on/message")
assert response.status == 200, await response.text()

data = await response.json()
expected = {
'item_hash': fixture_program_message[0]['item_hash'],
'content': {'on': {'message': fixture_program_message[0]['content']['on']['message']}},
}

assert data == [expected]

0 comments on commit deaa4c3

Please sign in to comment.