trustgraph/trustgraph-flow/trustgraph/api/gateway/triples_load.py
cybermaggedon 1b9c6be4fc
Feature/gateway auth (#186)
* Added auth module, just a simple token at this stage
* Pass auth token GATEWAY_SECRET through
* Auth token not mandatory, can be provided in env var
2024-12-02 19:57:21 +00:00

59 lines
1.5 KiB
Python

import asyncio
from pulsar.schema import JsonSchema
import uuid
from aiohttp import WSMsgType
from ... schema import Metadata
from ... schema import Triples
from ... schema import triples_store_queue
from . publisher import Publisher
from . socket import SocketEndpoint
from . serialize import to_subgraph
class TriplesLoadEndpoint(SocketEndpoint):
def __init__(self, pulsar_host, auth, path="/api/v1/load/triples"):
super(TriplesLoadEndpoint, self).__init__(
endpoint_path=path, auth=auth,
)
self.pulsar_host=pulsar_host
self.publisher = Publisher(
self.pulsar_host, triples_store_queue,
schema=JsonSchema(Triples)
)
async def start(self):
self.task = asyncio.create_task(
self.publisher.run()
)
async def listener(self, ws, running):
async for msg in ws:
# On error, finish
if msg.type == WSMsgType.ERROR:
break
else:
data = msg.json()
elt = Triples(
metadata=Metadata(
id=data["metadata"]["id"],
metadata=to_subgraph(data["metadata"]["metadata"]),
user=data["metadata"]["user"],
collection=data["metadata"]["collection"],
),
triples=to_subgraph(data["triples"]),
)
await self.publisher.send(None, elt)
running.stop()