class EntityContextsExport:
    """Forward entity-context messages from a subscription to a websocket.

    ``run`` creates a ``Subscriber`` on the configured queue, serializes
    every message it receives with ``serialize_entity_contexts`` and sends
    the result to the websocket with ``send_json``.  Data arriving *from*
    the websocket is ignored.
    """

    # Seconds to wait for a message before re-checking the running flag.
    POLL_TIMEOUT = 0.5

    def __init__(
            self, ws, running, pulsar_client, queue, consumer, subscriber
    ):
        """Store the collaborators; nothing is connected until ``run``.

        Args:
            ws: websocket object; ``send_json`` and ``close`` are awaited.
            running: flag object providing ``get()`` and ``stop()``.
            pulsar_client: passed to ``Subscriber`` as ``client``.
            queue: passed to ``Subscriber`` as ``topic``.
            consumer: passed to ``Subscriber`` as ``consumer_name``.
            subscriber: passed to ``Subscriber`` as ``subscription``.
        """
        self.ws = ws
        self.running = running
        self.pulsar_client = pulsar_client
        self.queue = queue
        self.consumer = consumer
        self.subscriber = subscriber

    async def destroy(self):
        """Clear the running flag and close the websocket."""
        self.running.stop()
        await self.ws.close()

    async def receive(self, msg):
        """Ignore incoming info from websocket."""
        pass

    async def run(self):
        """Pump messages to the websocket until the running flag clears.

        The subscription, the subscriber, the websocket and the running
        flag are released in ``finally`` blocks so that they are also
        cleaned up when the task is cancelled or subscribing fails.
        """
        subs = Subscriber(
            client=self.pulsar_client, topic=self.queue,
            consumer_name=self.consumer, subscription=self.subscriber,
            schema=EntityContexts,
        )

        await subs.start()

        try:
            sub_id = str(uuid.uuid4())
            q = await subs.subscribe_all(sub_id)
            try:
                await self._forward(q)
            finally:
                await subs.unsubscribe_all(sub_id)
        finally:
            try:
                await subs.stop()
                await self.ws.close()
            finally:
                # Always clear the flag, even if closing raised.
                self.running.stop()

    async def _forward(self, q):
        """Send each message taken from ``q`` until stopped or an error."""
        while self.running.get():
            try:
                resp = await asyncio.wait_for(
                    q.get(), timeout=self.POLL_TIMEOUT
                )
                await self.ws.send_json(serialize_entity_contexts(resp))

            except (asyncio.TimeoutError, TimeoutError):
                # Before Python 3.11 asyncio.TimeoutError is not the
                # builtin TimeoutError; catch both so an idle poll just
                # loops round to re-check the running flag.
                continue

            except queue.Empty:
                continue

            except Exception as e:
                print(f"Exception: {str(e)}", flush=True)
                break
class EntityContextsImport:
    """Publish entity-context messages received over a websocket.

    Each websocket message is decoded from JSON, converted into an
    ``EntityContexts`` object and handed to a ``Publisher``.
    """

    def __init__(
            self, ws, running, pulsar_client, queue
    ):
        """Keep the websocket and running flag and build the publisher.

        ``queue`` is passed to ``Publisher`` as its ``topic``.
        """
        self.publisher = Publisher(
            pulsar_client, topic = queue, schema = EntityContexts
        )
        self.running = running
        self.ws = ws

    async def start(self):
        """Start the underlying publisher."""
        await self.publisher.start()

    async def destroy(self):
        """Clear the running flag, close the websocket, stop publishing."""
        self.running.stop()

        websocket = self.ws
        if websocket:
            await websocket.close()

        await self.publisher.stop()

    async def receive(self, msg):
        """Convert one websocket message and publish it."""
        payload = msg.json()

        # Metadata is built before the entity list, so lookups happen in
        # the same order as the fields are listed.
        meta = Metadata(
            id=payload["metadata"]["id"],
            metadata=to_subgraph(payload["metadata"]["metadata"]),
            user=payload["metadata"]["user"],
            collection=payload["metadata"]["collection"],
        )

        contexts = []
        for item in payload["entities"]:
            contexts.append(
                EntityContext(
                    entity=to_value(item["entity"]),
                    context=item["context"],
                )
            )

        record = EntityContexts(metadata=meta, entities=contexts)

        await self.publisher.send(None, record)

    async def run(self):
        """Idle until the running flag clears, then drop the websocket."""
        while True:
            if not self.running.get():
                break
            await asyncio.sleep(0.5)

        if self.ws:
            await self.ws.close()

        self.ws = None
mux import Mux @@ -57,12 +59,14 @@ export_dispatchers = { "triples": TriplesExport, "graph-embeddings": GraphEmbeddingsExport, "document-embeddings": DocumentEmbeddingsExport, + "entity-contexts": EntityContextsExport, } import_dispatchers = { "triples": TriplesImport, "graph-embeddings": GraphEmbeddingsImport, "document-embeddings": DocumentEmbeddingsImport, + "entity-contexts": EntityContextsImport, } class DispatcherWrapper: @@ -146,11 +150,17 @@ class DispatcherManager: intf_defs = self.flows[flow]["interfaces"] - if kind not in intf_defs: + # FIXME: The -store bit, does it make sense? + if kind == "entity-contexts": + int_kind = kind + "-load" + else: + int_kind = kind + "-store" + + if int_kind not in intf_defs: raise RuntimeError("This kind not supported by flow") # FIXME: The -store bit, does it make sense? - qconfig = intf_defs[kind + "-store"] + qconfig = intf_defs[int_kind] id = str(uuid.uuid4()) dispatcher = import_dispatchers[kind]( @@ -179,11 +189,16 @@ class DispatcherManager: intf_defs = self.flows[flow]["interfaces"] - if kind not in intf_defs: + # FIXME: The -store bit, does it make sense? + if kind == "entity-contexts": + int_kind = kind + "-load" + else: + int_kind = kind + "-store" + + if int_kind not in intf_defs: raise RuntimeError("This kind not supported by flow") - # FIXME: The -store bit, does it make sense? 
def serialize_entity_contexts(message):
    """Build a dict holding the metadata and entities of ``message``.

    The metadata sub-graph goes through ``serialize_subgraph`` and each
    entity value through ``serialize_value``; the remaining fields are
    copied across unchanged.
    """
    meta = message.metadata
    metadata = {
        "id": meta.id,
        "metadata": serialize_subgraph(meta.metadata),
        "user": meta.user,
        "collection": meta.collection,
    }

    entities = []
    for item in message.entities:
        entry = {
            "context": item.context,
            "entity": serialize_value(item.entity),
        }
        entities.append(entry)

    return {
        "metadata": metadata,
        "entities": entities,
    }