Merge pull request #384 from trustgraph-ai/feature/entity-contexts-import-export

Entity contexts import/export
This commit is contained in:
cybermaggedon 2025-05-17 13:26:00 +01:00 committed by GitHub
commit 1eef9db1c9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 171 additions and 5 deletions

View file

@ -0,0 +1,67 @@
import asyncio
import queue
import uuid
from ... schema import EntityContexts
from ... base import Subscriber
from . serialize import serialize_entity_contexts
class EntityContextsExport:
def __init__(
self, ws, running, pulsar_client, queue, consumer, subscriber
):
self.ws = ws
self.running = running
self.pulsar_client = pulsar_client
self.queue = queue
self.consumer = consumer
self.subscriber = subscriber
async def destroy(self):
self.running.stop()
await self.ws.close()
async def receive(self, msg):
# Ignore incoming info from websocket
pass
async def run(self):
subs = Subscriber(
client = self.pulsar_client, topic = self.queue,
consumer_name = self.consumer, subscription = self.subscriber,
schema = EntityContexts
)
await subs.start()
id = str(uuid.uuid4())
q = await subs.subscribe_all(id)
while self.running.get():
try:
resp = await asyncio.wait_for(q.get(), timeout=0.5)
await self.ws.send_json(serialize_entity_contexts(resp))
except TimeoutError:
continue
except queue.Empty:
continue
except Exception as e:
print(f"Exception: {str(e)}", flush=True)
break
await subs.unsubscribe_all(id)
await subs.stop()
await self.ws.close()
self.running.stop()

View file

@ -0,0 +1,67 @@
import asyncio
import uuid
from aiohttp import WSMsgType
from ... schema import Metadata
from ... schema import EntityContexts, EntityContext
from ... base import Publisher
from . serialize import to_subgraph, to_value
class EntityContextsImport:
def __init__(
self, ws, running, pulsar_client, queue
):
self.ws = ws
self.running = running
self.publisher = Publisher(
pulsar_client, topic = queue, schema = EntityContexts
)
async def start(self):
await self.publisher.start()
async def destroy(self):
self.running.stop()
if self.ws:
await self.ws.close()
await self.publisher.stop()
async def receive(self, msg):
data = msg.json()
elt = EntityContexts(
metadata=Metadata(
id=data["metadata"]["id"],
metadata=to_subgraph(data["metadata"]["metadata"]),
user=data["metadata"]["user"],
collection=data["metadata"]["collection"],
),
entities=[
EntityContext(
entity=to_value(ent["entity"]),
context=ent["context"],
)
for ent in data["entities"]
]
)
await self.publisher.send(None, elt)
async def run(self):
while self.running.get():
await asyncio.sleep(0.5)
if self.ws:
await self.ws.close()
self.ws = None

View file

@ -23,10 +23,12 @@ from . document_load import DocumentLoad
from . triples_export import TriplesExport
from . graph_embeddings_export import GraphEmbeddingsExport
from . document_embeddings_export import DocumentEmbeddingsExport
from . entity_contexts_export import EntityContextsExport
from . triples_import import TriplesImport
from . graph_embeddings_import import GraphEmbeddingsImport
from . document_embeddings_import import DocumentEmbeddingsImport
from . entity_contexts_import import EntityContextsImport
from . mux import Mux
@ -57,12 +59,14 @@ export_dispatchers = {
"triples": TriplesExport,
"graph-embeddings": GraphEmbeddingsExport,
"document-embeddings": DocumentEmbeddingsExport,
"entity-contexts": EntityContextsExport,
}
import_dispatchers = {
"triples": TriplesImport,
"graph-embeddings": GraphEmbeddingsImport,
"document-embeddings": DocumentEmbeddingsImport,
"entity-contexts": EntityContextsImport,
}
class DispatcherWrapper:
@ -146,11 +150,17 @@ class DispatcherManager:
intf_defs = self.flows[flow]["interfaces"]
if kind not in intf_defs:
# FIXME: The -store bit, does it make sense?
if kind == "entity-contexts":
int_kind = kind + "-load"
else:
int_kind = kind + "-store"
if int_kind not in intf_defs:
raise RuntimeError("This kind not supported by flow")
# FIXME: The -store bit, does it make sense?
qconfig = intf_defs[kind + "-store"]
qconfig = intf_defs[int_kind]
id = str(uuid.uuid4())
dispatcher = import_dispatchers[kind](
@ -179,11 +189,16 @@ class DispatcherManager:
intf_defs = self.flows[flow]["interfaces"]
if kind not in intf_defs:
# FIXME: The -store bit, does it make sense?
if kind == "entity-contexts":
int_kind = kind + "-load"
else:
int_kind = kind + "-store"
if int_kind not in intf_defs:
raise RuntimeError("This kind not supported by flow")
# FIXME: The -store bit, does it make sense?
qconfig = intf_defs[kind + "-store"]
qconfig = intf_defs[int_kind]
id = str(uuid.uuid4())
dispatcher = export_dispatchers[kind](

View file

@ -63,6 +63,23 @@ def serialize_graph_embeddings(message):
],
}
def serialize_entity_contexts(message):
return {
"metadata": {
"id": message.metadata.id,
"metadata": serialize_subgraph(message.metadata.metadata),
"user": message.metadata.user,
"collection": message.metadata.collection,
},
"entities": [
{
"context": entity.context,
"entity": serialize_value(entity.entity),
}
for entity in message.entities
],
}
def serialize_document_embeddings(message):
return {
"metadata": {