diff --git a/trustgraph-base/trustgraph/schema/library.py b/trustgraph-base/trustgraph/schema/library.py index 11006ad8..ed52b2ad 100644 --- a/trustgraph-base/trustgraph/schema/library.py +++ b/trustgraph-base/trustgraph/schema/library.py @@ -1,11 +1,21 @@ -from pulsar.schema import Record, Bytes, String, Array +from pulsar.schema import Record, Bytes, String, Array, Long from . types import Triple from . topic import topic from . types import Error from . metadata import Metadata from . documents import Document, TextDocument +# add +# -> (id, document) +# <- () +# <- (error) + +# list +# -> (user, collection?) +# <- (info) +# <- (error) + # add(Metadata, Bytes) : error? # copy(id, user, collection) # move(id, user, collection) @@ -17,19 +27,25 @@ from . documents import Document, TextDocument # search([]) : id[] class DocumentPackage(Record): - metadata = Array(Triple()) + id = String() document = Bytes() kind = String() user = String() collection = String() title = String() comments = String() + time = Long() + metadata = Array(Triple()) class DocumentInfo(Record): - metadata = Array(Triple()) + id = String() kind = String() user = String() collection = String() + title = String() + comments = String() + time = Long() + metadata = Array(Triple()) class Criteria(Record): key = String() diff --git a/trustgraph-flow/trustgraph/gateway/librarian.py b/trustgraph-flow/trustgraph/gateway/librarian.py index d899eae5..e6ff7ce3 100644 --- a/trustgraph-flow/trustgraph/gateway/librarian.py +++ b/trustgraph-flow/trustgraph/gateway/librarian.py @@ -53,7 +53,10 @@ class LibrarianRequestor(ServiceRequestor): response["document"] = serialize_document_package(message.document) if message.info: - response["info"] = serialize_document_info(message.info) + response["info"] = [ + serialize_document_info(v) + for v in message.info + ] return response, True diff --git a/trustgraph-flow/trustgraph/gateway/serialize.py b/trustgraph-flow/trustgraph/gateway/serialize.py index 4cad220e..5cc90a78 100644 --- a/trustgraph-flow/trustgraph/gateway/serialize.py +++ b/trustgraph-flow/trustgraph/gateway/serialize.py @@ -84,6 +84,9 @@ def serialize_document_package(message): ret = {} + if message.id: + ret["id"] = message.id + if message.metadata: ret["metadata"] = serialize_subgraph(message.metdata) @@ -108,8 +111,8 @@ def serialize_document_info(message): ret = {} - if message.metadata: - ret["metadata"] = serialize_subgraph(message.metdata) + if message.id: + ret["id"] = message.id if message.kind: ret["kind"] = message.kind @@ -120,25 +123,45 @@ def serialize_document_info(message): if message.collection: ret["collection"] = message.collection + if message.title: + ret["title"] = message.title + + if message.comments: + ret["comments"] = message.comments + + if message.time: + ret["time"] = message.time + + if message.metadata: + ret["metadata"] = serialize_subgraph(message.metadata) + return ret def to_document_package(x): return DocumentPackage( - metadata = to_subgraph(x["metadata"]), - document = x.get("document", None), + id = x.get("id", None), kind = x.get("kind", None), user = x.get("user", None), collection = x.get("collection", None), + title = x.get("title", None), + comments = x.get("comments", None), + time = x.get("time", None), + document = x.get("document", None), + metadata = to_subgraph(x["metadata"]), ) def to_document_info(x): return DocumentInfo( - metadata = to_subgraph(x["metadata"]), + id = x.get("id", None), kind = x.get("kind", None), user = x.get("user", None), collection = x.get("collection", None), + title = x.get("title", None), + comments = x.get("comments", None), + time = x.get("time", None), + metadata = to_subgraph(x["metadata"]), ) def to_criteria(x): diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py index 84ab3793..9bccc37a 100644 --- a/trustgraph-flow/trustgraph/librarian/librarian.py +++ b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -1,4 +1,4 @@ -from .. schema import LibrarianRequest, LibrarianResponse, Error +from .. schema import LibrarianRequest, LibrarianResponse, Error, Triple from .. knowledge import hash from .. exceptions import RequestError from . table_store import TableStore @@ -26,7 +26,7 @@ class Librarian: self.load_document = load_document self.load_text = load_text - async def add(self, id, document): + async def add(self, document): if document.kind not in ( "text/plain", "application/pdf" @@ -41,9 +41,9 @@ class Librarian: self.table_store.add(object_id, document) if document.kind == "application/pdf": - await self.load_document(id, document) + await self.load_document(document) elif document.kind == "text/plain": - await self.load_text(id, document) + await self.load_text(document) print("Add complete", flush=True) @@ -53,6 +53,20 @@ class Librarian: info = None, ) + async def list(self, user, collection): + + print("list") + + info = self.table_store.list(user, collection) + + print(">>", info) + + return LibrarianResponse( + error = None, + document = None, + info = info, + ) + def handle_triples(self, m): self.table_store.add_triples(m) diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 5e7153bb..b42123a5 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -223,11 +223,11 @@ class Processor(ConsumerProducer): self.librarian.handle_document_embeddings(msg) - async def load_document(self, id, document): + async def load_document(self, document): doc = Document( metadata = Metadata( - id = id, + id = document.id, metadata = document.metadata, user = document.user, collection = document.collection @@ -237,14 +237,14 @@ class Processor(ConsumerProducer): self.document_load.send(None, doc) - async def load_text(self, id, document): + async def load_text(self, document): text = base64.b64decode(document.document) text = text.decode("utf-8") doc = TextDocument( metadata = Metadata( - id = id, + id = document.id, metadata = document.metadata, user = document.user, collection = document.collection @@ -259,19 +259,33 @@ class Processor(ConsumerProducer): if v.operation is None: raise RequestError("Null operation") + print("op", v.operation) + if v.operation == "add": if ( - v.id and v.document and v.document.metadata and + v.document and v.document.id and v.document.metadata and v.document.document and v.document.kind ): return partial( self.librarian.add, - id = v.id, document = v.document, ) else: raise RequestError("Invalid call") + if v.operation == "list": + print("list", v) + print(v.user) + if v.user: + return partial( + self.librarian.list, + user = v.user, + collection = v.collection, + ) + else: + print("BROK") + raise RequestError("Invalid call") + raise RequestError("Invalid operation: " + v.operation) async def handle(self, msg): @@ -298,6 +312,7 @@ class Processor(ConsumerProducer): try: resp = await func() + print("->", resp) except RequestError as e: resp = LibrarianResponse( error = Error( @@ -318,7 +333,7 @@ class Processor(ConsumerProducer): await self.send(resp, properties={"id": id}) return - print("Send response...", flush=True) + print("Send response..!.", flush=True) await self.send(resp, properties={"id": id}) diff --git a/trustgraph-flow/trustgraph/librarian/table_store.py b/trustgraph-flow/trustgraph/librarian/table_store.py index 18d18781..ca10a05e 100644 --- a/trustgraph-flow/trustgraph/librarian/table_store.py +++ b/trustgraph-flow/trustgraph/librarian/table_store.py @@ -1,4 +1,5 @@ -from .. schema import LibrarianRequest, LibrarianResponse, Error +from .. schema import LibrarianRequest, LibrarianResponse +from .. schema import DocumentInfo, Error, Triple, Value from .. knowledge import hash from .. exceptions import RequestError @@ -61,7 +62,7 @@ class TableStore: CREATE TABLE IF NOT EXISTS document ( user text, collection text, - id uuid, + id text, time timestamp, title text, comments text, @@ -157,6 +158,20 @@ class TableStore: VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """) + self.list_document_stmt = self.cassandra.prepare(""" + SELECT + id, kind, user, collection, title, comments, time, metadata + FROM document + WHERE user = ? + """) + + self.list_document_by_collection_stmt = self.cassandra.prepare(""" + SELECT + id, kind, user, collection, title, comments, time, metadata + FROM document + WHERE user = ? AND collection = ? + """) + self.insert_triples_stmt = self.cassandra.prepare(""" INSERT INTO triples ( @@ -192,10 +207,9 @@ class TableStore: raise RequestError("Invalid document kind: " + document.kind) # Create random doc ID - doc_id = uuid.uuid4() when = int(time.time() * 1000) - print("Adding", object_id, doc_id) + print("Adding", document.id, object_id) metadata = [ ( @@ -205,8 +219,6 @@ class TableStore: for v in document.metadata ] - # FIXME: doc_id should be the user-supplied ID??? - while True: try: @@ -214,7 +226,7 @@ class TableStore: resp = self.cassandra.execute( self.insert_document_stmt, ( - doc_id, document.user, document.collection, + document.id, document.user, document.collection, document.kind, object_id, when, document.title, document.comments, metadata @@ -275,6 +287,73 @@ class TableStore: print(f"{e}, retry...", flush=True) time.sleep(1) + def list(self, user, collection=None): + + print("LIST") + while True: + + print("TRY") + + print(self.list_document_stmt) + try: + + if collection: + resp = self.cassandra.execute( + self.list_document_by_collection_stmt, + (user, collection) + ) + else: + resp = self.cassandra.execute( + self.list_document_stmt, + (user,) + ) + break + + print("OK") + + except Exception as e: + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + time.sleep(1) + + print("OK2") + + info = [ + DocumentInfo( + id = row[0], + kind = row[1], + user = row[2], + collection = row[3], + title = row[4], + comments = row[5], + time = int(1000 * row[6].timestamp()), + metadata = [ + Triple( + s=Value(value=m[0], is_uri=m[1]), + p=Value(value=m[2], is_uri=m[3]), + o=Value(value=m[4], is_uri=m[5]) + ) + for m in row[7] + ], + ) + for row in resp + ] + + print("OK3") + + print(info[0]) + + print(info[0].user) + print(info[0].time) + print(info[0].kind) + print(info[0].collection) + print(info[0].title) + print(info[0].comments) + print(info[0].metadata) + print(info[0].metadata) + + return info + def add_graph_embeddings(self, m): when = int(time.time() * 1000)