diff --git a/test-api/test-library-add-processing2 b/test-api/test-library-add-processing2
new file mode 100755
index 00000000..613da630
--- /dev/null
+++ b/test-api/test-library-add-processing2
@@ -0,0 +1,50 @@
+#!/usr/bin/env python3
+
+import requests
+import json
+import sys
+import base64
+import time
+
+url = "http://localhost:8088/api/v1/"
+
+############################################################################
+
+doc_id = "http://trustgraph.ai/doc/6d034da9-2759-45c2-af24-14db7f4c44c2"
+
+proc_id = "72be9c56-a63a-4dde-8f3c-9b35f2598b83"
+
+request_body = {
+    "operation": "add-processing",
+    "processing-metadata": {
+        "id": proc_id,
+        "document-id": doc_id,
+        "time": int(time.time()),
+        "flow": "0000",
+        "user": "trustgraph",
+        "collection": "default",
+        "tags": ["test"],
+    }
+}
+
+resp = requests.post(
+    f"{url}librarian",
+    json=request_body,
+)
+
+print(resp.text)
+resp = resp.json()
+
+print(resp)
+
+if "error" in resp:
+    print(f"Error: {resp['error']}")
+    sys.exit(1)
+
+# print(resp["response"])
+print(resp)
+
+sys.exit(0)
+
+############################################################################
+
diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py
index befad00a..7c09515a 100644
--- a/trustgraph-flow/trustgraph/librarian/librarian.py
+++ b/trustgraph-flow/trustgraph/librarian/librarian.py
@@ -14,7 +14,7 @@ class Librarian:
             self,
             cassandra_host, cassandra_user, cassandra_password,
             minio_host, minio_access_key, minio_secret_key,
-            bucket_name, keyspace, load_document, load_text,
+            bucket_name, keyspace, load_document,
     ):
 
         self.blob_store = BlobStore(
@@ -26,7 +26,6 @@ class Librarian:
         )
 
         self.load_document = load_document
-        self.load_text = load_text
 
     async def add_document(self, request):
 
@@ -199,6 +198,14 @@ class Librarian:
 
         await self.table_store.add_processing(request.processing_metadata)
 
+        print("Invoke document processing...")
+
+        await self.load_document(
+            document = doc,
+            processing = request.processing_metadata,
+            content = content,
+        )
+
         print("Add complete", flush=True)
 
         return LibrarianResponse(
@@ -208,12 +215,6 @@ class Librarian:
             document_metadatas = None,
             processing_metadatas = None,
         )
-
-
-        # if document.kind == "application/pdf":
-        #     await self.load_document(document)
-        # elif document.kind == "text/plain":
-        #     await self.load_text(document)
 
     async def remove_processing(self, request):
 
diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py
index d24d2d70..4c18b9a5 100755
--- a/trustgraph-flow/trustgraph/librarian/service.py
+++ b/trustgraph-flow/trustgraph/librarian/service.py
@@ -113,7 +113,6 @@ class Processor(AsyncProcessor):
             bucket_name = bucket_name,
             keyspace = keyspace,
             load_document = self.load_document,
-            load_text = self.load_text,
         )
 
         self.register_config_handler(self.on_librarian_config)
@@ -145,38 +144,77 @@ class Processor(AsyncProcessor):
 
         pass
 
-    async def load_document(self, document):
+    async def load_document(self, document, processing, content):
+        """
+        Publish a library document onto the loader queue of a flow.
+
+        The flow is looked up by processing.flow; plain text goes to
+        the flow's "text-load" interface and PDF to "document-load".
+        PDF content is base64-encoded before sending.  Raises
+        RuntimeError if the flow ID or the MIME type is not known.
+        """
 
-        doc = Document(
-            metadata = Metadata(
-                id = document.id,
-                metadata = document.metadata,
-                user = document.user,
-                collection = document.collection
-            ),
-            data = document.document
+        print("Ready for processing...")
+
+        print(document, processing, len(content))
+
+        if processing.flow not in self.flows:
+            raise RuntimeError("Invalid flow ID")
+
+        flow = self.flows[processing.flow]
+
+        if document.kind == "text/plain":
+            kind = "text-load"
+        elif document.kind == "application/pdf":
+            kind = "document-load"
+        else:
+            raise RuntimeError("Document with a MIME type I don't know")
+
+        q = flow["interfaces"][kind]
+
+        if kind == "text-load":
+            doc = TextDocument(
+                metadata = Metadata(
+                    id = document.id,
+                    metadata = document.metadata,
+                    user = processing.user,
+                    collection = processing.collection
+                ),
+                text = content,
+            )
+            schema = TextDocument
+        else:
+            doc = Document(
+                metadata = Metadata(
+                    id = document.id,
+                    metadata = document.metadata,
+                    user = processing.user,
+                    collection = processing.collection
+                ),
+                data = base64.b64encode(content).decode("utf-8")
+
+            )
+            schema = Document
+
+        print(f"Submit on queue {q}...")
+
+        pub = Publisher(
+            self.pulsar_client, q, schema=schema
         )
 
-
+        await pub.start()
 
-        self.document_load.send(None, doc)
+        try:
+            # FIXME: Time wait kludge?
+            await asyncio.sleep(1)
 
-    async def load_text(self, document):
+            await pub.send(None, doc)
 
-        text = base64.b64decode(document.document)
-        text = text.decode("utf-8")
+        finally:
+            # Stop the publisher even if the send fails, so it is not leaked.
+            await pub.stop()
 
-        doc = TextDocument(
-            metadata = Metadata(
-                id = document.id,
-                metadata = document.metadata,
-                user = document.user,
-                collection = document.collection
-            ),
-            text = text,
-        )
-
-        self.text_load.send(None, doc)
+        print("Document submitted")
 
     async def process_request(self, v):
 