diff --git a/templates/components/trustgraph.jsonnet b/templates/components/trustgraph.jsonnet index 833d932b..0c07ea4e 100644 --- a/templates/components/trustgraph.jsonnet +++ b/templates/components/trustgraph.jsonnet @@ -1,6 +1,8 @@ local base = import "base/base.jsonnet"; local images = import "values/images.jsonnet"; local url = import "values/url.jsonnet"; +local minio = import "stores/minio.jsonnet"; +local cassandra = import "stores/cassandra.jsonnet"; { @@ -182,3 +184,6 @@ local url = import "values/url.jsonnet"; } + // Minio and Cassandra are used by the Librarian + + minio + cassandra + diff --git a/templates/generate b/templates/generate index 6f915cda..2569a4c0 100755 --- a/templates/generate +++ b/templates/generate @@ -50,10 +50,13 @@ class Generator: self.templates.joinpath(filename), self.resources.joinpath(dir, filename), self.resources.joinpath(filename), + pathlib.Path(dir).joinpath(filename), ] else: candidates = [ self.templates.joinpath(filename), + pathlib.Path(dir).joinpath(filename), + pathlib.Path(filename), ] try: @@ -86,7 +89,7 @@ class Packager: def __init__(self): self.templates = pathlib.Path("./templates") - self.resources = pathlib.Path("./resources") + self.resources = pathlib.Path("./") def process( self, config, version="0.0.0", platform="docker-compose", diff --git a/templates/stores/milvus.jsonnet b/templates/stores/milvus.jsonnet index cbeb4268..1c3e3734 100644 --- a/templates/stores/milvus.jsonnet +++ b/templates/stores/milvus.jsonnet @@ -1,7 +1,8 @@ local base = import "base/base.jsonnet"; local images = import "values/images.jsonnet"; +local minio = import "stores/minio.jsonnet"; -{ +minio { etcd +: { @@ -47,47 +48,6 @@ local images = import "values/images.jsonnet"; }, - mino +: { - - create:: function(engine) - - local vol = engine.volume("minio-data").with_size("20G"); - - local container = - engine.container("minio") - .with_image(images.minio) - .with_command([ - "minio", - "server", - "/minio_data", - "--console-address", - ":9001", - ]) - .with_environment({ - MINIO_ROOT_USER: "minioadmin", - MINIO_ROOT_PASSWORD: "minioadmin", - }) - .with_limits("0.5", "128M") - .with_reservations("0.25", "128M") - .with_port(9001, 9001, "api") - .with_volume_mount(vol, "/minio_data"); - - local containerSet = engine.containers( - "etcd", [ container ] - ); - - local service = - engine.service(containerSet) - .with_port(9001, 9001, "api"); - - engine.resources([ - vol, - containerSet, - service, - ]) - - }, - milvus +: { create:: function(engine) diff --git a/templates/stores/minio.jsonnet b/templates/stores/minio.jsonnet new file mode 100644 index 00000000..6ef1d96f --- /dev/null +++ b/templates/stores/minio.jsonnet @@ -0,0 +1,49 @@ +local base = import "base/base.jsonnet"; +local images = import "values/images.jsonnet"; + +{ + + minio +: { + + create:: function(engine) + + local vol = engine.volume("minio-data").with_size("20G"); + + local container = + engine.container("minio") + .with_image(images.minio) + .with_command([ + "minio", + "server", + "/minio_data", + "--console-address", + ":9001", + ]) + .with_environment({ + MINIO_ROOT_USER: "minioadmin", + MINIO_ROOT_PASSWORD: "minioadmin", + }) + .with_limits("0.5", "128M") + .with_reservations("0.25", "128M") + .with_port(9000, 9000, "api") + .with_port(9001, 9001, "console") + .with_volume_mount(vol, "/minio_data"); + + local containerSet = engine.containers( + "etcd", [ container ] + ); + + local service = + engine.service(containerSet) + .with_port(9000, 9000, "api") + .with_port(9001, 9001, "console"); + + engine.resources([ + vol, + containerSet, + service, + ]) + + }, + +} diff --git a/templates/values/images.jsonnet b/templates/values/images.jsonnet index d515a450..3e3d3e75 100644 --- a/templates/values/images.jsonnet +++ b/templates/values/images.jsonnet @@ -5,7 +5,7 @@ local version = import "version.jsonnet"; pulsar: "docker.io/apachepulsar/pulsar:3.3.1", pulsar_manager: "docker.io/apachepulsar/pulsar-manager:v0.4.0", etcd: "quay.io/coreos/etcd:v3.5.15", - minio: "docker.io/minio/minio:RELEASE.2024-08-17T01-24-54Z", + minio: "docker.io/minio/minio:RELEASE.2025-02-03T21-03-04Z", milvus: "docker.io/milvusdb/milvus:v2.4.9", prometheus: "docker.io/prom/prometheus:v2.53.2", grafana: "docker.io/grafana/grafana:11.1.4", diff --git a/trustgraph-base/trustgraph/base/__init__.py b/trustgraph-base/trustgraph/base/__init__.py index b9dba4fa..3a58d51e 100644 --- a/trustgraph-base/trustgraph/base/__init__.py +++ b/trustgraph-base/trustgraph/base/__init__.py @@ -3,4 +3,6 @@ from . base_processor import BaseProcessor from . consumer import Consumer from . producer import Producer from . consumer_producer import ConsumerProducer +from . publisher import Publisher +from . subscriber import Subscriber diff --git a/trustgraph-base/trustgraph/base/base_processor.py b/trustgraph-base/trustgraph/base/base_processor.py index f258ff1a..b0fdd1bb 100644 --- a/trustgraph-base/trustgraph/base/base_processor.py +++ b/trustgraph-base/trustgraph/base/base_processor.py @@ -28,15 +28,19 @@ class BaseProcessor: }) pulsar_host = params.get("pulsar_host", self.default_pulsar_host) + pulsar_listener = params.get("pulsar_listener", None) log_level = params.get("log_level", LogLevel.INFO) self.pulsar_host = pulsar_host self.client = pulsar.Client( pulsar_host, + listener_name=pulsar_listener, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) ) + self.pulsar_listener = pulsar_listener + def __del__(self): if hasattr(self, "client"): @@ -52,6 +56,11 @@ class BaseProcessor: help=f'Pulsar host (default: {__class__.default_pulsar_host})', ) + parser.add_argument( + '--pulsar-listener', + help=f'Pulsar listener (default: none)', + ) + parser.add_argument( '-l', '--log-level', type=LogLevel, diff --git a/trustgraph-flow/trustgraph/gateway/publisher.py b/trustgraph-base/trustgraph/base/publisher.py similarity index 77% rename from trustgraph-flow/trustgraph/gateway/publisher.py rename to trustgraph-base/trustgraph/base/publisher.py index f79cfa1e..b8fbedce 100644 --- a/trustgraph-flow/trustgraph/gateway/publisher.py +++ b/trustgraph-base/trustgraph/base/publisher.py @@ -14,14 +14,22 @@ class Publisher: self.q = queue.Queue(maxsize=max_size) self.chunking_enabled = chunking_enabled self.listener_name = listener + self.running = True def start(self): self.task = threading.Thread(target=self.run) self.task.start() + def stop(self): + self.running = False + + def join(self): + self.stop() + self.task.join() + def run(self): - while True: + while self.running: try: @@ -35,9 +43,12 @@ class Publisher: chunking_enabled=self.chunking_enabled, ) - while True: + while self.running: - id, item = self.q.get() + try: + id, item = self.q.get(timeout=0.5) + except queue.Empty: + continue if id: producer.send(item, { "id": id }) @@ -52,3 +63,5 @@ class Publisher: def send(self, id, msg): self.q.put((id, msg)) + + diff --git a/trustgraph-flow/trustgraph/gateway/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py similarity index 89% rename from trustgraph-flow/trustgraph/gateway/subscriber.py rename to trustgraph-base/trustgraph/base/subscriber.py index 2eca5a36..606c765a 100644 --- a/trustgraph-flow/trustgraph/gateway/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -18,14 +18,21 @@ class Subscriber: self.max_size = max_size self.lock = threading.Lock() self.listener_name = listener + self.running = True def start(self): self.task = threading.Thread(target=self.run) self.task.start() + def stop(self): + self.running = False + + def join(self): + self.task.join() + def run(self): - while True: + while self.running: try: @@ -41,7 +48,7 @@ class Subscriber: schema=self.schema, ) - while True: + while self.running: msg = consumer.receive() @@ -59,12 +66,14 @@ class Subscriber: if id in self.q: try: + # FIXME: Timeout means data goes missing self.q[id].put(value, timeout=0.5) except: pass for q in self.full.values(): try: + # FIXME: Timeout means data goes missing q.put(value, timeout=0.5) except: pass diff --git a/trustgraph-base/trustgraph/exceptions.py b/trustgraph-base/trustgraph/exceptions.py index afe72ccc..09f098df 100644 --- a/trustgraph-base/trustgraph/exceptions.py +++ b/trustgraph-base/trustgraph/exceptions.py @@ -8,3 +8,6 @@ class LlmError(Exception): class ParseError(Exception): pass +class RequestError(Exception): + pass + diff --git a/trustgraph-base/trustgraph/schema/__init__.py b/trustgraph-base/trustgraph/schema/__init__.py index be41b670..9c44a743 100644 --- a/trustgraph-base/trustgraph/schema/__init__.py +++ b/trustgraph-base/trustgraph/schema/__init__.py @@ -10,5 +10,6 @@ from . retrieval import * from . metadata import * from . agent import * from . lookup import * +from . library import * diff --git a/trustgraph-base/trustgraph/schema/library.py b/trustgraph-base/trustgraph/schema/library.py new file mode 100644 index 00000000..8ab88842 --- /dev/null +++ b/trustgraph-base/trustgraph/schema/library.py @@ -0,0 +1,56 @@ + +from pulsar.schema import Record, Bytes, String, Array +from . types import Triple +from . topic import topic +from . types import Error +from . metadata import Metadata +from . documents import Document, TextDocument + +# add(Metadata, Bytes) : error? +# copy(id, user, collection) +# move(id, user, collection) +# delete(id) +# get(id) : Bytes +# reindex(id) +# list(user, collection) : id[] +# info(id[]) : DocumentInfo[] +# search([]) : id[] + +class DocumentPackage(Record): + metadata = Array(Triple()) + document = Bytes() + kind = String() + user = String() + collection = String() + +class DocumentInfo(Record): + metadata = Array(Triple()) + kind = String() + user = String() + collection = String() + +class Criteria(Record): + key = String() + value = String() + operator = String() + +class LibrarianRequest(Record): + operation = String() + id = String() + document = DocumentPackage() + user = String() + collection = String() + criteria = Array(Criteria()) + +class LibrarianResponse(Record): + error = Error() + document = DocumentPackage() + info = Array(DocumentInfo()) + +librarian_request_queue = topic( + 'librarian', kind='non-persistent', namespace='request' +) +librarian_response_queue = topic( + 'librarian', kind='non-persistent', namespace='response', +) + diff --git a/trustgraph-flow/scripts/librarian b/trustgraph-flow/scripts/librarian new file mode 100755 index 00000000..9f6458ab --- /dev/null +++ b/trustgraph-flow/scripts/librarian @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.librarian import run + +run() + diff --git a/trustgraph-flow/setup.py b/trustgraph-flow/setup.py index e56aa694..fe167e90 100644 --- a/trustgraph-flow/setup.py +++ b/trustgraph-flow/setup.py @@ -49,6 +49,7 @@ setuptools.setup( "langchain-community", "langchain-core", "langchain-text-splitters", + "minio", "neo4j", "ollama", "openai", @@ -78,8 +79,8 @@ setuptools.setup( "scripts/de-write-qdrant", "scripts/document-embeddings", "scripts/document-rag", - "scripts/embeddings-ollama", "scripts/embeddings-fastembed", + "scripts/embeddings-ollama", "scripts/ge-query-milvus", "scripts/ge-query-pinecone", "scripts/ge-query-qdrant", @@ -91,6 +92,7 @@ setuptools.setup( "scripts/kg-extract-definitions", "scripts/kg-extract-relationships", "scripts/kg-extract-topics", + "scripts/librarian", "scripts/metering", "scripts/object-extract-row", "scripts/oe-write-milvus", diff --git a/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py b/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py index 1a7f635d..39228221 100644 --- a/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py +++ b/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py @@ -7,8 +7,8 @@ from aiohttp import WSMsgType from .. schema import Metadata from .. schema import DocumentEmbeddings, ChunkEmbeddings from .. schema import document_embeddings_store_queue +from .. base import Publisher -from . publisher import Publisher from . socket import SocketEndpoint from . serialize import to_subgraph diff --git a/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py b/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py index 99cfb0a9..fa19fe06 100644 --- a/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py +++ b/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py @@ -6,8 +6,8 @@ import uuid from .. schema import DocumentEmbeddings from .. schema import document_embeddings_store_queue +from .. base import Subscriber -from . subscriber import Subscriber from . socket import SocketEndpoint from . serialize import serialize_document_embeddings diff --git a/trustgraph-flow/trustgraph/gateway/endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint.py index 1f38c489..5005463c 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint.py @@ -5,8 +5,8 @@ from aiohttp import web import uuid import logging -from . publisher import Publisher -from . subscriber import Subscriber +from .. base import Publisher +from .. base import Subscriber logger = logging.getLogger("endpoint") logger.setLevel(logging.INFO) diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py index 86f2016b..e3fa1302 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py +++ b/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py @@ -7,8 +7,8 @@ from aiohttp import WSMsgType from .. schema import Metadata from .. schema import GraphEmbeddings, EntityEmbeddings from .. schema import graph_embeddings_store_queue +from .. base import Publisher -from . publisher import Publisher from . socket import SocketEndpoint from . serialize import to_subgraph, to_value diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py index b6b5403f..fa6ace3a 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py +++ b/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py @@ -6,8 +6,8 @@ import uuid from .. schema import GraphEmbeddings from .. schema import graph_embeddings_store_queue +from .. base import Subscriber -from . subscriber import Subscriber from . socket import SocketEndpoint from . serialize import serialize_graph_embeddings diff --git a/trustgraph-flow/trustgraph/gateway/librarian.py b/trustgraph-flow/trustgraph/gateway/librarian.py new file mode 100644 index 00000000..78de7970 --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/librarian.py @@ -0,0 +1,57 @@ + +from .. schema import LibrarianRequest, LibrarianResponse, Triples +from .. schema import librarian_request_queue +from .. schema import librarian_response_queue + +from . endpoint import ServiceEndpoint +from . requestor import ServiceRequestor +from . serialize import serialize_document_package, serialize_document_info +from . serialize import to_document_package, to_document_info, to_criteria + +class LibrarianRequestor(ServiceRequestor): + def __init__(self, pulsar_host, timeout, auth): + + super(LibrarianRequestor, self).__init__( + pulsar_host=pulsar_host, + request_queue=librarian_request_queue, + response_queue=librarian_response_queue, + request_schema=LibrarianRequest, + response_schema=LibrarianResponse, + timeout=timeout, + ) + + def to_request(self, body): + + if "document" in body: + dp = to_document_package(body["document"]) + else: + dp = None + + if "criteria" in body: + criteria = to_criteria(body["criteria"]) + else: + criteria = None + + limit = int(body.get("limit", 10000)) + + return LibrarianRequest( + operation = body.get("operation", None), + id = body.get("id", None), + document = dp, + user = body.get("user", None), + collection = body.get("collection", None), + criteria = criteria, + ) + + def from_response(self, message): + + response = {} + + if message.document: + response["document"] = serialize_document_package(message.document) + + if message.info: + response["info"] = serialize_document_info(message.info) + + return response, True + diff --git a/trustgraph-flow/trustgraph/gateway/requestor.py b/trustgraph-flow/trustgraph/gateway/requestor.py index 567c9de7..41b69a3f 100644 --- a/trustgraph-flow/trustgraph/gateway/requestor.py +++ b/trustgraph-flow/trustgraph/gateway/requestor.py @@ -4,8 +4,8 @@ from pulsar.schema import JsonSchema import uuid import logging -from . publisher import Publisher -from . subscriber import Subscriber +from .. base import Publisher +from .. base import Subscriber logger = logging.getLogger("requestor") logger.setLevel(logging.INFO) @@ -68,7 +68,10 @@ class ServiceRequestor: raise RuntimeError("Timeout") if resp.error: - err = { "error": resp.error.message } + err = { "error": { + "type": resp.error.type, + "message": resp.error.message, + } } if responder: await responder(err, True) return err @@ -87,7 +90,10 @@ class ServiceRequestor: logging.error(f"Exception: {e}") - err = { "error": str(e) } + err = { "error": { + "type": "gateway-error", + "message": str(e), + } } if responder: await responder(err, True) return err diff --git a/trustgraph-flow/trustgraph/gateway/sender.py b/trustgraph-flow/trustgraph/gateway/sender.py index c5bb2e17..036ffaf8 100644 --- a/trustgraph-flow/trustgraph/gateway/sender.py +++ b/trustgraph-flow/trustgraph/gateway/sender.py @@ -6,7 +6,7 @@ from pulsar.schema import JsonSchema import uuid import logging -from . publisher import Publisher +from .. base import Publisher logger = logging.getLogger("sender") logger.setLevel(logging.INFO) diff --git a/trustgraph-flow/trustgraph/gateway/serialize.py b/trustgraph-flow/trustgraph/gateway/serialize.py index 40b6efc5..552105c3 100644 --- a/trustgraph-flow/trustgraph/gateway/serialize.py +++ b/trustgraph-flow/trustgraph/gateway/serialize.py @@ -1,4 +1,7 @@ -from .. schema import Value, Triple + +import base64 + +from .. schema import Value, Triple, DocumentPackage, DocumentInfo def to_value(x): return Value(value=x["v"], is_uri=x["e"]) @@ -77,3 +80,69 @@ def serialize_document_embeddings(message): ], } +def serialize_document_package(message): + + ret = {} + + if message.metadata: + ret["metadata"] = serialize_subgraph(message.metdata) + + if message.document: + blob = base64.b64encode( + message.document.encode("utf-8") + ).decode("utf-8") + ret["document"] = blob + + if message.kind: + ret["kind"] = message.kind + + if message.user: + ret["user"] = message.user + + if message.collection: + ret["collection"] = message.collection + + return ret + +def serialize_document_info(message): + + ret = {} + + if message.metadata: + ret["metadata"] = serialize_subgraph(message.metdata) + + if message.kind: + ret["kind"] = message.kind + + if message.user: + ret["user"] = message.user + + if message.collection: + ret["collection"] = message.collection + + return ret + +def to_document_package(x): + + return DocumentPackage( + metadata = to_subgraph(x["metadata"]), + document = base64.b64decode(x["document"].encode("utf-8")), + kind = x.get("kind", None), + user = x.get("user", None), + collection = x.get("collection", None), + ) + +def to_document_info(x): + + return DocumentInfo( + metadata = to_subgraph(x["metadata"]), + kind = x.get("kind", None), + user = x.get("user", None), + collection = x.get("collection", None), + ) + +def to_criteria(x): + return [ + Critera(v["key"], v["value"], v["operator"]) + for v in x + ] diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index 644731e2..c473f7f5 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -26,8 +26,6 @@ from .. log_level import LogLevel from . serialize import to_subgraph from . running import Running -from . publisher import Publisher -from . subscriber import Subscriber from . text_completion import TextCompletionRequestor from . prompt import PromptRequestor from . graph_rag import GraphRagRequestor @@ -39,6 +37,7 @@ from . encyclopedia import EncyclopediaRequestor from . agent import AgentRequestor from . dbpedia import DbpediaRequestor from . internet_search import InternetSearchRequestor +from . librarian import LibrarianRequestor from . triples_stream import TriplesStreamEndpoint from . graph_embeddings_stream import GraphEmbeddingsStreamEndpoint from . document_embeddings_stream import DocumentEmbeddingsStreamEndpoint @@ -123,6 +122,10 @@ class Api: pulsar_host=self.pulsar_host, timeout=self.timeout, auth = self.auth, ), + "librarian": LibrarianRequestor( + pulsar_host=self.pulsar_host, timeout=self.timeout, + auth = self.auth, + ), "encyclopedia": EncyclopediaRequestor( pulsar_host=self.pulsar_host, timeout=self.timeout, auth = self.auth, @@ -177,6 +180,10 @@ class Api: endpoint_path = "/api/v1/agent", auth=self.auth, requestor = self.services["agent"], ), + ServiceEndpoint( + endpoint_path = "/api/v1/librarian", auth=self.auth, + requestor = self.services["librarian"], + ), ServiceEndpoint( endpoint_path = "/api/v1/encyclopedia", auth=self.auth, requestor = self.services["encyclopedia"], diff --git a/trustgraph-flow/trustgraph/gateway/triples_load.py b/trustgraph-flow/trustgraph/gateway/triples_load.py index 2689f3ad..f2ebd040 100644 --- a/trustgraph-flow/trustgraph/gateway/triples_load.py +++ b/trustgraph-flow/trustgraph/gateway/triples_load.py @@ -7,8 +7,8 @@ from aiohttp import WSMsgType from .. schema import Metadata from .. schema import Triples from .. schema import triples_store_queue +from .. base import Publisher -from . publisher import Publisher from . socket import SocketEndpoint from . serialize import to_subgraph diff --git a/trustgraph-flow/trustgraph/gateway/triples_stream.py b/trustgraph-flow/trustgraph/gateway/triples_stream.py index 8048d3cc..d1ea7bf5 100644 --- a/trustgraph-flow/trustgraph/gateway/triples_stream.py +++ b/trustgraph-flow/trustgraph/gateway/triples_stream.py @@ -6,8 +6,8 @@ import uuid from .. schema import Triples from .. schema import triples_store_queue +from .. base import Subscriber -from . subscriber import Subscriber from . socket import SocketEndpoint from . serialize import serialize_triples diff --git a/trustgraph-flow/trustgraph/librarian/__init__.py b/trustgraph-flow/trustgraph/librarian/__init__.py new file mode 100644 index 00000000..ba844705 --- /dev/null +++ b/trustgraph-flow/trustgraph/librarian/__init__.py @@ -0,0 +1,3 @@ + +from . service import * + diff --git a/trustgraph-flow/trustgraph/librarian/__main__.py b/trustgraph-flow/trustgraph/librarian/__main__.py new file mode 100755 index 00000000..e9136855 --- /dev/null +++ b/trustgraph-flow/trustgraph/librarian/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . service import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph-flow/trustgraph/librarian/blob_store.py b/trustgraph-flow/trustgraph/librarian/blob_store.py new file mode 100644 index 00000000..5cffef18 --- /dev/null +++ b/trustgraph-flow/trustgraph/librarian/blob_store.py @@ -0,0 +1,51 @@ +from .. schema import LibrarianRequest, LibrarianResponse, Error +from .. knowledge import hash +from .. exceptions import RequestError + +from minio import Minio +import time +import io + +class BlobStore: + + def __init__( + self, + minio_host, minio_access_key, minio_secret_key, bucket_name, + ): + + + self.minio = Minio( + minio_host, + access_key = minio_access_key, + secret_key = minio_secret_key, + secure = False, + ) + + self.bucket_name = bucket_name + + print("Connected to minio", flush=True) + + self.ensure_bucket() + + def ensure_bucket(self): + + # Make the bucket if it doesn't exist. + found = self.minio.bucket_exists(self.bucket_name) + if not found: + self.minio.make_bucket(self.bucket_name) + print("Created bucket", self.bucket_name, flush=True) + else: + print("Bucket", self.bucket_name, "already exists", flush=True) + + def add(self, object_id, blob, kind): + + # FIXME: Loop retry + self.minio.put_object( + bucket_name = self.bucket_name, + object_name = "doc/" + str(object_id), + length = len(blob), + data = io.BytesIO(blob), + content_type = kind, + ) + + print("Add blob complete", flush=True) diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py new file mode 100644 index 00000000..df06c707 --- /dev/null +++ b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -0,0 +1,55 @@ +from .. schema import LibrarianRequest, LibrarianResponse, Error +from .. knowledge import hash +from .. exceptions import RequestError +from . table_store import TableStore +from . blob_store import BlobStore + +import uuid + +class Librarian: + + def __init__( + self, + cassandra_host, cassandra_user, cassandra_password, + minio_host, minio_access_key, minio_secret_key, + bucket_name, keyspace, load_document, load_text, + ): + + self.blob_store = BlobStore( + minio_host, minio_access_key, minio_secret_key, bucket_name + ) + + self.table_store = TableStore( + cassandra_host, cassandra_user, cassandra_password, keyspace + ) + + self.load_document = load_document + self.load_text = load_text + + def add(self, id, document): + + if document.kind not in ( + "text/plain", "application/pdf" + ): + raise RequestError("Invalid document kind: " + document.kind) + + # Create object ID as a hash of the document + object_id = uuid.UUID(hash(document.document)) + + self.blob_store.add(object_id, document.document, document.kind) + + self.table_store.add(object_id, document) + + if document.kind == "application/pdf": + self.load_document(id, document) + elif document.kind == "text/plain": + self.load_text(id, document) + + print("Add complete", flush=True) + + return LibrarianResponse( + error = None, + document = None, + info = None, + ) + diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py new file mode 100755 index 00000000..42c3b585 --- /dev/null +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -0,0 +1,352 @@ + +""" +Librarian service, manages documents in collections +""" + +from functools import partial +import asyncio +import threading +import queue + +from pulsar.schema import JsonSchema + +from .. schema import LibrarianRequest, LibrarianResponse, Error +from .. schema import librarian_request_queue, librarian_response_queue + +from .. schema import GraphEmbeddings +from .. schema import graph_embeddings_store_queue +from .. schema import Triples +from .. schema import triples_store_queue +from .. schema import DocumentEmbeddings +from .. schema import document_embeddings_store_queue + +from .. schema import Document, Metadata +from .. schema import document_ingest_queue +from .. schema import TextDocument, Metadata +from .. schema import text_ingest_queue + +from .. base import Publisher +from .. base import Subscriber + +from .. log_level import LogLevel +from .. base import ConsumerProducer +from .. exceptions import RequestError + +from . librarian import Librarian + +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = librarian_request_queue +default_output_queue = librarian_response_queue +default_subscriber = module +default_minio_host = "minio:9000" +default_minio_access_key = "minioadmin" +default_minio_secret_key = "minioadmin" +default_cassandra_host = "cassandra" + +bucket_name = "library" + +# FIXME: How to ensure this doesn't conflict with other usage? +keyspace = "librarian" + +class Processor(ConsumerProducer): + + def __init__(self, **params): + + self.running = True + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + + minio_host = params.get("minio_host", default_minio_host) + minio_access_key = params.get( + "minio_access_key", + default_minio_access_key + ) + minio_secret_key = params.get( + "minio_secret_key", + default_minio_secret_key + ) + + cassandra_host = params.get("cassandra_host", default_cassandra_host) + cassandra_user = params.get("cassandra_user") + cassandra_password = params.get("cassandra_password") + + triples_queue = params.get("triples_queue") + graph_embeddings_queue = params.get("graph_embeddings_queue") + document_embeddings_queue = params.get("document_embeddings_queue") + document_load_queue = params.get("document_load_queue") + text_load_queue = params.get("text_load_queue") + + super(Processor, self).__init__( + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": LibrarianRequest, + "output_schema": LibrarianResponse, + "minio_host": minio_host, + "minio_access_key": minio_access_key, + "cassandra_host": cassandra_host, + "cassandra_user": cassandra_user, + } + ) + + self.document_load = Publisher( + self.pulsar_host, document_load_queue, JsonSchema(Document), + listener=self.pulsar_listener, + ) + + self.text_load = Publisher( + self.pulsar_host, text_load_queue, JsonSchema(TextDocument), + listener=self.pulsar_listener, + ) + + self.triples_load = Subscriber( + self.pulsar_host, triples_store_queue, + "librarian", "librarian", + schema=JsonSchema(Triples), + listener=self.pulsar_listener, + ) + + self.document_load.start() + self.text_load.start() + self.triples_load.start() + + self.triples_sub = self.triples_load.subscribe_all("x") + + self.triples_reader = threading.Thread(target=self.receive_triples) + self.triples_reader.start() + + self.librarian = Librarian( + cassandra_host = cassandra_host.split(","), + cassandra_user = cassandra_user, + cassandra_password = cassandra_password, + minio_host = minio_host, + minio_access_key = minio_access_key, + minio_secret_key = minio_secret_key, + bucket_name = bucket_name, + keyspace = keyspace, + load_document = self.load_document, + load_text = self.load_text, + ) + + print("Initialised.", flush=True) + + def receive_triples(self): + + print("Receive triples!") + + while self.running: + try: + msg = self.triples_sub.get(timeout=1) + except queue.Empty: + print("Tick") + continue + + print(msg) + + print("BYE") + + def __del__(self): + + self.running = False + + if hasattr(self, "triples_sub"): + self.triples_sub.unsubscribe_all("x") + + if hasattr(self, "document_load"): + self.document_load.stop() + self.document_load.join() + + if hasattr(self, "text_load"): + self.text_load.stop() + self.text_load.join() + + if hasattr(self, "triples_load"): + self.triples_load.stop() + self.triples_load.join() + + def load_document(self, id, document): + + doc = Document( + metadata = Metadata( + id = id, + metadata = document.metadata, + user = document.user, + collection = document.collection + ), + data = document.document + ) + + self.document_load.send(None, doc) + + def load_text(self, id, document): + + doc = TextDocument( + metadata = Metadata( + id = id, + metadata = document.metadata, + user = document.user, + collection = document.collection + ), + text = document.document + ) + + self.text_load.send(None, doc) + + def parse_request(self, v): + + if v.operation is None: + raise RequestError("Null operation") + + if v.operation == "add": + print(v) + if ( + v.id and v.document and v.document.metadata and + v.document.document and v.document.kind + ): + return partial( + self.librarian.add, + id = v.id, + document = v.document, + ) + else: + raise RequestError("Invalid call") + + raise RequestError("Invalid operation: " + v.operation) + + def handle(self, msg): + + v = msg.value() + + # Sender-produced ID + + id = msg.properties()["id"] + + print(f"Handling input {id}...", flush=True) + + try: + func = self.parse_request(v) + except RequestError as e: + resp = LibrarianResponse( + error = Error( + type = "request-error", + message = str(e), + ) + ) + self.producer.send(resp, properties={"id": id}) + return + + try: + resp = func() + except RequestError as e: + resp = LibrarianResponse( + error = Error( + type = "request-error", + message = str(e), + ) + ) + self.producer.send(resp, properties={"id": id}) + return + except Exception as e: + print("Exception:", e, flush=True) + resp = LibrarianResponse( + error = Error( + type = "processing-error", + message = "Unhandled error: " + str(e), + ) + ) + self.producer.send(resp, properties={"id": id}) + return + + print("Send response...", flush=True) + + self.producer.send(resp, properties={"id": id}) + + print("Done.", flush=True) + + @staticmethod + def add_args(parser): + + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) + + parser.add_argument( + '--minio-host', + default=default_minio_host, + help=f'Minio hostname (default: {default_minio_host})', + ) + + parser.add_argument( + '--minio-access-key', + default='minioadmin', + help='Minio access key / username ' + f'(default: {default_minio_access_key})', + ) + + parser.add_argument( + '--minio-secret-key', + default='minioadmin', + help='Minio secret key / password ' + f'(default: {default_minio_access_key})', + ) + + parser.add_argument( + '--cassandra-host', + default="cassandra", + help=f'Graph host (default: cassandra)' + ) + + parser.add_argument( + '--cassandra-user', + default=None, + help=f'Cassandra user' + ) + + parser.add_argument( + '--cassandra-password', + default=None, + help=f'Cassandra password' + ) + + parser.add_argument( + '--triples-queue', + default=triples_store_queue, + help=f'Triples queue (default: {triples_store_queue})' + ) + + parser.add_argument( + '--graph-embeddings-queue', + default=graph_embeddings_store_queue, + help=f'Graph embeddings queue (default: {triples_store_queue})' + ) + + parser.add_argument( + '--document-embeddings-queue', + default=document_embeddings_store_queue, + help='Document embeddings queue ' + f'(default: {document_embeddings_store_queue})' + ) + + parser.add_argument( + '--document-load-queue', + default=document_ingest_queue, + help='Document load queue ' + f'(default: {document_ingest_queue})' + ) + + parser.add_argument( + '--text-load-queue', + default=text_ingest_queue, + help='Text ingest queue ' + f'(default: {text_ingest_queue})' + ) + +def run(): + + Processor.start(module, __doc__) + diff --git a/trustgraph-flow/trustgraph/librarian/table_store.py b/trustgraph-flow/trustgraph/librarian/table_store.py new file mode 100644 index 00000000..99e0d845 --- /dev/null +++ b/trustgraph-flow/trustgraph/librarian/table_store.py @@ -0,0 +1,131 @@ +from .. schema import LibrarianRequest, LibrarianResponse, Error +from .. knowledge import hash +from .. exceptions import RequestError + +from cassandra.cluster import Cluster +from cassandra.auth import PlainTextAuthProvider +from cassandra.query import BatchStatement +import uuid +import time + +class TableStore: + + def __init__( + self, + cassandra_host, cassandra_user, cassandra_password, keyspace, + ): + + self.keyspace = keyspace + + print("Connecting to Cassandra...", flush=True) + + if cassandra_user and cassandra_password: + auth_provider = PlainTextAuthProvider( + username=cassandra_user, password=cassandra_password + ) + self.cluster = Cluster( + cassandra_host, + auth_provider=auth_provider + ) + else: + self.cluster = Cluster(cassandra_host) + + self.cassandra = self.cluster.connect() + + print("Connected.", flush=True) + + self.ensure_cassandra_schema() + + self.insert_document_stmt = self.cassandra.prepare(""" + insert into document + (id, user, collection, kind, object_id, metadata) + values (?, ?, ?, ?, ?, ?) + """) + + def ensure_cassandra_schema(self): + + print("Ensure Cassandra schema...", flush=True) + + print("Keyspace...", flush=True) + + # FIXME: Replication factor should be configurable + self.cassandra.execute(f""" + create keyspace if not exists {self.keyspace} + with replication = {{ + 'class' : 'SimpleStrategy', + 'replication_factor' : 1 + }}; + """); + + self.cassandra.set_keyspace(self.keyspace) + + print("document table...", flush=True) + + self.cassandra.execute(""" + create table if not exists document ( + user text, + collection text, + id uuid, + kind text, + object_id uuid, + metadata list>, + PRIMARY KEY (user, collection, id) + ); + """); + + print("object index...", flush=True) + + self.cassandra.execute(""" + create index if not exists document_object + on document ( object_id) + """); + + print("Cassandra schema OK.", flush=True) + + def add(self, object_id, document): + + if document.kind not in ( + "text/plain", "application/pdf" + ): + raise RequestError("Invalid document kind: " + document.kind) + + # Create random doc ID + doc_id = uuid.uuid4() + + print("Adding", object_id, doc_id) + + metadata = [ + ( + v.s.value, v.s.is_uri, v.p.value, v.p.is_uri, + v.o.value, v.o.is_uri + ) + for v in document.metadata + ] + + while True: + + try: + + resp = self.cassandra.execute( + self.insert_document_stmt, + ( + doc_id, document.user, document.collection, + document.kind, object_id, metadata + ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + time.sleep(1) + + print("Add complete", flush=True) + + + +