From 807c19fd226d60e984087fce959c39ed479b79a5 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 6 May 2025 23:44:10 +0100 Subject: [PATCH] knowledge service (#367) * Write knowledge core elements to Cassandra * Store service works, building management service * kg-manager --- test-api/test-knowledge-fetch | 39 ++ test-api/test-knowledge-fetch2 | 50 ++ test-api/test-knowledge-list | 38 ++ trustgraph-base/trustgraph/schema/__init__.py | 1 + .../trustgraph/schema/knowledge.py | 49 ++ trustgraph-base/trustgraph/schema/library.py | 11 - trustgraph-flow/scripts/kg-manager | 6 + trustgraph-flow/scripts/kg-store | 6 + trustgraph-flow/setup.py | 2 + trustgraph-flow/trustgraph/cores/__init__.py | 3 + trustgraph-flow/trustgraph/cores/__main__.py | 5 + trustgraph-flow/trustgraph/cores/knowledge.py | 104 ++++ trustgraph-flow/trustgraph/cores/service.py | 228 ++++++++ .../trustgraph/gateway/dispatch/knowledge.py | 68 +++ .../trustgraph/gateway/dispatch/manager.py | 2 + .../trustgraph/gateway/dispatch/requestor.py | 2 +- .../trustgraph/librarian/librarian.py | 4 +- .../trustgraph/librarian/service.py | 2 +- .../trustgraph/storage/knowledge/__init__.py | 3 + .../trustgraph/storage/knowledge/__main__.py | 5 + .../trustgraph/storage/knowledge/store.py | 78 +++ trustgraph-flow/trustgraph/tables/__init__.py | 0 .../trustgraph/tables/knowledge.py | 504 ++++++++++++++++++ .../table_store.py => tables/library.py} | 229 +------- 24 files changed, 1196 insertions(+), 243 deletions(-) create mode 100755 test-api/test-knowledge-fetch create mode 100755 test-api/test-knowledge-fetch2 create mode 100755 test-api/test-knowledge-list create mode 100644 trustgraph-base/trustgraph/schema/knowledge.py create mode 100644 trustgraph-flow/scripts/kg-manager create mode 100644 trustgraph-flow/scripts/kg-store create mode 100644 trustgraph-flow/trustgraph/cores/__init__.py create mode 100644 trustgraph-flow/trustgraph/cores/__main__.py create mode 100644 trustgraph-flow/trustgraph/cores/knowledge.py create mode 100755 trustgraph-flow/trustgraph/cores/service.py create mode 100644 trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py create mode 100644 trustgraph-flow/trustgraph/storage/knowledge/__init__.py create mode 100644 trustgraph-flow/trustgraph/storage/knowledge/__main__.py create mode 100644 trustgraph-flow/trustgraph/storage/knowledge/store.py create mode 100644 trustgraph-flow/trustgraph/tables/__init__.py create mode 100644 trustgraph-flow/trustgraph/tables/knowledge.py rename trustgraph-flow/trustgraph/{librarian/table_store.py => tables/library.py} (68%) diff --git a/test-api/test-knowledge-fetch b/test-api/test-knowledge-fetch new file mode 100755 index 00000000..c327a9d8 --- /dev/null +++ b/test-api/test-knowledge-fetch @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 +import time + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +input = { + "operation": "fetch-kg-core", + "id": "https://trustgraph.ai/doc/intelligence-and-state", + "user": "trustgraph", +} + +resp = requests.post( + f"{url}knowledge", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-knowledge-fetch2 b/test-api/test-knowledge-fetch2 new file mode 100755 index 00000000..2a611547 --- /dev/null +++ b/test-api/test-knowledge-fetch2 @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 + +import requests +import asyncio +import json +import sys +import base64 +import time +from websockets.asyncio.client import connect + +url = "ws://localhost:8088/api/v1/socket" + +############################################################################ + +async def run(): + + async with connect(url) as ws: + + req = { + "id": "aa11", + "service": "knowledge", + "request": { + "operation": "fetch-kg-core", + "user": "trustgraph", + "id": "https://trustgraph.ai/doc/intelligence-and-state" + } + } + + await ws.send(json.dumps(req)) + + while True: + + msg = await ws.recv() + obj = json.loads(msg) + + print(obj) + + if "error" in obj: + print(f"Error: {obj['error']}") + break + + if "response" not in obj: continue + + if "eos" in obj["response"]: + if obj["response"]["eos"]: + break + +############################################################################ + +asyncio.run(run()) diff --git a/test-api/test-knowledge-list b/test-api/test-knowledge-list new file mode 100755 index 00000000..b616c719 --- /dev/null +++ b/test-api/test-knowledge-list @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 +import time + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +input = { + "operation": "list-kg-cores", + "user": "trustgraph", +} + +resp = requests.post( + f"{url}knowledge", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/trustgraph-base/trustgraph/schema/__init__.py b/trustgraph-base/trustgraph/schema/__init__.py index a9bb30a6..957ebcbd 100644 --- a/trustgraph-base/trustgraph/schema/__init__.py +++ b/trustgraph-base/trustgraph/schema/__init__.py @@ -13,4 +13,5 @@ from . lookup import * from . library import * from . config import * from . flows import * +from . knowledge import * diff --git a/trustgraph-base/trustgraph/schema/knowledge.py b/trustgraph-base/trustgraph/schema/knowledge.py new file mode 100644 index 00000000..88892f8c --- /dev/null +++ b/trustgraph-base/trustgraph/schema/knowledge.py @@ -0,0 +1,49 @@ + +from pulsar.schema import Record, Bytes, String, Array, Long, Boolean +from . types import Triple +from . topic import topic +from . types import Error +from . metadata import Metadata +from . documents import Document, TextDocument +from . graph import Triples, GraphEmbeddings + +# fetch-kg-core +# -> (???) +# <- () +# <- (error) + +# delete-kg-core +# -> (???) +# <- () +# <- (error) + +# list-kg-cores +# -> (user) +# <- () +# <- (error) + +class KnowledgeRequest(Record): + + # fetch-kg-core, delete-kg-core, list-kg-cores + operation = String() + + # list-kg-cores, delete-kg-core + user = String() + + # fetch-kg-core, list-kg-cores, delete-kg-core + id = String() + +class KnowledgeResponse(Record): + error = Error() + ids = Array(String()) + eos = Boolean() # Indicates end of knowledge core stream + triples = Triples() + graph_embeddings = GraphEmbeddings() + +knowledge_request_queue = topic( + 'knowledge', kind='non-persistent', namespace='request' +) +knowledge_response_queue = topic( + 'knowledge', kind='non-persistent', namespace='response', +) + diff --git a/trustgraph-base/trustgraph/schema/library.py b/trustgraph-base/trustgraph/schema/library.py index e6854987..6504fa78 100644 --- a/trustgraph-base/trustgraph/schema/library.py +++ b/trustgraph-base/trustgraph/schema/library.py @@ -51,17 +51,6 @@ from . documents import Document, TextDocument # <- (processing_metadata[]) # <- (error) -# OLD: -# add(Metadata, Bytes) : error? -# copy(id, user, collection) -# move(id, user, collection) -# delete(id) -# get(id) : Bytes -# reindex(id) -# list(user, collection) : id[] -# info(id[]) : DocumentInfo[] -# search([]) : id[] - class DocumentMetadata(Record): id = String() time = Long() diff --git a/trustgraph-flow/scripts/kg-manager b/trustgraph-flow/scripts/kg-manager new file mode 100644 index 00000000..ee8ec923 --- /dev/null +++ b/trustgraph-flow/scripts/kg-manager @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.cores import run + +run() + diff --git a/trustgraph-flow/scripts/kg-store b/trustgraph-flow/scripts/kg-store new file mode 100644 index 00000000..1a5ba9ef --- /dev/null +++ b/trustgraph-flow/scripts/kg-store @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.storage.knowledge import run + +run() + diff --git a/trustgraph-flow/setup.py b/trustgraph-flow/setup.py index de4bf95c..b4598b64 100644 --- a/trustgraph-flow/setup.py +++ b/trustgraph-flow/setup.py @@ -95,6 +95,8 @@ setuptools.setup( "scripts/kg-extract-definitions", "scripts/kg-extract-relationships", "scripts/kg-extract-topics", + "scripts/kg-store", + "scripts/kg-manager", "scripts/librarian", "scripts/metering", "scripts/object-extract-row", diff --git a/trustgraph-flow/trustgraph/cores/__init__.py b/trustgraph-flow/trustgraph/cores/__init__.py new file mode 100644 index 00000000..9843ccfb --- /dev/null +++ b/trustgraph-flow/trustgraph/cores/__init__.py @@ -0,0 +1,3 @@ + +from . service import run + diff --git a/trustgraph-flow/trustgraph/cores/__main__.py b/trustgraph-flow/trustgraph/cores/__main__.py new file mode 100644 index 00000000..1729017e --- /dev/null +++ b/trustgraph-flow/trustgraph/cores/__main__.py @@ -0,0 +1,5 @@ + +from . service import run + +if __name__ == '__main__': + run() diff --git a/trustgraph-flow/trustgraph/cores/knowledge.py b/trustgraph-flow/trustgraph/cores/knowledge.py new file mode 100644 index 00000000..adf9b429 --- /dev/null +++ b/trustgraph-flow/trustgraph/cores/knowledge.py @@ -0,0 +1,104 @@ + +from .. schema import KnowledgeResponse, Error +from .. knowledge import hash +from .. exceptions import RequestError +from .. tables.knowledge import KnowledgeTableStore +import base64 + +import uuid + +class KnowledgeManager: + + def __init__( + self, cassandra_host, cassandra_user, cassandra_password, + keyspace, + ): + + self.table_store = KnowledgeTableStore( + cassandra_host, cassandra_user, cassandra_password, keyspace + ) + + async def delete_kg_core(self, request, respond): + + print("Deleting core...", flush=True) + + await self.table_store.delete_kg_core( + request.user, request.id + ) + + await respond( + KnowledgeResponse( + error = None, + ids = None, + eos = False, + triples = None, + graph_embeddings = None, + ) + ) + + async def fetch_kg_core(self, request, respond): + + print("Fetch core...", flush=True) + + async def publish_triples(t): + await respond( + KnowledgeResponse( + error = None, + ids = None, + eos = False, + triples = t, + graph_embeddings = None, + ) + ) + + # Remove doc table row + await self.table_store.get_triples( + request.user, + request.id, + publish_triples, + ) + + async def publish_ge(g): + await respond( + KnowledgeResponse( + error = None, + ids = None, + eos = False, + triples = None, + graph_embeddings = g, + ) + ) + + # Remove doc table row + await self.table_store.get_graph_embeddings( + request.user, + request.id, + publish_ge, + ) + + print("Fetch complete", flush=True) + + await respond( + KnowledgeResponse( + error = None, + ids = None, + eos = True, + triples = None, + graph_embeddings = None, + ) + ) + + async def list_kg_cores(self, request, respond): + + ids = await self.table_store.list_kg_cores(request.user) + + await respond( + KnowledgeResponse( + error = None, + ids = ids, + eos = False, + triples = None, + graph_embeddings = None + ) + ) + diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py new file mode 100755 index 00000000..93e84d1e --- /dev/null +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -0,0 +1,228 @@ + +""" +Knowledge core service, manages cores and exports them +""" + +from functools import partial +import asyncio +import base64 +import json + +from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber +from .. base import ConsumerMetrics, ProducerMetrics + +from .. schema import KnowledgeRequest, KnowledgeResponse, Error +from .. schema import knowledge_request_queue, knowledge_response_queue + +from .. schema import Document, Metadata +from .. schema import TextDocument, Metadata + +from .. exceptions import RequestError + +from . knowledge import KnowledgeManager + +default_ident = "knowledge" + +default_knowledge_request_queue = knowledge_request_queue +default_knowledge_response_queue = knowledge_response_queue + +default_cassandra_host = "cassandra" + +# FIXME: How to ensure this doesn't conflict with other usage? +keyspace = "knowledge" + +class Processor(AsyncProcessor): + + def __init__(self, **params): + + id = params.get("id") + + knowledge_request_queue = params.get( + "knowledge_request_queue", default_knowledge_request_queue + ) + + knowledge_response_queue = params.get( + "knowledge_response_queue", default_knowledge_response_queue + ) + + cassandra_host = params.get("cassandra_host", default_cassandra_host) + cassandra_user = params.get("cassandra_user") + cassandra_password = params.get("cassandra_password") + + super(Processor, self).__init__( + **params | { + "knowledge_request_queue": knowledge_request_queue, + "knowledge_response_queue": knowledge_response_queue, + "cassandra_host": cassandra_host, + "cassandra_user": cassandra_user, + } + ) + + knowledge_request_metrics = ConsumerMetrics( + processor = self.id, flow = None, name = "knowledge-request" + ) + + knowledge_response_metrics = ProducerMetrics( + processor = self.id, flow = None, name = "knowledge-response" + ) + + self.knowledge_request_consumer = Consumer( + taskgroup = self.taskgroup, + client = self.pulsar_client, + flow = None, + topic = knowledge_request_queue, + subscriber = id, + schema = KnowledgeRequest, + handler = self.on_knowledge_request, + metrics = knowledge_request_metrics, + ) + + self.knowledge_response_producer = Producer( + client = self.pulsar_client, + topic = knowledge_response_queue, + schema = KnowledgeResponse, + metrics = knowledge_response_metrics, + ) + + self.knowledge = KnowledgeManager( + cassandra_host = cassandra_host.split(","), + cassandra_user = cassandra_user, + cassandra_password = cassandra_password, + keyspace = keyspace, + ) + + self.register_config_handler(self.on_knowledge_config) + + self.flows = {} + + print("Initialised.", flush=True) + + async def start(self): + + await super(Processor, self).start() + await self.knowledge_request_consumer.start() + await self.knowledge_response_producer.start() + + async def on_knowledge_config(self, config, version): + + print("config version", version) + + if "flows" in config: + + self.flows = { + k: json.loads(v) + for k, v in config["flows"].items() + } + + print(self.flows) + + async def process_request(self, v, id): + + if v.operation is None: + raise RequestError("Null operation") + + print("request", v.operation) + + impls = { + "list-kg-cores": self.knowledge.list_kg_cores, + "fetch-kg-core": self.knowledge.fetch_kg_core, + "delete-kg-core": self.knowledge.delete_kg_core, + } + + if v.operation not in impls: + raise RequestError(f"Invalid operation: {v.operation}") + + async def respond(x): + await self.knowledge_response_producer.send( + x, { "id": id } + ) + return await impls[v.operation](v, respond) + + async def on_knowledge_request(self, msg, consumer, flow): + + v = msg.value() + + # Sender-produced ID + + id = msg.properties()["id"] + + print(f"Handling input {id}...", flush=True) + + try: + + await self.process_request(v, id) + +# await self.knowledge_response_producer.send( +# resp, properties={"id": id} +# ) + + return + + except RequestError as e: + resp = KnowledgeResponse( + error = Error( + type = "request-error", + message = str(e), + ) + ) + + await self.knowledge_response_producer.send( + resp, properties={"id": id} + ) + + return + except Exception as e: + resp = KnowledgeResponse( + error = Error( + type = "unexpected-error", + message = str(e), + ) + ) + + await self.knowledge_response_producer.send( + resp, properties={"id": id} + ) + + return + + print("Done.", flush=True) + + @staticmethod + def add_args(parser): + + AsyncProcessor.add_args(parser) + + parser.add_argument( + '--knowledge-request-queue', + default=default_knowledge_request_queue, + help=f'Config request queue (default: {default_knowledge_request_queue})' + ) + + parser.add_argument( + '--knowledge-response-queue', + default=default_knowledge_response_queue, + help=f'Config response queue {default_knowledge_response_queue}', + ) + + parser.add_argument( + '--cassandra-host', + default="cassandra", + help=f'Graph host (default: cassandra)' + ) + + parser.add_argument( + '--cassandra-user', + default=None, + help=f'Cassandra user' + ) + + parser.add_argument( + '--cassandra-password', + default=None, + help=f'Cassandra password' + ) + +def run(): + + Processor.launch(default_ident, __doc__) + diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py b/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py new file mode 100644 index 00000000..2e1ae43a --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py @@ -0,0 +1,68 @@ + +import base64 + +from ... schema import KnowledgeRequest, KnowledgeResponse +from ... schema import knowledge_request_queue +from ... schema import knowledge_response_queue + +from . requestor import ServiceRequestor +from . serialize import serialize_graph_embeddings +from . serialize import serialize_triples +from . serialize import to_document_metadata, to_processing_metadata + +class KnowledgeRequestor(ServiceRequestor): + def __init__(self, pulsar_client, consumer, subscriber, timeout=120): + + super(KnowledgeRequestor, self).__init__( + pulsar_client=pulsar_client, + consumer_name = consumer, + subscription = subscriber, + request_queue=knowledge_request_queue, + response_queue=knowledge_response_queue, + request_schema=KnowledgeRequest, + response_schema=KnowledgeResponse, + timeout=timeout, + ) + + def to_request(self, body): + + return KnowledgeRequest( + operation = body.get("operation", None), + user = body.get("user", None), + id = body.get("id", None), + ) + + def from_response(self, message): + + print("Processing message") + + # Response to list, + if message.ids is not None: + print("-> IDS") + return { + "ids": message.ids + }, True + + if message.triples: + print("-> triples") + return { + "triples": serialize_triples(message.triples) + }, False + + if message.graph_embeddings: + print("-> ge") + return { + "graph-embeddings": serialize_graph_embeddings( + message.graph_embeddings + ) + }, False + + if message.eos is True: + print("-> eos") + return { + "eos": True + }, True + + # Empty case, return from successful delete. + return {}, True + diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index ddd396a1..7896d588 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -5,6 +5,7 @@ import uuid from . config import ConfigRequestor from . flow import FlowRequestor from . librarian import LibrarianRequestor +from . knowledge import KnowledgeRequestor from . embeddings import EmbeddingsRequestor from . agent import AgentRequestor @@ -44,6 +45,7 @@ global_dispatchers = { "config": ConfigRequestor, "flow": FlowRequestor, "librarian": LibrarianRequestor, + "knowledge": KnowledgeRequestor, } sender_dispatchers = { diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/requestor.py b/trustgraph-flow/trustgraph/gateway/dispatch/requestor.py index 1ce5ac68..b8a84644 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/requestor.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/requestor.py @@ -82,7 +82,7 @@ class ServiceRequestor: resp, fin = self.from_response(resp) - print(resp, fin) + print(resp, fin, flush=True) if responder: await responder(resp, fin) diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py index 7c09515a..89750c42 100644 --- a/trustgraph-flow/trustgraph/librarian/librarian.py +++ b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -2,7 +2,7 @@ from .. schema import LibrarianRequest, LibrarianResponse, Error, Triple from .. knowledge import hash from .. exceptions import RequestError -from . table_store import TableStore +from .. tables.library import LibraryTableStore from . blob_store import BlobStore import base64 @@ -21,7 +21,7 @@ class Librarian: minio_host, minio_access_key, minio_secret_key, bucket_name ) - self.table_store = TableStore( + self.table_store = LibraryTableStore( cassandra_host, cassandra_user, cassandra_password, keyspace ) diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 4c18b9a5..d1ce4805 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -210,7 +210,7 @@ class Processor(AsyncProcessor): if v.operation is None: raise RequestError("Null operation") - print("requets", v.operation) + print("request", v.operation) impls = { "add-document": self.librarian.add_document, diff --git a/trustgraph-flow/trustgraph/storage/knowledge/__init__.py b/trustgraph-flow/trustgraph/storage/knowledge/__init__.py new file mode 100644 index 00000000..ff60c5fa --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/knowledge/__init__.py @@ -0,0 +1,3 @@ + +from . store import run + diff --git a/trustgraph-flow/trustgraph/storage/knowledge/__main__.py b/trustgraph-flow/trustgraph/storage/knowledge/__main__.py new file mode 100644 index 00000000..92825a02 --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/knowledge/__main__.py @@ -0,0 +1,5 @@ + +from . store import run + +if __name__ == '__main__': + run() diff --git a/trustgraph-flow/trustgraph/storage/knowledge/store.py b/trustgraph-flow/trustgraph/storage/knowledge/store.py new file mode 100644 index 00000000..62e915be --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/knowledge/store.py @@ -0,0 +1,78 @@ + +""" +Stores knowledge-cores in Cassandra +""" + +import json +import urllib.parse + +from ... schema import Triples, GraphEmbeddings +from ... base import FlowProcessor, ConsumerSpec + +from ... tables.knowledge import KnowledgeTableStore + +default_ident = "kg-store" + +default_cassandra_host = "cassandra" +keyspace = "knowledge" + +class Processor(FlowProcessor): + + def __init__(self, **params): + + id = params.get("id") + + cassandra_host = params.get("cassandra_host", default_cassandra_host) + cassandra_user = params.get("cassandra_user") + cassandra_password = params.get("cassandra_password") + + super(Processor, self).__init__( + **params | { + "id": id, + "cassandra_host": cassandra_host, + "cassandra_user": cassandra_user, + } + ) + + self.register_specification( + ConsumerSpec( + name = "triples-input", + schema = Triples, + handler = self.on_triples + ) + ) + + self.register_specification( + ConsumerSpec( + name = "graph-embeddings-input", + schema = GraphEmbeddings, + handler = self.on_graph_embeddings + ) + ) + + self.table_store = KnowledgeTableStore( + cassandra_host = cassandra_host.split(","), + cassandra_user = cassandra_user, + cassandra_password = cassandra_password, + keyspace = keyspace, + ) + + async def on_triples(self, msg, consumer, flow): + + v = msg.value() + await self.table_store.add_triples(v) + + async def on_graph_embeddings(self, msg, consumer, flow): + + v = msg.value() + await self.table_store.add_graph_embeddings(v) + + @staticmethod + def add_args(parser): + + FlowProcessor.add_args(parser) + +def run(): + + Processor.launch(default_ident, __doc__) + diff --git a/trustgraph-flow/trustgraph/tables/__init__.py b/trustgraph-flow/trustgraph/tables/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py new file mode 100644 index 00000000..3996b5a7 --- /dev/null +++ b/trustgraph-flow/trustgraph/tables/knowledge.py @@ -0,0 +1,504 @@ + +from .. schema import KnowledgeResponse, Triple, Triples, EntityEmbeddings +from .. schema import Metadata, Value, GraphEmbeddings + +from cassandra.cluster import Cluster +from cassandra.auth import PlainTextAuthProvider +from ssl import SSLContext, PROTOCOL_TLSv1_2 + +import uuid +import time +import asyncio + +class KnowledgeTableStore: + + def __init__( + self, + cassandra_host, cassandra_user, cassandra_password, keyspace, + ): + + self.keyspace = keyspace + + print("Connecting to Cassandra...", flush=True) + + if cassandra_user and cassandra_password: + ssl_context = SSLContext(PROTOCOL_TLSv1_2) + auth_provider = PlainTextAuthProvider( + username=cassandra_user, password=cassandra_password + ) + self.cluster = Cluster( + cassandra_host, + auth_provider=auth_provider, + ssl_context=ssl_context + ) + else: + self.cluster = Cluster(cassandra_host) + + self.cassandra = self.cluster.connect() + + print("Connected.", flush=True) + + self.ensure_cassandra_schema() + + self.prepare_statements() + + def ensure_cassandra_schema(self): + + print("Ensure Cassandra schema...", flush=True) + + print("Keyspace...", flush=True) + + # FIXME: Replication factor should be configurable + self.cassandra.execute(f""" + create keyspace if not exists {self.keyspace} + with replication = {{ + 'class' : 'SimpleStrategy', + 'replication_factor' : 1 + }}; + """); + + self.cassandra.set_keyspace(self.keyspace) + + print("triples table...", flush=True) + + self.cassandra.execute(""" + CREATE TABLE IF NOT EXISTS triples ( + user text, + document_id text, + id uuid, + time timestamp, + metadata list>, + triples list>, + PRIMARY KEY ((user, document_id), id) + ); + """); + + print("graph_embeddings table...", flush=True) + + self.cassandra.execute(""" + create table if not exists graph_embeddings ( + user text, + document_id text, + id uuid, + time timestamp, + metadata list>, + entity_embeddings list< + tuple< + tuple, + list> + > + >, + PRIMARY KEY ((user, document_id), id) + ); + """); + + self.cassandra.execute(""" + CREATE INDEX IF NOT EXISTS graph_embeddings_user ON + graph_embeddings ( user ); + """); + + print("document_embeddings table...", flush=True) + + self.cassandra.execute(""" + create table if not exists document_embeddings ( + user text, + document_id text, + id uuid, + time timestamp, + metadata list>, + chunks list< + tuple< + blob, + list> + > + >, + PRIMARY KEY ((user, document_id), id) + ); + """); + + self.cassandra.execute(""" + CREATE INDEX IF NOT EXISTS document_embeddings_user ON + document_embeddings ( user ); + """); + + print("Cassandra schema OK.", flush=True) + + def prepare_statements(self): + + self.insert_triples_stmt = self.cassandra.prepare(""" + INSERT INTO triples + ( + id, user, document_id, + time, metadata, triples + ) + VALUES (?, ?, ?, ?, ?, ?) + """) + + self.insert_graph_embeddings_stmt = self.cassandra.prepare(""" + INSERT INTO graph_embeddings + ( + id, user, document_id, time, metadata, entity_embeddings + ) + VALUES (?, ?, ?, ?, ?, ?) + """) + + self.insert_document_embeddings_stmt = self.cassandra.prepare(""" + INSERT INTO document_embeddings + ( + id, user, document_id, time, metadata, chunks + ) + VALUES (?, ?, ?, ?, ?, ?) + """) + + self.list_cores_stmt = self.cassandra.prepare(""" + SELECT DISTINCT user, document_id FROM graph_embeddings + WHERE user = ? + """) + + self.get_triples_stmt = self.cassandra.prepare(""" + SELECT id, time, metadata, triples + FROM triples + WHERE user = ? AND document_id = ? + """) + + self.get_graph_embeddings_stmt = self.cassandra.prepare(""" + SELECT id, time, metadata, entity_embeddings + FROM graph_embeddings + WHERE user = ? AND document_id = ? + """) + + self.get_document_embeddings_stmt = self.cassandra.prepare(""" + SELECT id, time, metadata, chunks + FROM document_embeddings + WHERE user = ? AND document_id = ? + """) + + self.delete_triples_stmt = self.cassandra.prepare(""" + DELETE FROM triples + WHERE user = ? AND document_id = ? + """) + + self.delete_graph_embeddings_stmt = self.cassandra.prepare(""" + DELETE FROM graph_embeddings + WHERE user = ? AND document_id = ? + """) + + async def add_triples(self, m): + + when = int(time.time() * 1000) + + if m.metadata.metadata: + metadata = [ + ( + v.s.value, v.s.is_uri, v.p.value, v.p.is_uri, + v.o.value, v.o.is_uri + ) + for v in m.metadata.metadata + ] + else: + metadata = [] + + triples = [ + ( + v.s.value, v.s.is_uri, v.p.value, v.p.is_uri, + v.o.value, v.o.is_uri + ) + for v in m.triples + ] + + while True: + + try: + + resp = self.cassandra.execute( + self.insert_triples_stmt, + ( + uuid.uuid4(), m.metadata.user, + m.metadata.id, when, + metadata, triples, + ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + async def add_graph_embeddings(self, m): + + when = int(time.time() * 1000) + + if m.metadata.metadata: + metadata = [ + ( + v.s.value, v.s.is_uri, v.p.value, v.p.is_uri, + v.o.value, v.o.is_uri + ) + for v in m.metadata.metadata + ] + else: + metadata = [] + + entities = [ + ( + (v.entity.value, v.entity.is_uri), + v.vectors + ) + for v in m.entities + ] + + while True: + + try: + + resp = self.cassandra.execute( + self.insert_graph_embeddings_stmt, + ( + uuid.uuid4(), m.metadata.user, + m.metadata.id, when, + metadata, entities, + ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + async def add_document_embeddings(self, m): + + when = int(time.time() * 1000) + + if m.metadata.metadata: + metadata = [ + ( + v.s.value, v.s.is_uri, v.p.value, v.p.is_uri, + v.o.value, v.o.is_uri + ) + for v in m.metadata.metadata + ] + else: + metadata = [] + + chunks = [ + ( + v.chunk, + v.vectors, + ) + for v in m.chunks + ] + + while True: + + try: + + resp = self.cassandra.execute( + self.insert_document_embeddings_stmt, + ( + uuid.uuid4(), m.metadata.user, + m.metadata.id, when, + metadata, chunks, + ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + async def list_kg_cores(self, user): + + print("List kg cores...") + + while True: + + try: + + resp = self.cassandra.execute( + self.list_cores_stmt, + (user,) + ) + + break + + except Exception as e: + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + + lst = [ + row[1] + for row in resp + ] + + print("Done") + + return lst + + async def delete_kg_core(self, user, document_id): + + print("Delete kg cores...") + + while True: + + try: + + resp = self.cassandra.execute( + self.delete_triples_stmt, + (user, document_id) + ) + + break + + except Exception as e: + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + while True: + + try: + + resp = self.cassandra.execute( + self.delete_graph_embeddings_stmt, + (user, document_id) + ) + + break + + except Exception as e: + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + async def get_triples(self, user, document_id, receiver): + + print("Get triples...") + + while True: + + try: + + resp = self.cassandra.execute( + self.get_triples_stmt, + (user, document_id) + ) + + break + + except Exception as e: + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + for row in resp: + + if row[2]: + metadata = [ + Triple( + s = Value(value = elt[0], is_uri = elt[1]), + p = Value(value = elt[2], is_uri = elt[3]), + o = Value(value = elt[4], is_uri = elt[5]), + ) + for elt in row[2] + ] + else: + metadata = [] + + triples = [ + Triple( + s = Value(value = elt[0], is_uri = elt[1]), + p = Value(value = elt[2], is_uri = elt[3]), + o = Value(value = elt[4], is_uri = elt[5]), + ) + for elt in row[3] + ] + + await receiver( + Triples( + metadata = Metadata( + id = document_id, + user = user, + collection = "default", # FIXME: What to put here? + metadata = metadata, + ), + triples = triples + ) + ) + + print("Done") + + async def get_graph_embeddings(self, user, document_id, receiver): + + print("Get GE...") + + while True: + + try: + + resp = self.cassandra.execute( + self.get_graph_embeddings_stmt, + (user, document_id) + ) + + break + + except Exception as e: + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + for row in resp: + + if row[2]: + metadata = [ + Triple( + s = Value(value = elt[0], is_uri = elt[1]), + p = Value(value = elt[2], is_uri = elt[3]), + o = Value(value = elt[4], is_uri = elt[5]), + ) + for elt in row[2] + ] + else: + metadata = [] + + entities = [ + EntityEmbeddings( + entity = Value(value = ent[0][0], is_uri = ent[0][1]), + vectors = ent[1] + ) + for ent in row[3] + ] + + await receiver( + GraphEmbeddings( + metadata = Metadata( + id = document_id, + user = user, + collection = "default", # FIXME: What to put here? + metadata = metadata, + ), + entities = entities + ) + ) + + print("Done") + diff --git a/trustgraph-flow/trustgraph/librarian/table_store.py b/trustgraph-flow/trustgraph/tables/library.py similarity index 68% rename from trustgraph-flow/trustgraph/librarian/table_store.py rename to trustgraph-flow/trustgraph/tables/library.py index 0646f50b..4168fd2b 100644 --- a/trustgraph-flow/trustgraph/librarian/table_store.py +++ b/trustgraph-flow/trustgraph/tables/library.py @@ -14,7 +14,7 @@ import uuid import time import asyncio -class TableStore: +class LibraryTableStore: def __init__( self, @@ -104,71 +104,6 @@ class TableStore: ); """); - return - - print("triples table...", flush=True) - - self.cassandra.execute(""" - CREATE TABLE IF NOT EXISTS triples ( - user text, - collection text, - document_id text, - id uuid, - time timestamp, - metadata list>, - triples list>, - PRIMARY KEY (user, collection, document_id, id) - ); - """); - - print("graph_embeddings table...", flush=True) - - self.cassandra.execute(""" - create table if not exists graph_embeddings ( - user text, - collection text, - document_id text, - id uuid, - time timestamp, - metadata list>, - entity_embeddings list< - tuple< - tuple, - list> - > - >, - PRIMARY KEY (user, collection, document_id, id) - ); - """); - - print("document_embeddings table...", flush=True) - - self.cassandra.execute(""" - create table if not exists document_embeddings ( - user text, - collection text, - document_id text, - id uuid, - time timestamp, - metadata list>, - chunks list< - tuple< - blob, - list> - > - >, - PRIMARY KEY (user, collection, document_id, id) - ); - """); - print("Cassandra schema OK.", flush=True) def prepare_statements(self): @@ -252,35 +187,6 @@ class TableStore: WHERE user = ? """) - return - - self.insert_triples_stmt = self.cassandra.prepare(""" - INSERT INTO triples - ( - id, user, collection, document_id, time, - metadata, triples - ) - VALUES (?, ?, ?, ?, ?, ?, ?) - """) - - self.insert_graph_embeddings_stmt = self.cassandra.prepare(""" - INSERT INTO graph_embeddings - ( - id, user, collection, document_id, time, - metadata, entity_embeddings - ) - VALUES (?, ?, ?, ?, ?, ?, ?) - """) - - self.insert_document_embeddings_stmt = self.cassandra.prepare(""" - INSERT INTO document_embeddings - ( - id, user, collection, document_id, time, - metadata, chunks - ) - VALUES (?, ?, ?, ?, ?, ?, ?) - """) - async def document_exists(self, user, id): resp = self.cassandra.execute( @@ -391,50 +297,6 @@ class TableStore: print("Delete complete", flush=True) - async def add_triples(self, m): - - when = int(time.time() * 1000) - - if m.metadata.metadata: - metadata = [ - ( - v.s.value, v.s.is_uri, v.p.value, v.p.is_uri, - v.o.value, v.o.is_uri - ) - for v in m.metadata.metadata - ] - else: - metadata = [] - - triples = [ - ( - v.s.value, v.s.is_uri, v.p.value, v.p.is_uri, - v.o.value, v.o.is_uri - ) - for v in m.triples - ] - - while True: - - try: - - resp = self.cassandra.execute( - self.insert_triples_stmt, - ( - uuid.uuid4(), m.metadata.user, - m.metadata.collection, m.metadata.id, when, - metadata, triples, - ) - ) - - break - - except Exception as e: - - print("Exception:", type(e)) - print(f"{e}, retry...", flush=True) - await asyncio.sleep(1) - async def list_documents(self, user): print("List documents...") @@ -661,92 +523,3 @@ class TableStore: return lst - async def add_graph_embeddings(self, m): - - when = int(time.time() * 1000) - - if m.metadata.metadata: - metadata = [ - ( - v.s.value, v.s.is_uri, v.p.value, v.p.is_uri, - v.o.value, v.o.is_uri - ) - for v in m.metadata.metadata - ] - else: - metadata = [] - - entities = [ - ( - (v.entity.value, v.entity.is_uri), - v.vectors - ) - for v in m.entities - ] - - while True: - - try: - - resp = self.cassandra.execute( - self.insert_graph_embeddings_stmt, - ( - uuid.uuid4(), m.metadata.user, - m.metadata.collection, m.metadata.id, when, - metadata, entities, - ) - ) - - break - - except Exception as e: - - print("Exception:", type(e)) - print(f"{e}, retry...", flush=True) - await asyncio.sleep(1) - - async def add_document_embeddings(self, m): - - when = int(time.time() * 1000) - - if m.metadata.metadata: - metadata = [ - ( - v.s.value, v.s.is_uri, v.p.value, v.p.is_uri, - v.o.value, v.o.is_uri - ) - for v in m.metadata.metadata - ] - else: - metadata = [] - - chunks = [ - ( - v.chunk, - v.vectors, - ) - for v in m.chunks - ] - - while True: - - try: - - resp = self.cassandra.execute( - self.insert_document_embeddings_stmt, - ( - uuid.uuid4(), m.metadata.user, - m.metadata.collection, m.metadata.id, when, - metadata, chunks, - ) - ) - - break - - except Exception as e: - - print("Exception:", type(e)) - print(f"{e}, retry...", flush=True) - await asyncio.sleep(1) - -