From ff28d26f4d4cf0b113277b575ff1712a15b13c7d Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Sun, 4 May 2025 22:26:19 +0100 Subject: [PATCH] Feature/flow librarian (#361) * Update librarian to new API * Implementing new schema with document + processing objects --- test-api/test-library-add-doc | 24 +- test-api/test-library-add-doc2 | 20 +- test-api/test-library-add-processing | 50 ++ test-api/test-library-get-document-content | 41 ++ test-api/test-library-get-document-metadata | 42 ++ test-api/test-library-list | 2 +- test-api/test-library-list-documents | 38 ++ test-api/test-library-list-processing | 38 ++ test-api/test-library-remove-document | 41 ++ test-api/test-library-remove-document2 | 41 ++ test-api/test-library-remove-processing | 41 ++ test-api/test-library-update-doc | 75 +++ trustgraph-base/trustgraph/schema/library.py | 100 +++- .../trustgraph/config/service/service.py | 3 - .../trustgraph/gateway/dispatch/librarian.py | 60 ++- .../trustgraph/gateway/dispatch/serialize.py | 99 ++-- trustgraph-flow/trustgraph/gateway/service.py | 1 - .../trustgraph/librarian/blob_store.py | 24 +- .../trustgraph/librarian/librarian.py | 241 ++++++++-- .../trustgraph/librarian/service.py | 326 +++++-------- .../trustgraph/librarian/table_store.py | 444 +++++++++++++++--- 21 files changed, 1323 insertions(+), 428 deletions(-) create mode 100755 test-api/test-library-add-processing create mode 100755 test-api/test-library-get-document-content create mode 100755 test-api/test-library-get-document-metadata create mode 100755 test-api/test-library-list-documents create mode 100755 test-api/test-library-list-processing create mode 100755 test-api/test-library-remove-document create mode 100755 test-api/test-library-remove-document2 create mode 100755 test-api/test-library-remove-processing create mode 100755 test-api/test-library-update-doc diff --git a/test-api/test-library-add-doc b/test-api/test-library-add-doc index bd927367..d0fcb0d2 100755 --- 
a/test-api/test-library-add-doc +++ b/test-api/test-library-add-doc @@ -4,20 +4,25 @@ import requests import json import sys import base64 +import time url = "http://localhost:8088/api/v1/" ############################################################################ -id = "http://trustgraph.ai/doc/12345678" +id = "http://trustgraph.ai/doc/9fdee98b-b259-40ac-bcb9-8e82ccedeb04" -with open("docs/README.cats") as f: - doc = base64.b64encode(f.read().encode("utf-8")).decode("utf-8") +with open("docs/README.cats", "rb") as f: + doc = base64.b64encode(f.read()).decode("utf-8") input = { - "operation": "add", - "document": { + "operation": "add-document", + "document-metadata": { "id": id, + "time": int(time.time()), + "kind": "text/plain", + "title": "Mark's cats", + "comments": "Test doc taken from the TrustGraph repo", "metadata": [ { "s": { @@ -46,13 +51,10 @@ input = { }, }, ], - "document": doc, - "kind": "text/plain", "user": "trustgraph", - "collection": "default", - "title": "Mark's cats", - "comments": "Test doc taken from the TrustGraph repo", - } + "tags": ["mark", "cats"], + }, + "content": doc, } resp = requests.post( diff --git a/test-api/test-library-add-doc2 b/test-api/test-library-add-doc2 index 0c0856f9..f886c739 100755 --- a/test-api/test-library-add-doc2 +++ b/test-api/test-library-add-doc2 @@ -4,12 +4,13 @@ import requests import json import sys import base64 +import time url = "http://localhost:8088/api/v1/" ############################################################################ -id = "http://trustgraph.ai/doc/12345678" +id = "http://trustgraph.ai/doc/6d034da9-2759-45c2-af24-14db7f4c44c2" source = "../sources/20160001634.pdf" @@ -17,9 +18,13 @@ with open(source, "rb") as f: doc = base64.b64encode(f.read()).decode("utf-8") input = { - "operation": "add", - "id": id, - "document": { + "operation": "add-document", + "document-metadata": { + "id": id, + "time": int(time.time()), + "kind": "application/pdf", + "title": "Application of SAE ARP4754A to 
Flight Critical Systems", + "comments": "Application of federal safety standards to NASA spacecraft", "metadata": [ { "s": { @@ -61,11 +66,10 @@ input = { }, }, ], - "document": doc, - "kind": "application/pdf", "user": "trustgraph", - "collection": "default", - } + "tags": ["nasa", "safety-engineering"], + }, + "content": doc, } resp = requests.post( diff --git a/test-api/test-library-add-processing b/test-api/test-library-add-processing new file mode 100755 index 00000000..f1692b0a --- /dev/null +++ b/test-api/test-library-add-processing @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 +import time + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +doc_id = "http://trustgraph.ai/doc/9fdee98b-b259-40ac-bcb9-8e82ccedeb04" + +proc_id = "2714fc72-44ab-45f2-94dd-6773fc336535" + +input = { + "operation": "add-processing", + "processing-metadata": { + "id": proc_id, + "document-id": doc_id, + "time": int(time.time()), + "flow": "0000", + "user": "trustgraph", + "collection": "default", + "tags": ["test"], + } +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-library-get-document-content b/test-api/test-library-get-document-content new file mode 100755 index 00000000..5a8b2880 --- /dev/null +++ b/test-api/test-library-get-document-content @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +id = "http://trustgraph.ai/doc/9fdee98b-b259-40ac-bcb9-8e82ccedeb04" + +user = 
"trustgraph" + +input = { + "operation": "get-document-content", + "user": user, + "document-id": id, +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +resp = resp.json() + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + + +content = base64.b64decode(resp["content"]).decode("utf-8") + +print(content) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-library-get-document-metadata b/test-api/test-library-get-document-metadata new file mode 100755 index 00000000..0bcdd321 --- /dev/null +++ b/test-api/test-library-get-document-metadata @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +id = "http://trustgraph.ai/doc/9fdee98b-b259-40ac-bcb9-8e82ccedeb04" + +user = "trustgraph" + +input = { + "operation": "get-document-metadata", + "user": user, + "document-id": id, +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-library-list b/test-api/test-library-list index 72ea4478..cecb835f 100755 --- a/test-api/test-library-list +++ b/test-api/test-library-list @@ -12,7 +12,7 @@ url = "http://localhost:8088/api/v1/" user = "trustgraph" input = { - "operation": "list", + "operation": "list-documents", "user": user, } diff --git a/test-api/test-library-list-documents b/test-api/test-library-list-documents new file mode 100755 index 00000000..9677ce4d --- /dev/null +++ b/test-api/test-library-list-documents @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys 
+import base64 +import time + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +input = { + "operation": "list-documents", + "user": "trustgraph", +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-library-list-processing b/test-api/test-library-list-processing new file mode 100755 index 00000000..1d31a572 --- /dev/null +++ b/test-api/test-library-list-processing @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 +import time + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +input = { + "operation": "list-processing", + "user": "trustgraph", +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-library-remove-document b/test-api/test-library-remove-document new file mode 100755 index 00000000..6354c292 --- /dev/null +++ b/test-api/test-library-remove-document @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 +import time + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +id = "http://trustgraph.ai/doc/9fdee98b-b259-40ac-bcb9-8e82ccedeb04" + +input = { + "operation": "remove-document", + "user": "trustgraph", + "document-id": id +} + +resp = 
requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-library-remove-document2 b/test-api/test-library-remove-document2 new file mode 100755 index 00000000..fd57d025 --- /dev/null +++ b/test-api/test-library-remove-document2 @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 +import time + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +id = "http://trustgraph.ai/doc/6d034da9-2759-45c2-af24-14db7f4c44c2" + +input = { + "operation": "remove-document", + "user": "trustgraph", + "document-id": id +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-library-remove-processing b/test-api/test-library-remove-processing new file mode 100755 index 00000000..51bbcc42 --- /dev/null +++ b/test-api/test-library-remove-processing @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 +import time + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +proc_id = "2714fc72-44ab-45f2-94dd-6773fc336535" + +input = { + "operation": "remove-processing", + "user": "trustgraph", + "processing-id": proc_id, +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: 
{resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-library-update-doc b/test-api/test-library-update-doc new file mode 100755 index 00000000..eee4170d --- /dev/null +++ b/test-api/test-library-update-doc @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 +import time + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +id = "http://trustgraph.ai/doc/9fdee98b-b259-40ac-bcb9-8e82ccedeb04" + +input = { + "operation": "update-document", + "document-metadata": { + "id": id, + "time": int(time.time()), + "title": "Mark's cats - a story", + "comments": "Information about Mark's cats", + "metadata": [ + { + "s": { + "v": id, + "e": True, + }, + "p": { + "v": "http://www.w3.org/2000/01/rdf-schema#label", + "e": True, + }, + "o": { + "v": "Mark's pets", "e": False, + }, + }, + { + "s": { + "v": id, + "e": True, + }, + "p": { + "v": 'https://schema.org/keywords', + "e": True, + }, + "o": { + "v": "cats", "e": False, + }, + }, + ], + "user": "trustgraph", + "tags": ["mark", "cats", "pets"], + }, +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/trustgraph-base/trustgraph/schema/library.py b/trustgraph-base/trustgraph/schema/library.py index ed52b2ad..dfec533f 100644 --- a/trustgraph-base/trustgraph/schema/library.py +++ b/trustgraph-base/trustgraph/schema/library.py @@ -6,16 +6,52 @@ from . types import Error from . metadata import Metadata from . 
documents import Document, TextDocument -# add -# -> (id, document) +# add-document +# -> (document_id, document_metadata, content) # <- () # <- (error) -# list -# -> (user, collection?) -# <- (info) +# remove-document +# -> (document_id) +# <- () # <- (error) +# update-document +# -> (document_id, document_metadata) +# <- () +# <- (error) + +# get-document-metadata +# -> (document_id) +# <- (document_metadata) +# <- (error) + +# get-document-content +# -> (document_id) +# <- (content) +# <- (error) + +# add-processing +# -> (processing_id, processing_metadata) +# <- () +# <- (error) + +# remove-processing +# -> (processing_id) +# <- () +# <- (error) + +# list-documents +# -> (user, collection?) +# <- (document_metadata[]) +# <- (error) + +# list-processing +# -> (user, collection?) +# <- (processing_metadata[]) +# <- (error) + +# OLD: # add(Metadata, Bytes) : error? # copy(id, user, collection) # move(id, user, collection) @@ -26,26 +62,24 @@ from . documents import Document, TextDocument # info(id[]) : DocumentInfo[] # search([]) : id[] -class DocumentPackage(Record): +class DocumentMetadata(Record): id = String() - document = Bytes() + time = Long() kind = String() - user = String() - collection = String() title = String() comments = String() - time = Long() metadata = Array(Triple()) + user = String() + tags = Array(String()) -class DocumentInfo(Record): +class ProcessingMetadata(Record): id = String() - kind = String() + document_id = String() + time = Long() + flow = String() user = String() collection = String() - title = String() - comments = String() - time = Long() - metadata = Array(Triple()) + tags = Array(String()) class Criteria(Record): key = String() @@ -53,17 +87,43 @@ class Criteria(Record): operator = String() class LibrarianRequest(Record): + + # add-document, remove-document, update-document, get-document-metadata, + # get-document-content, add-processing, remove-processing, list-documents, + # list-processing operation = String() - id = 
String() - document = DocumentPackage() + + # add-document, remove-document, update-document, get-document-metadata, + # get-document-content + document_id = String() + + # add-processing, remove-processing + processing_id = String() + + # add-document, update-document + document_metadata = DocumentMetadata() + + # add-processing + processing_metadata = ProcessingMetadata() + + # add-document + content = Bytes() + + # list-documents, list-processing user = String() + + # list-documents?, list-processing? collection = String() + + # criteria = Array(Criteria()) class LibrarianResponse(Record): error = Error() - document = DocumentPackage() - info = Array(DocumentInfo()) + document_metadata = DocumentMetadata() + content = Bytes() + document_metadatas = Array(DocumentMetadata()) + processing_metadatas = Array(ProcessingMetadata()) librarian_request_queue = topic( 'librarian', kind='non-persistent', namespace='request' diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index c0268389..11cd156c 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -3,8 +3,6 @@ Config service. Manages system global configuration state """ -from pulsar.schema import JsonSchema - from trustgraph.schema import Error from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigPush @@ -14,7 +12,6 @@ from trustgraph.schema import config_push_queue from trustgraph.schema import FlowRequest, FlowResponse from trustgraph.schema import flow_request_queue, flow_response_queue -from trustgraph.log_level import LogLevel from trustgraph.base import AsyncProcessor, Consumer, Producer from . 
config import Configuration diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py b/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py index f280b392..d33138ac 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py @@ -1,11 +1,15 @@ +import base64 + from ... schema import LibrarianRequest, LibrarianResponse from ... schema import librarian_request_queue from ... schema import librarian_response_queue from . requestor import ServiceRequestor -from . serialize import serialize_document_package, serialize_document_info -from . serialize import to_document_package, to_document_info, to_criteria +from . serialize import serialize_document_metadata +from . serialize import serialize_processing_metadata +from . serialize import to_document_metadata, to_processing_metadata +from . serialize import to_criteria class LibrarianRequestor(ServiceRequestor): def __init__(self, pulsar_client, consumer, subscriber, timeout=120): @@ -23,20 +27,37 @@ class LibrarianRequestor(ServiceRequestor): def to_request(self, body): - if "document" in body: - dp = to_document_package(body["document"]) + # Content gets base64 decoded & encoded again. It at least makes + # sure payload is valid base64. 
+ + if "document-metadata" in body: + dm = to_document_metadata(body["document-metadata"]) else: - dp = None + dm = None + + if "processing-metadata" in body: + pm = to_processing_metadata(body["processing-metadata"]) + else: + pm = None if "criteria" in body: criteria = to_criteria(body["criteria"]) else: criteria = None + if "content" in body: + content = base64.b64decode(body["content"].encode("utf-8")) + content = base64.b64encode(content).decode("utf-8") + else: + content = None + return LibrarianRequest( operation = body.get("operation", None), - id = body.get("id", None), - document = dp, + document_id = body.get("document-id", None), + processing_id = body.get("processing-id", None), + document_metadata = dm, + processing_metadata = pm, + content = content, user = body.get("user", None), collection = body.get("collection", None), criteria = criteria, @@ -44,15 +65,28 @@ class LibrarianRequestor(ServiceRequestor): def from_response(self, message): + print(message) + response = {} - if message.document: - response["document"] = serialize_document_package(message.document) + if message.document_metadata: + response["document-metadata"] = serialize_document_metadata( + message.document_metadata + ) - if message.info: - response["info"] = [ - serialize_document_info(v) - for v in message.info + if message.content: + response["content"] = message.content.decode("utf-8") + + if message.document_metadatas != None: + response["document-metadatas"] = [ + serialize_document_metadata(v) + for v in message.document_metadatas + ] + + if message.processing_metadatas != None: + response["processing-metadatas"] = [ + serialize_processing_metadata(v) + for v in message.processing_metadatas ] return response, True diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py b/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py index 1f495313..678f5109 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py +++ 
b/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py @@ -1,7 +1,7 @@ import base64 -from ... schema import Value, Triple, DocumentPackage, DocumentInfo +from ... schema import Value, Triple, DocumentMetadata, ProcessingMetadata def to_value(x): return Value(value=x["v"], is_uri=x["e"]) @@ -80,88 +80,86 @@ def serialize_document_embeddings(message): ], } -def serialize_document_package(message): +def serialize_document_metadata(message): ret = {} if message.id: ret["id"] = message.id - if message.metadata: - ret["metadata"] = serialize_subgraph(message.metdata) - - if message.document: - blob = base64.b64encode( - message.document.encode("utf-8") - ).decode("utf-8") - ret["document"] = blob + if message.time: + ret["time"] = message.time if message.kind: ret["kind"] = message.kind - if message.user: - ret["user"] = message.user - - if message.collection: - ret["collection"] = message.collection - - return ret - -def serialize_document_info(message): - - ret = {} - - if message.id: - ret["id"] = message.id - - if message.kind: - ret["kind"] = message.kind - - if message.user: - ret["user"] = message.user - - if message.collection: - ret["collection"] = message.collection - if message.title: ret["title"] = message.title if message.comments: ret["comments"] = message.comments - if message.time: - ret["time"] = message.time - if message.metadata: ret["metadata"] = serialize_subgraph(message.metadata) + if message.user: + ret["user"] = message.user + + if message.tags: + ret["tags"] = message.tags + return ret -def to_document_package(x): +def serialize_processing_metadata(message): - return DocumentPackage( + ret = {} + + if message.id: + ret["id"] = message.id + + if message.document_id: + ret["document-id"] = message.document_id + + if message.time: + ret["time"] = message.time + + if message.flow: + ret["flow"] = message.flow + + if message.user: + ret["user"] = message.user + + if message.collection: + ret["collection"] = message.collection + + if message.tags: + 
ret["tags"] = message.tags + + return ret + +def to_document_metadata(x): + + return DocumentMetadata( id = x.get("id", None), + time = x.get("time", None), kind = x.get("kind", None), - user = x.get("user", None), - collection = x.get("collection", None), title = x.get("title", None), comments = x.get("comments", None), - time = x.get("time", None), - document = x.get("document", None), metadata = to_subgraph(x["metadata"]), + user = x.get("user", None), + tags = x.get("tags", None), ) -def to_document_info(x): +def to_processing_metadata(x): - return DocumentInfo( + return ProcessingMetadata( id = x.get("id", None), - kind = x.get("kind", None), + document_id = x.get("document-id", None), + time = x.get("time", None), + flow = x.get("flow", None), user = x.get("user", None), collection = x.get("collection", None), - title = x.get("title", None), - comments = x.get("comments", None), - time = x.get("time", None), - metadata = to_subgraph(x["metadata"]), + tags = x.get("tags", None), ) def to_criteria(x): @@ -169,3 +167,4 @@ def to_criteria(x): Critera(v["key"], v["value"], v["operator"]) for v in x ] + diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index e83b79d1..97406422 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -95,7 +95,6 @@ class Api: await self.config_receiver.start() - for ep in self.endpoints: ep.add_routes(self.app) diff --git a/trustgraph-flow/trustgraph/librarian/blob_store.py b/trustgraph-flow/trustgraph/librarian/blob_store.py index 5cffef18..3368f57e 100644 --- a/trustgraph-flow/trustgraph/librarian/blob_store.py +++ b/trustgraph-flow/trustgraph/librarian/blob_store.py @@ -37,7 +37,7 @@ class BlobStore: else: print("Bucket", self.bucket_name, "already exists", flush=True) - def add(self, object_id, blob, kind): + async def add(self, object_id, blob, kind): # FIXME: Loop retry self.minio.put_object( @@ -49,3 +49,25 @@ 
class BlobStore: ) print("Add blob complete", flush=True) + + async def remove(self, object_id): + + # FIXME: Loop retry + self.minio.remove_object( + bucket_name = self.bucket_name, + object_name = "doc/" + str(object_id), + ) + + print("Remove blob complete", flush=True) + + + async def get(self, object_id): + + # FIXME: Loop retry + resp = self.minio.get_object( + bucket_name = self.bucket_name, + object_name = "doc/" + str(object_id), + ) + + return resp.read() + diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py index 9bccc37a..befad00a 100644 --- a/trustgraph-flow/trustgraph/librarian/librarian.py +++ b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -1,8 +1,10 @@ + from .. schema import LibrarianRequest, LibrarianResponse, Error, Triple from .. knowledge import hash from .. exceptions import RequestError from . table_store import TableStore from . blob_store import BlobStore +import base64 import uuid @@ -26,63 +28,240 @@ class Librarian: self.load_document = load_document self.load_text = load_text - async def add(self, document): + async def add_document(self, request): - if document.kind not in ( + if request.document_metadata.kind not in ( "text/plain", "application/pdf" ): - raise RequestError("Invalid document kind: " + document.kind) + raise RequestError( + "Invalid document kind: " + request.document_metadata.kind + ) - # Create object ID as a hash of the document - object_id = uuid.UUID(hash(document.document)) + if await self.table_store.document_exists( + request.document_metadata.user, + request.document_metadata.id + ): + raise RuntimeError("Document already exists") - self.blob_store.add(object_id, document.document, document.kind) + # Create object ID for blob + object_id = uuid.uuid4() - self.table_store.add(object_id, document) + print("Add blob...") - if document.kind == "application/pdf": - await self.load_document(document) - elif document.kind == "text/plain": - await 
self.load_text(document) + await self.blob_store.add( + object_id, base64.b64decode(request.content), + request.document_metadata.kind + ) + + print("Add table...") + + await self.table_store.add_document( + request.document_metadata, object_id + ) print("Add complete", flush=True) return LibrarianResponse( error = None, - document = None, - info = None, + document_metadata = None, + content = None, + document_metadatas = None, + processing_metadatas = None, ) - async def list(self, user, collection): + async def remove_document(self, request): - print("list") + print("Removing doc...") - info = self.table_store.list(user, collection) + if not await self.table_store.document_exists( + request.user, + request.document_id, + ): + raise RuntimeError("Document does not exist") - print(">>", info) + object_id = await self.table_store.get_document_object_id( + request.user, + request.document_id + ) + + # Remove blob... + await self.blob_store.remove(object_id) + + # Remove doc table row + await self.table_store.remove_document( + request.user, + request.document_id + ) + + print("Remove complete", flush=True) return LibrarianResponse( error = None, - document = None, - info = info, + document_metadata = None, + content = None, + document_metadatas = None, + processing_metadatas = None, ) - def handle_triples(self, m): - self.table_store.add_triples(m) + async def update_document(self, request): - def handle_graph_embeddings(self, m): - self.table_store.add_graph_embeddings(m) + print("Updating doc...") - def handle_document_embeddings(self, m): - self.table_store.add_document_embeddings(m) + # You can't update the document ID, user or kind. 
+ + if not await self.table_store.document_exists( + request.document_metadata.user, + request.document_metadata.id + ): + raise RuntimeError("Document does not exist") + + await self.table_store.update_document(request.document_metadata) + + print("Update complete", flush=True) + + return LibrarianResponse( + error = None, + document_metadata = None, + content = None, + document_metadatas = None, + processing_metadatas = None, + ) + + async def get_document_metadata(self, request): + + print("Get doc...") + + doc = await self.table_store.get_document( + request.user, + request.document_id + ) + + print("Get complete", flush=True) + + return LibrarianResponse( + error = None, + document_metadata = doc, + content = None, + document_metadatas = None, + processing_metadatas = None, + ) + + async def get_document_content(self, request): + + print("Get doc content...") + + object_id = await self.table_store.get_document_object_id( + request.user, + request.document_id + ) + + content = await self.blob_store.get( + object_id + ) + + print("Get complete", flush=True) + + return LibrarianResponse( + error = None, + document_metadata = None, + content = base64.b64encode(content), + document_metadatas = None, + processing_metadatas = None, + ) + + async def add_processing(self, request): + + print("Add processing") + + if await self.table_store.processing_exists( + request.processing_metadata.user, + request.processing_metadata.id + ): + raise RuntimeError("Processing already exists") + + doc = await self.table_store.get_document( + request.processing_metadata.user, + request.processing_metadata.document_id + ) + + object_id = await self.table_store.get_document_object_id( + request.processing_metadata.user, + request.processing_metadata.document_id + ) + + content = await self.blob_store.get( + object_id + ) + + print("Got content") + + print("Add processing...") + + await self.table_store.add_processing(request.processing_metadata) + + print("Add complete", flush=True) + + 
return LibrarianResponse( + error = None, + document_metadata = None, + content = None, + document_metadatas = None, + processing_metadatas = None, + ) - def handle_triples(self, m): - self.table_store.add_triples(m) + # if document.kind == "application/pdf": + # await self.load_document(document) + # elif document.kind == "text/plain": + # await self.load_text(document) - def handle_graph_embeddings(self, m): - self.table_store.add_graph_embeddings(m) + async def remove_processing(self, request): - def handle_document_embeddings(self, m): - self.table_store.add_document_embeddings(m) + print("Removing processing...") + + if not await self.table_store.processing_exists( + request.user, + request.processing_id, + ): + raise RuntimeError("Processing object does not exist") + + # Remove doc table row + await self.table_store.remove_processing( + request.user, + request.processing_id + ) + + print("Remove complete", flush=True) + + return LibrarianResponse( + error = None, + document_metadata = None, + content = None, + document_metadatas = None, + processing_metadatas = None, + ) + + async def list_documents(self, request): + + docs = await self.table_store.list_documents(request.user) + + return LibrarianResponse( + error = None, + document_metadata = None, + content = None, + document_metadatas = docs, + processing_metadatas = None, + ) + + async def list_processing(self, request): + + procs = await self.table_store.list_processing(request.user) + + return LibrarianResponse( + error = None, + document_metadata = None, + content = None, + document_metadatas = None, + processing_metadatas = procs, + ) diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 587dcbf3..d24d2d70 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -5,41 +5,27 @@ Librarian service, manages documents in collections from functools import partial import asyncio -import 
threading -import queue import base64 +import json -from pulsar.schema import JsonSchema +from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber +from .. base import ConsumerMetrics, ProducerMetrics from .. schema import LibrarianRequest, LibrarianResponse, Error from .. schema import librarian_request_queue, librarian_response_queue -from .. schema import GraphEmbeddings -from .. schema import graph_embeddings_store_queue -from .. schema import Triples -from .. schema import triples_store_queue -from .. schema import DocumentEmbeddings -from .. schema import document_embeddings_store_queue - from .. schema import Document, Metadata -from .. schema import document_ingest_queue from .. schema import TextDocument, Metadata -from .. schema import text_ingest_queue -from .. base import Publisher -from .. base import Subscriber - -from .. log_level import LogLevel -from .. base import ConsumerProducer from .. exceptions import RequestError from . librarian import Librarian -module = "librarian" +default_ident = "librarian" + +default_librarian_request_queue = librarian_request_queue +default_librarian_response_queue = librarian_response_queue -default_input_queue = librarian_request_queue -default_output_queue = librarian_response_queue -default_subscriber = module default_minio_host = "minio:9000" default_minio_access_key = "minioadmin" default_minio_secret_key = "minioadmin" @@ -50,15 +36,21 @@ bucket_name = "library" # FIXME: How to ensure this doesn't conflict with other usage? 
keyspace = "librarian" -class Processor(ConsumerProducer): +class Processor(AsyncProcessor): def __init__(self, **params): - self.running = True + id = params.get("id") - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) +# self.running = True + + librarian_request_queue = params.get( + "librarian_request_queue", default_librarian_request_queue + ) + + librarian_response_queue = params.get( + "librarian_response_queue", default_librarian_response_queue + ) minio_host = params.get("minio_host", default_minio_host) minio_access_key = params.get( @@ -74,19 +66,10 @@ class Processor(ConsumerProducer): cassandra_user = params.get("cassandra_user") cassandra_password = params.get("cassandra_password") - triples_queue = params.get("triples_queue") - graph_embeddings_queue = params.get("graph_embeddings_queue") - document_embeddings_queue = params.get("document_embeddings_queue") - document_load_queue = params.get("document_load_queue") - text_load_queue = params.get("text_load_queue") - super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": LibrarianRequest, - "output_schema": LibrarianResponse, + "librarian_request_queue": librarian_request_queue, + "librarian_response_queue": librarian_response_queue, "minio_host": minio_host, "minio_access_key": minio_access_key, "cassandra_host": cassandra_host, @@ -94,38 +77,30 @@ class Processor(ConsumerProducer): } ) - self.document_load = Publisher( - self.client, document_load_queue, JsonSchema(Document), + librarian_request_metrics = ConsumerMetrics( + processor = self.id, flow = None, name = "librarian-request" ) - self.text_load = Publisher( - self.client, text_load_queue, JsonSchema(TextDocument), + librarian_response_metrics = ProducerMetrics( + processor = self.id, flow = None, name = 
"librarian-response" ) - self.triples_brk = Subscriber( - self.client, triples_store_queue, - "librarian", "librarian", - schema=JsonSchema(Triples), - ) - self.graph_embeddings_brk = Subscriber( - self.client, graph_embeddings_store_queue, - "librarian", "librarian", - schema=JsonSchema(GraphEmbeddings), - ) - self.document_embeddings_brk = Subscriber( - self.client, document_embeddings_store_queue, - "librarian", "librarian", - schema=JsonSchema(DocumentEmbeddings), + self.librarian_request_consumer = Consumer( + taskgroup = self.taskgroup, + client = self.pulsar_client, + flow = None, + topic = librarian_request_queue, + subscriber = id, + schema = LibrarianRequest, + handler = self.on_librarian_request, + metrics = librarian_request_metrics, ) - self.triples_reader = threading.Thread( - target=self.receive_triples - ) - self.graph_embeddings_reader = threading.Thread( - target=self.receive_graph_embeddings - ) - self.document_embeddings_reader = threading.Thread( - target=self.receive_document_embeddings + self.librarian_response_producer = Producer( + client = self.pulsar_client, + topic = librarian_response_queue, + schema = LibrarianResponse, + metrics = librarian_response_metrics, ) self.librarian = Librarian( @@ -141,87 +116,34 @@ class Processor(ConsumerProducer): load_text = self.load_text, ) + self.register_config_handler(self.on_librarian_config) + + self.flows = {} + print("Initialised.", flush=True) async def start(self): - - self.document_load.start() - self.text_load.start() - self.triples_brk.start() - self.graph_embeddings_brk.start() - self.document_embeddings_brk.start() + await super(Processor, self).start() + await self.librarian_request_consumer.start() + await self.librarian_response_producer.start() - self.triples_sub = self.triples_brk.subscribe_all("x") - self.graph_embeddings_sub = self.graph_embeddings_brk.subscribe_all("x") - self.document_embeddings_sub = self.document_embeddings_brk.subscribe_all("x") + async def 
on_librarian_config(self, config, version): - self.triples_reader.start() - self.graph_embeddings_reader.start() - self.document_embeddings_reader.start() + print("config version", version) + + if "flows" in config: + + self.flows = { + k: json.loads(v) + for k, v in config["flows"].items() + } + + print(self.flows) def __del__(self): - self.running = False - - if hasattr(self, "document_load"): - self.document_load.stop() - self.document_load.join() - - if hasattr(self, "text_load"): - self.text_load.stop() - self.text_load.join() - - if hasattr(self, "triples_sub"): - self.triples_sub.unsubscribe_all("x") - - if hasattr(self, "graph_embeddings_sub"): - self.graph_embeddings_sub.unsubscribe_all("x") - - if hasattr(self, "document_embeddings_sub"): - self.document_embeddings_sub.unsubscribe_all("x") - - if hasattr(self, "triples_brk"): - self.triples_brk.stop() - self.triples_brk.join() - - if hasattr(self, "graph_embeddings_brk"): - self.graph_embeddings_brk.stop() - self.graph_embeddings_brk.join() - - if hasattr(self, "document_embeddings_brk"): - self.document_embeddings_brk.stop() - self.document_embeddings_brk.join() - - def receive_triples(self): - - while self.running: - try: - msg = self.triples_sub.get(timeout=1) - except queue.Empty: - continue - - self.librarian.handle_triples(msg) - - def receive_graph_embeddings(self): - - while self.running: - try: - msg = self.graph_embeddings_sub.get(timeout=1) - except queue.Empty: - continue - - self.librarian.handle_graph_embeddings(msg) - - def receive_document_embeddings(self): - - while self.running: - try: - msg = self.document_embeddings_sub.get(timeout=1) - except queue.Empty: - continue - - self.librarian.handle_document_embeddings(msg) + pass async def load_document(self, document): @@ -235,6 +157,8 @@ class Processor(ConsumerProducer): data = document.document ) + + self.document_load.send(None, doc) async def load_text(self, document): @@ -254,41 +178,31 @@ class Processor(ConsumerProducer): 
self.text_load.send(None, doc) - def parse_request(self, v): + async def process_request(self, v): if v.operation is None: raise RequestError("Null operation") - print("op", v.operation) + print("requets", v.operation) - if v.operation == "add": - if ( - v.document and v.document.id and v.document.metadata and - v.document.document and v.document.kind - ): - return partial( - self.librarian.add, - document = v.document, - ) - else: - raise RequestError("Invalid call") + impls = { + "add-document": self.librarian.add_document, + "remove-document": self.librarian.remove_document, + "update-document": self.librarian.update_document, + "get-document-metadata": self.librarian.get_document_metadata, + "get-document-content": self.librarian.get_document_content, + "add-processing": self.librarian.add_processing, + "remove-processing": self.librarian.remove_processing, + "list-documents": self.librarian.list_documents, + "list-processing": self.librarian.list_processing, + } - if v.operation == "list": - print("list", v) - print(v.user) - if v.user: - return partial( - self.librarian.list, - user = v.user, - collection = v.collection, - ) - else: - print("BROK") - raise RequestError("Invalid call") + if v.operation not in impls: + raise RequestError(f"Invalid operation: {v.operation}") - raise RequestError("Invalid operation: " + v.operation) + return await impls[v.operation](v) - async def handle(self, msg): + async def on_librarian_request(self, msg, consumer, flow): v = msg.value() @@ -299,20 +213,15 @@ class Processor(ConsumerProducer): print(f"Handling input {id}...", flush=True) try: - func = self.parse_request(v) - except RequestError as e: - resp = LibrarianResponse( - error = Error( - type = "request-error", - message = str(e), - ) + + resp = await self.process_request(v) + + await self.librarian_response_producer.send( + resp, properties={"id": id} ) - await self.send(resp, properties={"id": id}) + return - try: - resp = await func() - print("->", resp) except 
RequestError as e: resp = LibrarianResponse( error = Error( @@ -320,31 +229,43 @@ class Processor(ConsumerProducer): message = str(e), ) ) - await self.send(resp, properties={"id": id}) + + await self.librarian_response_producer.send( + resp, properties={"id": id} + ) + return except Exception as e: - print("Exception:", e, flush=True) resp = LibrarianResponse( error = Error( - type = "processing-error", - message = "Unhandled error: " + str(e), + type = "unexpected-error", + message = str(e), ) ) - await self.send(resp, properties={"id": id}) + + await self.librarian_response_producer.send( + resp, properties={"id": id} + ) + return - print("Send response..!.", flush=True) - - await self.send(resp, properties={"id": id}) - print("Done.", flush=True) @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, + AsyncProcessor.add_args(parser) + + parser.add_argument( + '--librarian-request-queue', + default=default_librarian_request_queue, + help=f'Config request queue (default: {default_librarian_request_queue})' + ) + + parser.add_argument( + '--librarian-response-queue', + default=default_librarian_response_queue, + help=f'Config response queue {default_librarian_response_queue}', ) parser.add_argument( @@ -385,40 +306,7 @@ class Processor(ConsumerProducer): help=f'Cassandra password' ) - parser.add_argument( - '--triples-queue', - default=triples_store_queue, - help=f'Triples queue (default: {triples_store_queue})' - ) - - parser.add_argument( - '--graph-embeddings-queue', - default=graph_embeddings_store_queue, - help=f'Graph embeddings queue (default: {triples_store_queue})' - ) - - parser.add_argument( - '--document-embeddings-queue', - default=document_embeddings_store_queue, - help='Document embeddings queue ' - f'(default: {document_embeddings_store_queue})' - ) - - parser.add_argument( - '--document-load-queue', - default=document_ingest_queue, - help='Document load queue ' - 
f'(default: {document_ingest_queue})' - ) - - parser.add_argument( - '--text-load-queue', - default=text_ingest_queue, - help='Text ingest queue ' - f'(default: {text_ingest_queue})' - ) - def run(): - Processor.launch(module, __doc__) + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/librarian/table_store.py b/trustgraph-flow/trustgraph/librarian/table_store.py index 1fe47fcf..e7f2f473 100644 --- a/trustgraph-flow/trustgraph/librarian/table_store.py +++ b/trustgraph-flow/trustgraph/librarian/table_store.py @@ -1,5 +1,7 @@ + from .. schema import LibrarianRequest, LibrarianResponse -from .. schema import DocumentInfo, Error, Triple, Value +from .. schema import DocumentMetadata, ProcessingMetadata +from .. schema import Error, Triple, Value from .. knowledge import hash from .. exceptions import RequestError @@ -7,8 +9,10 @@ from cassandra.cluster import Cluster from cassandra.auth import PlainTextAuthProvider from cassandra.query import BatchStatement from ssl import SSLContext, PROTOCOL_TLSv1_2 + import uuid import time +import asyncio class TableStore: @@ -63,18 +67,18 @@ class TableStore: self.cassandra.execute(""" CREATE TABLE IF NOT EXISTS document ( - user text, - collection text, id text, + user text, time timestamp, + kind text, title text, comments text, - kind text, - object_id uuid, metadata list>, - PRIMARY KEY (user, collection, id) + tags list, + object_id uuid, + PRIMARY KEY (user, id) ); """); @@ -85,6 +89,23 @@ class TableStore: ON document (object_id) """); + print("processing table...", flush=True) + + self.cassandra.execute(""" + CREATE TABLE IF NOT EXISTS processing ( + id text, + document_id text, + time timestamp, + flow text, + user text, + collection text, + tags list, + PRIMARY KEY (user, id) + ); + """); + + return + print("triples table...", flush=True) self.cassandra.execute(""" @@ -155,26 +176,84 @@ class TableStore: self.insert_document_stmt = self.cassandra.prepare(""" INSERT INTO document ( - id, 
user, collection, kind, object_id, time, title, comments, - metadata + id, user, time, + kind, title, comments, + metadata, tags, object_id ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """) + self.update_document_stmt = self.cassandra.prepare(""" + UPDATE document + SET time = ?, title = ?, comments = ?, + metadata = ?, tags = ? + WHERE user = ? AND id = ? + """) + + self.get_document_stmt = self.cassandra.prepare(""" + SELECT time, kind, title, comments, metadata, tags, object_id + FROM document + WHERE user = ? AND id = ? + """) + + self.delete_document_stmt = self.cassandra.prepare(""" + DELETE FROM document + WHERE user = ? AND id = ? + """) + + self.test_document_exists_stmt = self.cassandra.prepare(""" + SELECT id + FROM document + WHERE user = ? AND id = ? + LIMIT 1 + """) + self.list_document_stmt = self.cassandra.prepare(""" SELECT - id, kind, user, collection, title, comments, time, metadata + id, time, kind, title, comments, metadata, tags, object_id FROM document WHERE user = ? """) - self.list_document_by_collection_stmt = self.cassandra.prepare(""" + self.list_document_by_tag_stmt = self.cassandra.prepare(""" SELECT - id, kind, user, collection, title, comments, time, metadata + id, time, kind, title, comments, metadata, tags, object_id FROM document - WHERE user = ? AND collection = ? + WHERE user = ? AND tags CONTAINS ? + ALLOW FILTERING """) + self.insert_processing_stmt = self.cassandra.prepare(""" + INSERT INTO processing + ( + id, document_id, time, + flow, user, collection, + tags + ) + VALUES (?, ?, ?, ?, ?, ?, ?) + """) + + self.delete_processing_stmt = self.cassandra.prepare(""" + DELETE FROM processing + WHERE user = ? AND id = ? + """) + + self.test_processing_exists_stmt = self.cassandra.prepare(""" + SELECT id + FROM processing + WHERE user = ? AND id = ? + LIMIT 1 + """) + + self.list_processing_stmt = self.cassandra.prepare(""" + SELECT + id, document_id, time, flow, collection, tags + FROM processing + WHERE user = ? 
+ """) + + return + self.insert_triples_stmt = self.cassandra.prepare(""" INSERT INTO triples ( @@ -202,17 +281,24 @@ class TableStore: VALUES (?, ?, ?, ?, ?, ?, ?) """) - def add(self, object_id, document): + async def document_exists(self, user, id): - if document.kind not in ( - "text/plain", "application/pdf" - ): - raise RequestError("Invalid document kind: " + document.kind) + resp = self.cassandra.execute( + self.test_document_exists_stmt, + ( user, id ) + ) - # Create random doc ID - when = int(time.time() * 1000) + # If a row exists, document exists. It's a cursor, can't just + # count the length - print("Adding", document.id, object_id) + for row in resp: + return True + + return False + + async def add_document(self, document, object_id): + + print("Adding document", document.id, object_id) metadata = [ ( @@ -229,10 +315,9 @@ class TableStore: resp = self.cassandra.execute( self.insert_document_stmt, ( - document.id, document.user, document.collection, - document.kind, object_id, when, - document.title, document.comments, - metadata + document.id, document.user, int(document.time * 1000), + document.kind, document.title, document.comments, + metadata, document.tags, object_id ) ) @@ -242,11 +327,71 @@ class TableStore: print("Exception:", type(e)) print(f"{e}, retry...", flush=True) - time.sleep(1) + await asyncio.sleep(1) print("Add complete", flush=True) - def add_triples(self, m): + async def update_document(self, document): + + print("Updating document", document.id) + + metadata = [ + ( + v.s.value, v.s.is_uri, v.p.value, v.p.is_uri, + v.o.value, v.o.is_uri + ) + for v in document.metadata + ] + + while True: + + try: + + resp = self.cassandra.execute( + self.update_document_stmt, + ( + int(document.time * 1000), document.title, + document.comments, metadata, document.tags, + document.user, document.id + ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + 
print("Update complete", flush=True) + + async def remove_document(self, user, document_id): + + print("Removing document", document_id) + + while True: + + try: + + resp = self.cassandra.execute( + self.delete_document_stmt, + ( + user, document_id + ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + print("Delete complete", flush=True) + + async def add_triples(self, m): when = int(time.time() * 1000) @@ -288,76 +433,235 @@ class TableStore: print("Exception:", type(e)) print(f"{e}, retry...", flush=True) - time.sleep(1) + await asyncio.sleep(1) - def list(self, user, collection=None): + async def list_documents(self, user): + + print("List documents...") - print("LIST") while True: - print("TRY") - - print(self.list_document_stmt) try: - if collection: - resp = self.cassandra.execute( - self.list_document_by_collection_stmt, - (user, collection) - ) - else: - resp = self.cassandra.execute( - self.list_document_stmt, - (user,) - ) - break + resp = self.cassandra.execute( + self.list_document_stmt, + (user,) + ) - print("OK") + break except Exception as e: print("Exception:", type(e)) print(f"{e}, retry...", flush=True) - time.sleep(1) + await asyncio.sleep(1) - print("OK2") - info = [ - DocumentInfo( + lst = [ + DocumentMetadata( id = row[0], - kind = row[1], - user = row[2], - collection = row[3], - title = row[4], - comments = row[5], - time = int(1000 * row[6].timestamp()), + user = user, + time = int(time.mktime(row[1].timetuple())), + kind = row[2], + title = row[3], + comments = row[4], metadata = [ Triple( s=Value(value=m[0], is_uri=m[1]), p=Value(value=m[2], is_uri=m[3]), o=Value(value=m[4], is_uri=m[5]) ) - for m in row[7] + for m in row[5] ], + tags = row[6], + object_id = row[7], ) for row in resp ] - print("OK3") + print("Done") - print(info[0]) + return lst - print(info[0].user) - print(info[0].time) - print(info[0].kind) - print(info[0].collection) - 
print(info[0].title) - print(info[0].comments) - print(info[0].metadata) - print(info[0].metadata) + async def get_document(self, user, id): - return info + print("Get document") - def add_graph_embeddings(self, m): + while True: + + try: + + resp = self.cassandra.execute( + self.get_document_stmt, + (user, id) + ) + + break + + except Exception as e: + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + + for row in resp: + doc = DocumentMetadata( + id = id, + user = user, + time = int(time.mktime(row[0].timetuple())), + kind = row[1], + title = row[2], + comments = row[3], + metadata = [ + Triple( + s=Value(value=m[0], is_uri=m[1]), + p=Value(value=m[2], is_uri=m[3]), + o=Value(value=m[4], is_uri=m[5]) + ) + for m in row[4] + ], + tags = row[5], + object_id = row[6], + ) + + print("Done") + return doc + + raise RuntimeError("No such document row?") + + async def get_document_object_id(self, user, id): + + print("Get document obj ID") + + while True: + + try: + + resp = self.cassandra.execute( + self.get_document_stmt, + (user, id) + ) + + break + + except Exception as e: + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + + for row in resp: + print("Done") + return row[6] + + raise RuntimeError("No such document row?") + + async def processing_exists(self, user, id): + + resp = self.cassandra.execute( + self.test_processing_exists_stmt, + ( user, id ) + ) + + # If a row exists, document exists. 
It's a cursor, can't just + # count the length + + for row in resp: + return True + + return False + + async def add_processing(self, processing): + + print("Adding processing", processing.id) + + while True: + + try: + + resp = self.cassandra.execute( + self.insert_processing_stmt, + ( + processing.id, processing.document_id, + int(processing.time * 1000), processing.flow, + processing.user, processing.collection, + processing.tags + ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + print("Add complete", flush=True) + + async def remove_processing(self, user, processing_id): + + print("Removing processing", processing_id) + + while True: + + try: + + resp = self.cassandra.execute( + self.delete_processing_stmt, + ( + user, processing_id + ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + print("Delete complete", flush=True) + + async def list_processing(self, user): + + print("List processing objects") + + while True: + + try: + + resp = self.cassandra.execute( + self.list_processing_stmt, + (user,) + ) + + break + + except Exception as e: + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + + lst = [ + ProcessingMetadata( + id = row[0], + document_id = row[1], + time = int(time.mktime(row[2].timetuple())), + flow = row[3], + user = user, + collection = row[4], + tags = row[5], + ) + for row in resp + ] + + print("Done") + + return lst + + async def add_graph_embeddings(self, m): when = int(time.time() * 1000) @@ -399,9 +703,9 @@ class TableStore: print("Exception:", type(e)) print(f"{e}, retry...", flush=True) - time.sleep(1) + await asyncio.sleep(1) - def add_document_embeddings(self, m): + async def add_document_embeddings(self, m): when = int(time.time() * 1000) @@ -443,6 +747,6 @@ class TableStore: print("Exception:", type(e)) 
print(f"{e}, retry...", flush=True) - time.sleep(1) + await asyncio.sleep(1)