mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-06-06 21:35:13 +02:00
Entity contexts import/export
This commit is contained in:
parent
7d90696ec1
commit
0507f10373
4 changed files with 171 additions and 5 deletions
|
|
@ -0,0 +1,67 @@
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import queue
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from ... schema import EntityContexts
|
||||||
|
from ... base import Subscriber
|
||||||
|
|
||||||
|
from . serialize import serialize_entity_contexts
|
||||||
|
|
||||||
|
class EntityContextsExport:
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, ws, running, pulsar_client, queue, consumer, subscriber
|
||||||
|
):
|
||||||
|
|
||||||
|
self.ws = ws
|
||||||
|
self.running = running
|
||||||
|
self.pulsar_client = pulsar_client
|
||||||
|
self.queue = queue
|
||||||
|
self.consumer = consumer
|
||||||
|
self.subscriber = subscriber
|
||||||
|
|
||||||
|
async def destroy(self):
|
||||||
|
self.running.stop()
|
||||||
|
await self.ws.close()
|
||||||
|
|
||||||
|
async def receive(self, msg):
|
||||||
|
# Ignore incoming info from websocket
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def run(self):
|
||||||
|
|
||||||
|
subs = Subscriber(
|
||||||
|
client = self.pulsar_client, topic = self.queue,
|
||||||
|
consumer_name = self.consumer, subscription = self.subscriber,
|
||||||
|
schema = EntityContexts
|
||||||
|
)
|
||||||
|
|
||||||
|
await subs.start()
|
||||||
|
|
||||||
|
id = str(uuid.uuid4())
|
||||||
|
q = await subs.subscribe_all(id)
|
||||||
|
|
||||||
|
while self.running.get():
|
||||||
|
try:
|
||||||
|
|
||||||
|
resp = await asyncio.wait_for(q.get(), timeout=0.5)
|
||||||
|
await self.ws.send_json(serialize_entity_contexts(resp))
|
||||||
|
|
||||||
|
except TimeoutError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
except queue.Empty:
|
||||||
|
continue
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Exception: {str(e)}", flush=True)
|
||||||
|
break
|
||||||
|
|
||||||
|
await subs.unsubscribe_all(id)
|
||||||
|
|
||||||
|
await subs.stop()
|
||||||
|
|
||||||
|
await self.ws.close()
|
||||||
|
self.running.stop()
|
||||||
|
|
||||||
|
|
@ -0,0 +1,67 @@
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import uuid
|
||||||
|
from aiohttp import WSMsgType
|
||||||
|
|
||||||
|
from ... schema import Metadata
|
||||||
|
from ... schema import EntityContexts, EntityContext
|
||||||
|
from ... base import Publisher
|
||||||
|
|
||||||
|
from . serialize import to_subgraph, to_value
|
||||||
|
|
||||||
|
class EntityContextsImport:
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, ws, running, pulsar_client, queue
|
||||||
|
):
|
||||||
|
|
||||||
|
self.ws = ws
|
||||||
|
self.running = running
|
||||||
|
|
||||||
|
self.publisher = Publisher(
|
||||||
|
pulsar_client, topic = queue, schema = EntityContexts
|
||||||
|
)
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
await self.publisher.start()
|
||||||
|
|
||||||
|
async def destroy(self):
|
||||||
|
self.running.stop()
|
||||||
|
|
||||||
|
if self.ws:
|
||||||
|
await self.ws.close()
|
||||||
|
|
||||||
|
await self.publisher.stop()
|
||||||
|
|
||||||
|
async def receive(self, msg):
|
||||||
|
|
||||||
|
data = msg.json()
|
||||||
|
|
||||||
|
elt = EntityContexts(
|
||||||
|
metadata=Metadata(
|
||||||
|
id=data["metadata"]["id"],
|
||||||
|
metadata=to_subgraph(data["metadata"]["metadata"]),
|
||||||
|
user=data["metadata"]["user"],
|
||||||
|
collection=data["metadata"]["collection"],
|
||||||
|
),
|
||||||
|
entities=[
|
||||||
|
EntityContext(
|
||||||
|
entity=to_value(ent["entity"]),
|
||||||
|
context=ent["context"],
|
||||||
|
)
|
||||||
|
for ent in data["entities"]
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
await self.publisher.send(None, elt)
|
||||||
|
|
||||||
|
async def run(self):
|
||||||
|
|
||||||
|
while self.running.get():
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
|
if self.ws:
|
||||||
|
await self.ws.close()
|
||||||
|
|
||||||
|
self.ws = None
|
||||||
|
|
||||||
|
|
@ -23,10 +23,12 @@ from . document_load import DocumentLoad
|
||||||
from . triples_export import TriplesExport
|
from . triples_export import TriplesExport
|
||||||
from . graph_embeddings_export import GraphEmbeddingsExport
|
from . graph_embeddings_export import GraphEmbeddingsExport
|
||||||
from . document_embeddings_export import DocumentEmbeddingsExport
|
from . document_embeddings_export import DocumentEmbeddingsExport
|
||||||
|
from . entity_contexts_export import EntityContextsExport
|
||||||
|
|
||||||
from . triples_import import TriplesImport
|
from . triples_import import TriplesImport
|
||||||
from . graph_embeddings_import import GraphEmbeddingsImport
|
from . graph_embeddings_import import GraphEmbeddingsImport
|
||||||
from . document_embeddings_import import DocumentEmbeddingsImport
|
from . document_embeddings_import import DocumentEmbeddingsImport
|
||||||
|
from . entity_contexts_import import EntityContextsImport
|
||||||
|
|
||||||
from . mux import Mux
|
from . mux import Mux
|
||||||
|
|
||||||
|
|
@ -57,12 +59,14 @@ export_dispatchers = {
|
||||||
"triples": TriplesExport,
|
"triples": TriplesExport,
|
||||||
"graph-embeddings": GraphEmbeddingsExport,
|
"graph-embeddings": GraphEmbeddingsExport,
|
||||||
"document-embeddings": DocumentEmbeddingsExport,
|
"document-embeddings": DocumentEmbeddingsExport,
|
||||||
|
"entity-contexts": EntityContextsExport,
|
||||||
}
|
}
|
||||||
|
|
||||||
import_dispatchers = {
|
import_dispatchers = {
|
||||||
"triples": TriplesImport,
|
"triples": TriplesImport,
|
||||||
"graph-embeddings": GraphEmbeddingsImport,
|
"graph-embeddings": GraphEmbeddingsImport,
|
||||||
"document-embeddings": DocumentEmbeddingsImport,
|
"document-embeddings": DocumentEmbeddingsImport,
|
||||||
|
"entity-contexts": EntityContextsImport,
|
||||||
}
|
}
|
||||||
|
|
||||||
class DispatcherWrapper:
|
class DispatcherWrapper:
|
||||||
|
|
@ -146,11 +150,17 @@ class DispatcherManager:
|
||||||
|
|
||||||
intf_defs = self.flows[flow]["interfaces"]
|
intf_defs = self.flows[flow]["interfaces"]
|
||||||
|
|
||||||
if kind not in intf_defs:
|
# FIXME: The -store bit, does it make sense?
|
||||||
|
if kind == "entity-contexts":
|
||||||
|
int_kind = kind + "-load"
|
||||||
|
else:
|
||||||
|
int_kind = kind + "-store"
|
||||||
|
|
||||||
|
if int_kind not in intf_defs:
|
||||||
raise RuntimeError("This kind not supported by flow")
|
raise RuntimeError("This kind not supported by flow")
|
||||||
|
|
||||||
# FIXME: The -store bit, does it make sense?
|
# FIXME: The -store bit, does it make sense?
|
||||||
qconfig = intf_defs[kind + "-store"]
|
qconfig = intf_defs[int_kind]
|
||||||
|
|
||||||
id = str(uuid.uuid4())
|
id = str(uuid.uuid4())
|
||||||
dispatcher = import_dispatchers[kind](
|
dispatcher = import_dispatchers[kind](
|
||||||
|
|
@ -179,11 +189,16 @@ class DispatcherManager:
|
||||||
|
|
||||||
intf_defs = self.flows[flow]["interfaces"]
|
intf_defs = self.flows[flow]["interfaces"]
|
||||||
|
|
||||||
if kind not in intf_defs:
|
# FIXME: The -store bit, does it make sense?
|
||||||
|
if kind == "entity-contexts":
|
||||||
|
int_kind = kind + "-load"
|
||||||
|
else:
|
||||||
|
int_kind = kind + "-store"
|
||||||
|
|
||||||
|
if int_kind not in intf_defs:
|
||||||
raise RuntimeError("This kind not supported by flow")
|
raise RuntimeError("This kind not supported by flow")
|
||||||
|
|
||||||
# FIXME: The -store bit, does it make sense?
|
qconfig = intf_defs[int_kind]
|
||||||
qconfig = intf_defs[kind + "-store"]
|
|
||||||
|
|
||||||
id = str(uuid.uuid4())
|
id = str(uuid.uuid4())
|
||||||
dispatcher = export_dispatchers[kind](
|
dispatcher = export_dispatchers[kind](
|
||||||
|
|
|
||||||
|
|
@ -63,6 +63,23 @@ def serialize_graph_embeddings(message):
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def serialize_entity_contexts(message):
|
||||||
|
return {
|
||||||
|
"metadata": {
|
||||||
|
"id": message.metadata.id,
|
||||||
|
"metadata": serialize_subgraph(message.metadata.metadata),
|
||||||
|
"user": message.metadata.user,
|
||||||
|
"collection": message.metadata.collection,
|
||||||
|
},
|
||||||
|
"entities": [
|
||||||
|
{
|
||||||
|
"context": entity.context,
|
||||||
|
"entity": serialize_value(entity.entity),
|
||||||
|
}
|
||||||
|
for entity in message.entities
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
def serialize_document_embeddings(message):
|
def serialize_document_embeddings(message):
|
||||||
return {
|
return {
|
||||||
"metadata": {
|
"metadata": {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue