diff --git a/templates/components.jsonnet b/templates/components.jsonnet index 653c48bb..19a52206 100644 --- a/templates/components.jsonnet +++ b/templates/components.jsonnet @@ -34,6 +34,9 @@ "graph-rag": import "components/graph-rag.jsonnet", "document-rag": import "components/document-rag.jsonnet", + // Librarian - document management + "librarian": import "components/librarian.jsonnet", + // Vector stores "vector-store-milvus": import "components/milvus.jsonnet", "vector-store-qdrant": import "components/qdrant.jsonnet", diff --git a/templates/components/librarian.jsonnet b/templates/components/librarian.jsonnet new file mode 100644 index 00000000..4df1b692 --- /dev/null +++ b/templates/components/librarian.jsonnet @@ -0,0 +1,43 @@ +local base = import "base/base.jsonnet"; +local images = import "values/images.jsonnet"; +local url = import "values/url.jsonnet"; +local minio = import "stores/minio.jsonnet"; +local cassandra = import "stores/cassandra.jsonnet"; + +{ + + "librarian" +: { + + create:: function(engine) + + local container = + engine.container("librarian") + .with_image(images.trustgraph_flow) + .with_command([ + "librarian", + "-p", + url.pulsar, + ]) + .with_limits("0.5", "256M") + .with_reservations("0.1", "256M"); + + local containerSet = engine.containers( + "librarian", [ container ] + ); + + local service = + engine.internalService(containerSet) + .with_port(8000, 8000, "metrics"); + + engine.resources([ + containerSet, + service, + ]) + + }, + +} + + // Minio and Cassandra are used by the Librarian + + minio + cassandra + diff --git a/templates/components/trustgraph.jsonnet b/templates/components/trustgraph.jsonnet index 0c07ea4e..833d932b 100644 --- a/templates/components/trustgraph.jsonnet +++ b/templates/components/trustgraph.jsonnet @@ -1,8 +1,6 @@ local base = import "base/base.jsonnet"; local images = import "values/images.jsonnet"; local url = import "values/url.jsonnet"; -local minio = import "stores/minio.jsonnet"; -local cassandra = 
import "stores/cassandra.jsonnet"; { @@ -184,6 +182,3 @@ local cassandra = import "stores/cassandra.jsonnet"; } - // Minio and Cassandra are used by the Librarian - + minio + cassandra - diff --git a/templates/values/images.jsonnet b/templates/values/images.jsonnet index 3e3d3e75..dde235ce 100644 --- a/templates/values/images.jsonnet +++ b/templates/values/images.jsonnet @@ -14,7 +14,7 @@ local version = import "version.jsonnet"; trustgraph_bedrock: "docker.io/trustgraph/trustgraph-bedrock:" + version, trustgraph_vertexai: "docker.io/trustgraph/trustgraph-vertexai:" + version, trustgraph_hf: "docker.io/trustgraph/trustgraph-hf:" + version, - qdrant: "docker.io/qdrant/qdrant:v1.11.1", + qdrant: "docker.io/qdrant/qdrant:v1.13.3", memgraph_mage: "docker.io/memgraph/memgraph-mage:1.22-memgraph-2.22", memgraph_lab: "docker.io/memgraph/lab:2.19.1", falkordb: "docker.io/falkordb/falkordb:latest", diff --git a/trustgraph-base/trustgraph/base/publisher.py b/trustgraph-base/trustgraph/base/publisher.py index b8fbedce..37fae0ec 100644 --- a/trustgraph-base/trustgraph/base/publisher.py +++ b/trustgraph-base/trustgraph/base/publisher.py @@ -6,14 +6,13 @@ import threading class Publisher: - def __init__(self, pulsar_host, topic, schema=None, max_size=10, - chunking_enabled=True, listener=None): - self.pulsar_host = pulsar_host + def __init__(self, pulsar_client, topic, schema=None, max_size=10, + chunking_enabled=True): + self.client = pulsar_client self.topic = topic self.schema = schema self.q = queue.Queue(maxsize=max_size) self.chunking_enabled = chunking_enabled - self.listener_name = listener self.running = True def start(self): @@ -33,11 +32,8 @@ class Publisher: try: - client = pulsar.Client( - self.pulsar_host, listener_name=self.listener_name - ) - - producer = client.create_producer( + print(self.chunking_enabled) + producer = self.client.create_producer( topic=self.topic, schema=self.schema, chunking_enabled=self.chunking_enabled, diff --git 
a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index 606c765a..30ade3ee 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -6,9 +6,9 @@ import time class Subscriber: - def __init__(self, pulsar_host, topic, subscription, consumer_name, - schema=None, max_size=100, listener=None): - self.pulsar_host = pulsar_host + def __init__(self, pulsar_client, topic, subscription, consumer_name, + schema=None, max_size=100): + self.client = pulsar_client self.topic = topic self.subscription = subscription self.consumer_name = consumer_name @@ -17,7 +17,6 @@ class Subscriber: self.full = {} self.max_size = max_size self.lock = threading.Lock() - self.listener_name = listener self.running = True def start(self): @@ -36,12 +35,7 @@ class Subscriber: try: - client = pulsar.Client( - self.pulsar_host, - listener_name=self.listener_name, - ) - - consumer = client.subscribe( + consumer = self.client.subscribe( topic=self.topic, subscription_name=self.subscription, consumer_name=self.consumer_name, diff --git a/trustgraph-base/trustgraph/schema/library.py b/trustgraph-base/trustgraph/schema/library.py index 8ab88842..11006ad8 100644 --- a/trustgraph-base/trustgraph/schema/library.py +++ b/trustgraph-base/trustgraph/schema/library.py @@ -22,6 +22,8 @@ class DocumentPackage(Record): kind = String() user = String() collection = String() + title = String() + comments = String() class DocumentInfo(Record): metadata = Array(Triple()) diff --git a/trustgraph-flow/trustgraph/gateway/agent.py b/trustgraph-flow/trustgraph/gateway/agent.py index c7af947b..150b970e 100644 --- a/trustgraph-flow/trustgraph/gateway/agent.py +++ b/trustgraph-flow/trustgraph/gateway/agent.py @@ -7,10 +7,10 @@ from . endpoint import ServiceEndpoint from . 
requestor import ServiceRequestor class AgentRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(AgentRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=agent_request_queue, response_queue=agent_response_queue, request_schema=AgentRequest, diff --git a/trustgraph-flow/trustgraph/gateway/dbpedia.py b/trustgraph-flow/trustgraph/gateway/dbpedia.py index 8ae4f695..4c8f9346 100644 --- a/trustgraph-flow/trustgraph/gateway/dbpedia.py +++ b/trustgraph-flow/trustgraph/gateway/dbpedia.py @@ -7,10 +7,10 @@ from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class DbpediaRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(DbpediaRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=dbpedia_lookup_request_queue, response_queue=dbpedia_lookup_response_queue, request_schema=LookupRequest, diff --git a/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py b/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py index 17568eda..82960966 100644 --- a/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py +++ b/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py @@ -15,17 +15,17 @@ from . 
serialize import to_subgraph class DocumentEmbeddingsLoadEndpoint(SocketEndpoint): def __init__( - self, pulsar_host, auth, path="/api/v1/load/document-embeddings", + self, pulsar_client, auth, path="/api/v1/load/document-embeddings", ): super(DocumentEmbeddingsLoadEndpoint, self).__init__( endpoint_path=path, auth=auth, ) - self.pulsar_host=pulsar_host + self.pulsar_client=pulsar_client self.publisher = Publisher( - self.pulsar_host, document_embeddings_store_queue, + self.pulsar_client, document_embeddings_store_queue, schema=JsonSchema(DocumentEmbeddings) ) diff --git a/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py b/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py index fa19fe06..a32c3e68 100644 --- a/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py +++ b/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py @@ -14,17 +14,18 @@ from . serialize import serialize_document_embeddings class DocumentEmbeddingsStreamEndpoint(SocketEndpoint): def __init__( - self, pulsar_host, auth, path="/api/v1/stream/document-embeddings" + self, pulsar_client, auth, + path="/api/v1/stream/document-embeddings" ): super(DocumentEmbeddingsStreamEndpoint, self).__init__( endpoint_path=path, auth=auth, ) - self.pulsar_host=pulsar_host + self.pulsar_client=pulsar_client self.subscriber = Subscriber( - self.pulsar_host, document_embeddings_store_queue, + self.pulsar_client, document_embeddings_store_queue, "api-gateway", "api-gateway", schema=JsonSchema(DocumentEmbeddings) ) diff --git a/trustgraph-flow/trustgraph/gateway/document_load.py b/trustgraph-flow/trustgraph/gateway/document_load.py index 4a37ecb6..78cd7930 100644 --- a/trustgraph-flow/trustgraph/gateway/document_load.py +++ b/trustgraph-flow/trustgraph/gateway/document_load.py @@ -8,10 +8,10 @@ from . sender import ServiceSender from . 
serialize import to_subgraph class DocumentLoadSender(ServiceSender): - def __init__(self, pulsar_host): + def __init__(self, pulsar_client): super(DocumentLoadSender, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=document_ingest_queue, request_schema=Document, ) diff --git a/trustgraph-flow/trustgraph/gateway/document_rag.py b/trustgraph-flow/trustgraph/gateway/document_rag.py index 6665a7dc..e5749197 100644 --- a/trustgraph-flow/trustgraph/gateway/document_rag.py +++ b/trustgraph-flow/trustgraph/gateway/document_rag.py @@ -7,10 +7,10 @@ from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class DocumentRagRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(DocumentRagRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=document_rag_request_queue, response_queue=document_rag_response_queue, request_schema=DocumentRagQuery, diff --git a/trustgraph-flow/trustgraph/gateway/embeddings.py b/trustgraph-flow/trustgraph/gateway/embeddings.py index 1efafa76..42ed91a1 100644 --- a/trustgraph-flow/trustgraph/gateway/embeddings.py +++ b/trustgraph-flow/trustgraph/gateway/embeddings.py @@ -7,10 +7,10 @@ from . endpoint import ServiceEndpoint from . 
requestor import ServiceRequestor class EmbeddingsRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(EmbeddingsRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=embeddings_request_queue, response_queue=embeddings_response_queue, request_schema=EmbeddingsRequest, diff --git a/trustgraph-flow/trustgraph/gateway/encyclopedia.py b/trustgraph-flow/trustgraph/gateway/encyclopedia.py index 3f4dad79..49c1dfcd 100644 --- a/trustgraph-flow/trustgraph/gateway/encyclopedia.py +++ b/trustgraph-flow/trustgraph/gateway/encyclopedia.py @@ -7,10 +7,10 @@ from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class EncyclopediaRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(EncyclopediaRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=encyclopedia_lookup_request_queue, response_queue=encyclopedia_lookup_response_queue, request_schema=LookupRequest, diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py index 2b1f8291..8b5328ce 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py +++ b/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py @@ -15,17 +15,17 @@ from . 
serialize import to_subgraph, to_value class GraphEmbeddingsLoadEndpoint(SocketEndpoint): def __init__( - self, pulsar_host, auth, path="/api/v1/load/graph-embeddings", + self, pulsar_client, auth, path="/api/v1/load/graph-embeddings", ): super(GraphEmbeddingsLoadEndpoint, self).__init__( endpoint_path=path, auth=auth, ) - self.pulsar_host=pulsar_host + self.pulsar_client=pulsar_client self.publisher = Publisher( - self.pulsar_host, graph_embeddings_store_queue, + self.pulsar_client, graph_embeddings_store_queue, schema=JsonSchema(GraphEmbeddings) ) diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py index 5e3c0ce9..8df38e97 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py +++ b/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py @@ -8,10 +8,10 @@ from . requestor import ServiceRequestor from . serialize import serialize_value class GraphEmbeddingsQueryRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(GraphEmbeddingsQueryRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=graph_embeddings_request_queue, response_queue=graph_embeddings_response_queue, request_schema=GraphEmbeddingsRequest, diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py index fa6ace3a..385eb9f4 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py +++ b/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py @@ -14,17 +14,17 @@ from . 
serialize import serialize_graph_embeddings class GraphEmbeddingsStreamEndpoint(SocketEndpoint): def __init__( - self, pulsar_host, auth, path="/api/v1/stream/graph-embeddings" + self, pulsar_client, auth, path="/api/v1/stream/graph-embeddings" ): super(GraphEmbeddingsStreamEndpoint, self).__init__( endpoint_path=path, auth=auth, ) - self.pulsar_host=pulsar_host + self.pulsar_client=pulsar_client self.subscriber = Subscriber( - self.pulsar_host, graph_embeddings_store_queue, + self.pulsar_client, graph_embeddings_store_queue, "api-gateway", "api-gateway", schema=JsonSchema(GraphEmbeddings) ) diff --git a/trustgraph-flow/trustgraph/gateway/graph_rag.py b/trustgraph-flow/trustgraph/gateway/graph_rag.py index 55fd5d2f..59a4fb90 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_rag.py +++ b/trustgraph-flow/trustgraph/gateway/graph_rag.py @@ -7,10 +7,10 @@ from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class GraphRagRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(GraphRagRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=graph_rag_request_queue, response_queue=graph_rag_response_queue, request_schema=GraphRagQuery, diff --git a/trustgraph-flow/trustgraph/gateway/internet_search.py b/trustgraph-flow/trustgraph/gateway/internet_search.py index 127cd5d1..598a75cf 100644 --- a/trustgraph-flow/trustgraph/gateway/internet_search.py +++ b/trustgraph-flow/trustgraph/gateway/internet_search.py @@ -7,10 +7,10 @@ from . endpoint import ServiceEndpoint from . 
requestor import ServiceRequestor class InternetSearchRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(InternetSearchRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=internet_search_request_queue, response_queue=internet_search_response_queue, request_schema=LookupRequest, diff --git a/trustgraph-flow/trustgraph/gateway/librarian.py b/trustgraph-flow/trustgraph/gateway/librarian.py index 78de7970..d899eae5 100644 --- a/trustgraph-flow/trustgraph/gateway/librarian.py +++ b/trustgraph-flow/trustgraph/gateway/librarian.py @@ -9,10 +9,10 @@ from . serialize import serialize_document_package, serialize_document_info from . serialize import to_document_package, to_document_info, to_criteria class LibrarianRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(LibrarianRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=librarian_request_queue, response_queue=librarian_response_queue, request_schema=LibrarianRequest, @@ -22,17 +22,19 @@ class LibrarianRequestor(ServiceRequestor): def to_request(self, body): + print("TRR") if "document" in body: dp = to_document_package(body["document"]) else: dp = None + print("GOT") if "criteria" in body: criteria = to_criteria(body["criteria"]) else: criteria = None - limit = int(body.get("limit", 10000)) + print("ASLDKJ") return LibrarianRequest( operation = body.get("operation", None), diff --git a/trustgraph-flow/trustgraph/gateway/mux.py b/trustgraph-flow/trustgraph/gateway/mux.py index ae699ae6..23b693ab 100644 --- a/trustgraph-flow/trustgraph/gateway/mux.py +++ b/trustgraph-flow/trustgraph/gateway/mux.py @@ -18,7 +18,7 @@ MAX_QUEUE_SIZE = 10 class MuxEndpoint(SocketEndpoint): def __init__( - self, pulsar_host, auth, + self, pulsar_client, auth, services, 
path="/api/v1/socket", ): diff --git a/trustgraph-flow/trustgraph/gateway/prompt.py b/trustgraph-flow/trustgraph/gateway/prompt.py index 080d5618..eb50ac73 100644 --- a/trustgraph-flow/trustgraph/gateway/prompt.py +++ b/trustgraph-flow/trustgraph/gateway/prompt.py @@ -9,10 +9,10 @@ from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class PromptRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(PromptRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=prompt_request_queue, response_queue=prompt_response_queue, request_schema=PromptRequest, diff --git a/trustgraph-flow/trustgraph/gateway/requestor.py b/trustgraph-flow/trustgraph/gateway/requestor.py index 41b69a3f..68ab1b58 100644 --- a/trustgraph-flow/trustgraph/gateway/requestor.py +++ b/trustgraph-flow/trustgraph/gateway/requestor.py @@ -14,7 +14,7 @@ class ServiceRequestor: def __init__( self, - pulsar_host, + pulsar_client, request_queue, request_schema, response_queue, response_schema, subscription="api-gateway", consumer_name="api-gateway", @@ -22,12 +22,12 @@ class ServiceRequestor: ): self.pub = Publisher( - pulsar_host, request_queue, - schema=JsonSchema(request_schema) + pulsar_client, request_queue, + schema=JsonSchema(request_schema), ) self.sub = Subscriber( - pulsar_host, response_queue, + pulsar_client, response_queue, subscription, consumer_name, JsonSchema(response_schema) ) @@ -53,9 +53,11 @@ class ServiceRequestor: q = self.sub.subscribe(id) + print("BOUT TO SEDN") await asyncio.to_thread( self.pub.send, id, self.to_request(request) ) + print("SENT") while True: diff --git a/trustgraph-flow/trustgraph/gateway/sender.py b/trustgraph-flow/trustgraph/gateway/sender.py index 036ffaf8..32c586b1 100644 --- a/trustgraph-flow/trustgraph/gateway/sender.py +++ b/trustgraph-flow/trustgraph/gateway/sender.py @@ -15,13 +15,13 @@ class ServiceSender: 
def __init__( self, - pulsar_host, + pulsar_client, request_queue, request_schema, ): self.pub = Publisher( - pulsar_host, request_queue, - schema=JsonSchema(request_schema) + pulsar_client, request_queue, + schema=JsonSchema(request_schema), ) async def start(self): @@ -53,4 +53,3 @@ class ServiceSender: return err - diff --git a/trustgraph-flow/trustgraph/gateway/serialize.py b/trustgraph-flow/trustgraph/gateway/serialize.py index 552105c3..4cad220e 100644 --- a/trustgraph-flow/trustgraph/gateway/serialize.py +++ b/trustgraph-flow/trustgraph/gateway/serialize.py @@ -126,7 +126,7 @@ def to_document_package(x): return DocumentPackage( metadata = to_subgraph(x["metadata"]), - document = base64.b64decode(x["document"].encode("utf-8")), + document = x.get("document", None), kind = x.get("kind", None), user = x.get("user", None), collection = x.get("collection", None), diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index c473f7f5..dff55b0e 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -73,6 +73,11 @@ class Api: self.port = int(config.get("port", default_port)) self.timeout = int(config.get("timeout", default_timeout)) self.pulsar_host = config.get("pulsar_host", default_pulsar_host) + self.pulsar_listener = config.get("pulsar_listener", None) + + self.pulsar_client = pulsar.Client( + self.pulsar_host, listener_name=self.pulsar_listener + ) self.prometheus_url = config.get( "prometheus_url", default_prometheus_url, @@ -91,58 +96,58 @@ class Api: self.services = { "text-completion": TextCompletionRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "prompt": PromptRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "graph-rag": GraphRagRequestor( - 
pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "document-rag": DocumentRagRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "triples-query": TriplesQueryRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "graph-embeddings-query": GraphEmbeddingsQueryRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "embeddings": EmbeddingsRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "agent": AgentRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "librarian": LibrarianRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "encyclopedia": EncyclopediaRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "dbpedia": DbpediaRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "internet-search": InternetSearchRequestor( - pulsar_host=self.pulsar_host, timeout=self.timeout, + pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), "document-load": DocumentLoadSender( - pulsar_host=self.pulsar_host, + pulsar_client=self.pulsar_client, ), "text-load": TextLoadSender( - pulsar_host=self.pulsar_host, + pulsar_client=self.pulsar_client, ), } @@ -205,31 +210,31 @@ class Api: requestor = self.services["text-load"], ), TriplesStreamEndpoint( - 
pulsar_host=self.pulsar_host, + pulsar_client=self.pulsar_client, auth = self.auth, ), GraphEmbeddingsStreamEndpoint( - pulsar_host=self.pulsar_host, + pulsar_client=self.pulsar_client, auth = self.auth, ), DocumentEmbeddingsStreamEndpoint( - pulsar_host=self.pulsar_host, + pulsar_client=self.pulsar_client, auth = self.auth, ), TriplesLoadEndpoint( - pulsar_host=self.pulsar_host, + pulsar_client=self.pulsar_client, auth = self.auth, ), GraphEmbeddingsLoadEndpoint( - pulsar_host=self.pulsar_host, + pulsar_client=self.pulsar_client, auth = self.auth, ), DocumentEmbeddingsLoadEndpoint( - pulsar_host=self.pulsar_host, + pulsar_client=self.pulsar_client, auth = self.auth, ), MuxEndpoint( - pulsar_host=self.pulsar_host, + pulsar_client=self.pulsar_client, auth = self.auth, services = self.services, ), @@ -266,6 +271,11 @@ def run(): help=f'Pulsar host (default: {default_pulsar_host})', ) + parser.add_argument( + '--pulsar-listener', + help=f'Pulsar listener (default: none)', + ) + parser.add_argument( '-m', '--prometheus-url', default=default_prometheus_url, diff --git a/trustgraph-flow/trustgraph/gateway/text_completion.py b/trustgraph-flow/trustgraph/gateway/text_completion.py index 7291fc88..ec84e5d6 100644 --- a/trustgraph-flow/trustgraph/gateway/text_completion.py +++ b/trustgraph-flow/trustgraph/gateway/text_completion.py @@ -7,10 +7,10 @@ from . endpoint import ServiceEndpoint from . 
requestor import ServiceRequestor class TextCompletionRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(TextCompletionRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=text_completion_request_queue, response_queue=text_completion_response_queue, request_schema=TextCompletionRequest, diff --git a/trustgraph-flow/trustgraph/gateway/text_load.py b/trustgraph-flow/trustgraph/gateway/text_load.py index 2499dd01..cc432698 100644 --- a/trustgraph-flow/trustgraph/gateway/text_load.py +++ b/trustgraph-flow/trustgraph/gateway/text_load.py @@ -8,10 +8,10 @@ from . sender import ServiceSender from . serialize import to_subgraph class TextLoadSender(ServiceSender): - def __init__(self, pulsar_host): + def __init__(self, pulsar_client): super(TextLoadSender, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=text_ingest_queue, request_schema=TextDocument, ) diff --git a/trustgraph-flow/trustgraph/gateway/triples_load.py b/trustgraph-flow/trustgraph/gateway/triples_load.py index 88fecd88..0c672697 100644 --- a/trustgraph-flow/trustgraph/gateway/triples_load.py +++ b/trustgraph-flow/trustgraph/gateway/triples_load.py @@ -14,16 +14,16 @@ from . 
serialize import to_subgraph class TriplesLoadEndpoint(SocketEndpoint): - def __init__(self, pulsar_host, auth, path="/api/v1/load/triples"): + def __init__(self, pulsar_client, auth, path="/api/v1/load/triples"): super(TriplesLoadEndpoint, self).__init__( endpoint_path=path, auth=auth, ) - self.pulsar_host=pulsar_host + self.pulsar_client=pulsar_client self.publisher = Publisher( - self.pulsar_host, triples_store_queue, + self.pulsar_client, triples_store_queue, schema=JsonSchema(Triples) ) diff --git a/trustgraph-flow/trustgraph/gateway/triples_query.py b/trustgraph-flow/trustgraph/gateway/triples_query.py index 0ea7cd8d..061bd4d8 100644 --- a/trustgraph-flow/trustgraph/gateway/triples_query.py +++ b/trustgraph-flow/trustgraph/gateway/triples_query.py @@ -8,10 +8,10 @@ from . requestor import ServiceRequestor from . serialize import to_value, serialize_subgraph class TriplesQueryRequestor(ServiceRequestor): - def __init__(self, pulsar_host, timeout, auth): + def __init__(self, pulsar_client, timeout, auth): super(TriplesQueryRequestor, self).__init__( - pulsar_host=pulsar_host, + pulsar_client=pulsar_client, request_queue=triples_request_queue, response_queue=triples_response_queue, request_schema=TriplesQueryRequest, diff --git a/trustgraph-flow/trustgraph/gateway/triples_stream.py b/trustgraph-flow/trustgraph/gateway/triples_stream.py index d1ea7bf5..a5d5ad0a 100644 --- a/trustgraph-flow/trustgraph/gateway/triples_stream.py +++ b/trustgraph-flow/trustgraph/gateway/triples_stream.py @@ -13,16 +13,16 @@ from . 
serialize import serialize_triples class TriplesStreamEndpoint(SocketEndpoint): - def __init__(self, pulsar_host, auth, path="/api/v1/stream/triples"): + def __init__(self, pulsar_client, auth, path="/api/v1/stream/triples"): super(TriplesStreamEndpoint, self).__init__( endpoint_path=path, auth=auth, ) - self.pulsar_host=pulsar_host + self.pulsar_client=pulsar_client self.subscriber = Subscriber( - self.pulsar_host, triples_store_queue, + self.pulsar_client, triples_store_queue, "api-gateway", "api-gateway", schema=JsonSchema(Triples) ) diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py index 08010f3f..84ab3793 100644 --- a/trustgraph-flow/trustgraph/librarian/librarian.py +++ b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -53,4 +53,22 @@ class Librarian: info = None, ) + def handle_triples(self, m): + self.table_store.add_triples(m) + + def handle_graph_embeddings(self, m): + self.table_store.add_graph_embeddings(m) + + def handle_document_embeddings(self, m): + self.table_store.add_document_embeddings(m) + + + def handle_triples(self, m): + self.table_store.add_triples(m) + + def handle_graph_embeddings(self, m): + self.table_store.add_graph_embeddings(m) + + def handle_document_embeddings(self, m): + self.table_store.add_document_embeddings(m) diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index ade4ca38..5e7153bb 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -7,6 +7,7 @@ from functools import partial import asyncio import threading import queue +import base64 from pulsar.schema import JsonSchema @@ -94,23 +95,38 @@ class Processor(ConsumerProducer): ) self.document_load = Publisher( - self.pulsar_host, document_load_queue, JsonSchema(Document), - listener=self.pulsar_listener, + self.client, document_load_queue, JsonSchema(Document), ) self.text_load = 
Publisher( - self.pulsar_host, text_load_queue, JsonSchema(TextDocument), - listener=self.pulsar_listener, + self.client, text_load_queue, JsonSchema(TextDocument), ) - self.triples_load = Subscriber( - self.pulsar_host, triples_store_queue, + self.triples_brk = Subscriber( + self.client, triples_store_queue, "librarian", "librarian", schema=JsonSchema(Triples), - listener=self.pulsar_listener, + ) + self.graph_embeddings_brk = Subscriber( + self.client, graph_embeddings_store_queue, + "librarian", "librarian", + schema=JsonSchema(GraphEmbeddings), + ) + self.document_embeddings_brk = Subscriber( + self.client, document_embeddings_store_queue, + "librarian", "librarian", + schema=JsonSchema(DocumentEmbeddings), ) - self.triples_reader = threading.Thread(target=self.receive_triples) + self.triples_reader = threading.Thread( + target=self.receive_triples + ) + self.graph_embeddings_reader = threading.Thread( + target=self.receive_graph_embeddings + ) + self.document_embeddings_reader = threading.Thread( + target=self.receive_document_embeddings + ) self.librarian = Librarian( cassandra_host = cassandra_host.split(","), @@ -131,34 +147,23 @@ class Processor(ConsumerProducer): self.document_load.start() self.text_load.start() - self.triples_load.start() - self.triples_sub = self.triples_load.subscribe_all("x") + self.triples_brk.start() + self.graph_embeddings_brk.start() + self.document_embeddings_brk.start() + + self.triples_sub = self.triples_brk.subscribe_all("x") + self.graph_embeddings_sub = self.graph_embeddings_brk.subscribe_all("x") + self.document_embeddings_sub = self.document_embeddings_brk.subscribe_all("x") self.triples_reader.start() - - def receive_triples(self): - - print("Receive triples!") - - while self.running: - try: - msg = self.triples_sub.get(timeout=1) - except queue.Empty: - print("Tick") - continue - - print(msg) - - print("BYE") + self.graph_embeddings_reader.start() + self.document_embeddings_reader.start() def __del__(self): self.running 
= False - if hasattr(self, "triples_sub"): - self.triples_sub.unsubscribe_all("x") - if hasattr(self, "document_load"): self.document_load.stop() self.document_load.join() @@ -167,9 +172,56 @@ class Processor(ConsumerProducer): self.text_load.stop() self.text_load.join() - if hasattr(self, "triples_load"): - self.triples_load.stop() - self.triples_load.join() + if hasattr(self, "triples_sub"): + self.triples_sub.unsubscribe_all("x") + + if hasattr(self, "graph_embeddings_sub"): + self.graph_embeddings_sub.unsubscribe_all("x") + + if hasattr(self, "document_embeddings_sub"): + self.document_embeddings_sub.unsubscribe_all("x") + + if hasattr(self, "triples_brk"): + self.triples_brk.stop() + self.triples_brk.join() + + if hasattr(self, "graph_embeddings_brk"): + self.graph_embeddings_brk.stop() + self.graph_embeddings_brk.join() + + if hasattr(self, "document_embeddings_brk"): + self.document_embeddings_brk.stop() + self.document_embeddings_brk.join() + + def receive_triples(self): + + while self.running: + try: + msg = self.triples_sub.get(timeout=1) + except queue.Empty: + continue + + self.librarian.handle_triples(msg) + + def receive_graph_embeddings(self): + + while self.running: + try: + msg = self.graph_embeddings_sub.get(timeout=1) + except queue.Empty: + continue + + self.librarian.handle_graph_embeddings(msg) + + def receive_document_embeddings(self): + + while self.running: + try: + msg = self.document_embeddings_sub.get(timeout=1) + except queue.Empty: + continue + + self.librarian.handle_document_embeddings(msg) async def load_document(self, id, document): @@ -187,6 +239,9 @@ class Processor(ConsumerProducer): async def load_text(self, id, document): + text = base64.b64decode(document.document) + text = text.decode("utf-8") + doc = TextDocument( metadata = Metadata( id = id, @@ -194,7 +249,7 @@ class Processor(ConsumerProducer): user = document.user, collection = document.collection ), - text = document.document + text = text, ) self.text_load.send(None, 
def prepare_statements(self):
    """Prepare the INSERT statements used when storing rows.

    Each statement is prepared once on ``self.cassandra`` and kept as an
    instance attribute so the ``add*`` methods can execute it with bound
    parameters.
    """

    self.insert_document_stmt = self.cassandra.prepare("""
        INSERT INTO document
        (
            id, user, collection, kind, object_id, time, title, comments,
            metadata
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """)

    self.insert_triples_stmt = self.cassandra.prepare("""
        INSERT INTO triples
        (
            id, user, collection, document_id, time,
            metadata, triples
        )
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """)

    self.insert_graph_embeddings_stmt = self.cassandra.prepare("""
        INSERT INTO graph_embeddings
        (
            id, user, collection, document_id, time,
            metadata, entity_embeddings
        )
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """)

    self.insert_document_embeddings_stmt = self.cassandra.prepare("""
        INSERT INTO document_embeddings
        (
            id, user, collection, document_id, time,
            metadata, chunks
        )
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """)


def add_triples(self, m, *, retry_delay=1):
    """Store one triples message as a row of the ``triples`` table.

    Args:
        m: Message exposing ``metadata`` (with ``user``, ``collection``,
            ``id`` and an optional ``metadata`` list of triples) and
            ``triples``.  Every triple must expose ``s``, ``p`` and ``o``
            terms, each with ``value`` and ``is_uri``.
        retry_delay: Seconds to sleep after a failed insert before trying
            again.

    The insert is retried until it succeeds; each failure is printed.
    The row id, timestamp and parameters are built once, before the first
    attempt, so a retry rewrites the same primary key instead of adding a
    second row when an earlier attempt was applied but reported as failed.
    """

    def as_row(t):
        # Flatten a triple into the 6-tuple layout stored in Cassandra.
        return (
            t.s.value, t.s.is_uri, t.p.value, t.p.is_uri,
            t.o.value, t.o.is_uri,
        )

    params = (
        uuid.uuid4(), m.metadata.user,
        m.metadata.collection, m.metadata.id,
        int(time.time() * 1000),        # milliseconds since the epoch
        [as_row(t) for t in m.metadata.metadata or []],
        [as_row(t) for t in m.triples],
    )

    while True:
        try:
            self.cassandra.execute(self.insert_triples_stmt, params)
        except Exception as e:
            # Deliberate best-effort: keep retrying until the store
            # accepts the write.
            print("Exception:", type(e))
            print(f"{e}, retry...", flush=True)
            time.sleep(retry_delay)
        else:
            return


def add_graph_embeddings(self, m, *, retry_delay=1):
    """Store one graph-embeddings message in ``graph_embeddings``.

    Args:
        m: Message exposing ``metadata`` (with ``user``, ``collection``,
            ``id`` and an optional ``metadata`` list of triples) and
            ``entities``; each entity exposes ``entity`` (with ``value``
            and ``is_uri``) and ``vectors``.
        retry_delay: Seconds to sleep after a failed insert before trying
            again.

    The insert is retried until it succeeds; each failure is printed.
    The row id and parameters are built once so that retries target the
    same row.
    """

    def as_row(t):
        # Flatten a triple into the 6-tuple layout stored in Cassandra.
        return (
            t.s.value, t.s.is_uri, t.p.value, t.p.is_uri,
            t.o.value, t.o.is_uri,
        )

    entities = [
        ((v.entity.value, v.entity.is_uri), v.vectors)
        for v in m.entities
    ]

    params = (
        uuid.uuid4(), m.metadata.user,
        m.metadata.collection, m.metadata.id,
        int(time.time() * 1000),        # milliseconds since the epoch
        [as_row(t) for t in m.metadata.metadata or []],
        entities,
    )

    while True:
        try:
            self.cassandra.execute(self.insert_graph_embeddings_stmt, params)
        except Exception as e:
            # Deliberate best-effort: keep retrying until the store
            # accepts the write.
            print("Exception:", type(e))
            print(f"{e}, retry...", flush=True)
            time.sleep(retry_delay)
        else:
            return


def add_document_embeddings(self, m, *, retry_delay=1):
    """Store one document-embeddings message in ``document_embeddings``.

    Args:
        m: Message exposing ``metadata`` (with ``user``, ``collection``,
            ``id`` and an optional ``metadata`` list of triples) and
            ``chunks``; each chunk exposes ``chunk`` and ``vectors``.
        retry_delay: Seconds to sleep after a failed insert before trying
            again.

    The insert is retried until it succeeds; each failure is printed.
    The row id and parameters are built once so that retries target the
    same row.
    """

    def as_row(t):
        # Flatten a triple into the 6-tuple layout stored in Cassandra.
        return (
            t.s.value, t.s.is_uri, t.p.value, t.p.is_uri,
            t.o.value, t.o.is_uri,
        )

    chunks = [(v.chunk, v.vectors) for v in m.chunks]

    params = (
        uuid.uuid4(), m.metadata.user,
        m.metadata.collection, m.metadata.id,
        int(time.time() * 1000),        # milliseconds since the epoch
        [as_row(t) for t in m.metadata.metadata or []],
        chunks,
    )

    while True:
        try:
            self.cassandra.execute(
                self.insert_document_embeddings_stmt, params
            )
        except Exception as e:
            # Deliberate best-effort: keep retrying until the store
            # accepts the write.
            print("Exception:", type(e))
            print(f"{e}, retry...", flush=True)
            time.sleep(retry_delay)
        else:
            return