diff --git a/templates/components.jsonnet b/templates/components.jsonnet index ec7f862b..26368deb 100644 --- a/templates/components.jsonnet +++ b/templates/components.jsonnet @@ -25,6 +25,7 @@ "trustgraph-base": import "components/trustgraph.jsonnet", "vector-store-milvus": import "components/milvus.jsonnet", "vector-store-qdrant": import "components/qdrant.jsonnet", + "vector-store-pinecone": import "components/pinecone.jsonnet", "vertexai": import "components/vertexai.jsonnet", "null": {}, @@ -34,6 +35,7 @@ "cassandra": import "components/cassandra.jsonnet", "neo4j": import "components/neo4j.jsonnet", "qdrant": import "components/qdrant.jsonnet", + "pinecone": import "components/pinecone.jsonnet", "milvus": import "components/milvus.jsonnet", "trustgraph": import "components/trustgraph.jsonnet", diff --git a/templates/components/pinecone.jsonnet b/templates/components/pinecone.jsonnet new file mode 100644 index 00000000..3422952a --- /dev/null +++ b/templates/components/pinecone.jsonnet @@ -0,0 +1,153 @@ +local base = import "base/base.jsonnet"; +local images = import "values/images.jsonnet"; +local url = import "values/url.jsonnet"; +local cassandra_hosts = "cassandra"; + +{ + + "pinecone-cloud":: "aws", + "pinecone-region":: "us-east-1", + + "store-graph-embeddings" +: { + + create:: function(engine) + + local envSecrets = engine.envSecrets("pinecone-api-key") + .with_env_var("PINECONE_API_KEY", "pinecone-api-key"); + + local container = + engine.container("store-graph-embeddings") + .with_image(images.trustgraph) + .with_command([ + "ge-write-pinecone", + "-p", + url.pulsar, + ]) + .with_env_var_secrets(envSecrets) + .with_limits("0.5", "128M") + .with_reservations("0.1", "128M"); + + local containerSet = engine.containers( + "store-graph-embeddings", [ container ] + ); + + local service = + engine.internalService(containerSet) + .with_port(8080, 8080, "metrics"); + + engine.resources([ + envSecrets, + containerSet, + service, + ]) + + }, + + 
"query-graph-embeddings" +: { + + create:: function(engine) + + local envSecrets = engine.envSecrets("pinecone-api-key") + .with_env_var("PINECONE_API_KEY", "pinecone-api-key"); + + local container = + engine.container("query-graph-embeddings") + .with_image(images.trustgraph) + .with_command([ + "ge-query-pinecone", + "-p", + url.pulsar, + ]) + .with_env_var_secrets(envSecrets) + .with_limits("0.5", "128M") + .with_reservations("0.1", "128M"); + + local containerSet = engine.containers( + "query-graph-embeddings", [ container ] + ); + + local service = + engine.internalService(containerSet) + .with_port(8080, 8080, "metrics"); + + engine.resources([ + envSecrets, + containerSet, + service, + ]) + + }, + + "store-doc-embeddings" +: { + + create:: function(engine) + + local envSecrets = engine.envSecrets("pinecone-api-key") + .with_env_var("PINECONE_API_KEY", "pinecone-api-key"); + + local container = + engine.container("store-doc-embeddings") + .with_image(images.trustgraph) + .with_command([ + "de-write-pinecone", + "-p", + url.pulsar, + ]) + .with_env_var_secrets(envSecrets) + .with_limits("0.5", "128M") + .with_reservations("0.1", "128M"); + + local containerSet = engine.containers( + "store-doc-embeddings", [ container ] + ); + + local service = + engine.internalService(containerSet) + .with_port(8080, 8080, "metrics"); + + engine.resources([ + envSecrets, + containerSet, + service, + ]) + + }, + + "query-doc-embeddings" +: { + + create:: function(engine) + + local envSecrets = engine.envSecrets("pinecone-api-key") + .with_env_var("PINECONE_API_KEY", "pinecone-api-key"); + + local container = + engine.container("query-doc-embeddings") + .with_image(images.trustgraph) + .with_command([ + "de-query-pinecone", + "-p", + url.pulsar, + ]) + .with_env_var_secrets(envSecrets) + .with_limits("0.5", "128M") + .with_reservations("0.1", "128M"); + + local containerSet = engine.containers( + "query-doc-embeddings", [ container ] + ); + + local service = + 
engine.internalService(containerSet) + .with_port(8080, 8080, "metrics"); + + engine.resources([ + envSecrets, + containerSet, + service, + ]) + + + } + +} + diff --git a/trustgraph-flow/scripts/ge-query-pinecone b/trustgraph-flow/scripts/ge-query-pinecone new file mode 100755 index 00000000..b75aec78 --- /dev/null +++ b/trustgraph-flow/scripts/ge-query-pinecone @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.query.graph_embeddings.pinecone import run + +run() + diff --git a/trustgraph-flow/scripts/ge-write-pinecone b/trustgraph-flow/scripts/ge-write-pinecone new file mode 100755 index 00000000..802a8377 --- /dev/null +++ b/trustgraph-flow/scripts/ge-write-pinecone @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.storage.graph_embeddings.pinecone import run + +run() + diff --git a/trustgraph-flow/setup.py b/trustgraph-flow/setup.py index 44901119..1650122f 100644 --- a/trustgraph-flow/setup.py +++ b/trustgraph-flow/setup.py @@ -60,6 +60,7 @@ setuptools.setup( "jsonschema", "aiohttp", "aiopulsar-py", + "pinecone[grpc]", ], scripts=[ "scripts/api-gateway", @@ -74,8 +75,10 @@ setuptools.setup( "scripts/embeddings-ollama", "scripts/embeddings-vectorize", "scripts/ge-query-milvus", + "scripts/ge-query-pinecone", "scripts/ge-query-qdrant", "scripts/ge-write-milvus", + "scripts/ge-write-pinecone", "scripts/ge-write-qdrant", "scripts/graph-rag", "scripts/kg-extract-definitions", diff --git a/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/__init__.py b/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/__init__.py new file mode 100644 index 00000000..ba844705 --- /dev/null +++ b/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/__init__.py @@ -0,0 +1,3 @@ + +from . 
service import *
+
diff --git a/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/__main__.py b/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/__main__.py
new file mode 100755
index 00000000..89684e3e
--- /dev/null
+++ b/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/__main__.py
@@ -0,0 +1,7 @@
+#!/usr/bin/env python3
+
+# Entry point: the implementation lives in service.py (there is no "hf"
+# module in this package; importing it would raise ImportError).
+from . service import run
+
+if __name__ == '__main__':
+    run()
+
diff --git a/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/service.py b/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/service.py
new file mode 100755
index 00000000..3fcbfb21
--- /dev/null
+++ b/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/service.py
@@ -0,0 +1,142 @@
+
+"""
+Document embeddings query service. Input is vector, output is an array
+of chunks. Pinecone implementation.
+"""
+
+from pinecone import Pinecone, ServerlessSpec
+from pinecone.grpc import PineconeGRPC, GRPCClientConfig
+
+import uuid
+import os
+
+from .... schema import DocumentEmbeddingsRequest, DocumentEmbeddingsResponse
+from .... schema import Error, Value
+from .... schema import document_embeddings_request_queue
+from .... schema import document_embeddings_response_queue
+from .... 
base import ConsumerProducer
+
+module = ".".join(__name__.split(".")[1:-1])
+
+default_input_queue = document_embeddings_request_queue
+default_output_queue = document_embeddings_response_queue
+default_subscriber = module
+default_api_key = os.getenv("PINECONE_API_KEY", "not-specified")
+
+class Processor(ConsumerProducer):
+
+    def __init__(self, **params):
+
+        input_queue = params.get("input_queue", default_input_queue)
+        output_queue = params.get("output_queue", default_output_queue)
+        subscriber = params.get("subscriber", default_subscriber)
+
+        self.url = params.get("url", None)
+        self.api_key = params.get("api_key", default_api_key)
+
+        # A URL selects a dedicated (gRPC) endpoint; otherwise serverless.
+        if self.url:
+
+            self.pinecone = PineconeGRPC(
+                api_key = self.api_key,
+                host = self.url
+            )
+
+        else:
+
+            self.pinecone = Pinecone(api_key = self.api_key)
+
+        super(Processor, self).__init__(
+            **params | {
+                "input_queue": input_queue,
+                "output_queue": output_queue,
+                "subscriber": subscriber,
+                "input_schema": DocumentEmbeddingsRequest,
+                "output_schema": DocumentEmbeddingsResponse,
+                "url": self.url,
+            }
+        )
+
+    def handle(self, msg):
+
+        try:
+
+            v = msg.value()
+
+            # Sender-produced ID
+            id = msg.properties()["id"]
+
+            print(f"Handling input {id}...", flush=True)
+
+            chunks = []
+
+            for vec in v.vectors:
+
+                dim = len(vec)
+
+                # Index name encodes user and dimension, matching the
+                # writer's naming scheme.
+                index_name = (
+                    "d-" + v.user + "-" + str(dim)
+                )
+
+                index = self.pinecone.Index(index_name)
+
+                results = index.query(
+                    namespace=v.collection,
+                    vector=vec,
+                    top_k=v.limit,
+                    include_values=False,
+                    include_metadata=True
+                )
+
+                # chunks is a list: append, not add. (A stray Qdrant
+                # query_points call was removed here; self.client does
+                # not exist in this class.)
+                for r in results.matches:
+                    doc = r.metadata["doc"]
+                    chunks.append(doc)
+
+            print("Send response...", flush=True)
+            r = DocumentEmbeddingsResponse(documents=chunks, error=None)
+            self.producer.send(r, properties={"id": id})
+
+            print("Done.", flush=True)
+
+        except Exception as e:
+
+            print(f"Exception: {e}")
+
+            print("Send 
error response...", flush=True)
+
+            r = DocumentEmbeddingsResponse(
+                error=Error(
+                    type = "llm-error",
+                    message = str(e),
+                ),
+                documents=None,
+            )
+
+            self.producer.send(r, properties={"id": id})
+
+        self.consumer.acknowledge(msg)
+
+    @staticmethod
+    def add_args(parser):
+
+        ConsumerProducer.add_args(
+            parser, default_input_queue, default_subscriber,
+            default_output_queue,
+        )
+
+        # Pinecone connection arguments, mirroring the graph-embeddings
+        # query service. (The previous --store-uri argument referenced an
+        # undefined default_store_uri and a Milvus help string.)
+        parser.add_argument(
+            '-a', '--api-key',
+            default=default_api_key,
+            help='Pinecone API key. (default from PINECONE_API_KEY)'
+        )
+
+        parser.add_argument(
+            '-u', '--url',
+            help='Pinecone URL. If unspecified, serverless is used'
+        )
+
+def run():
+
+    Processor.start(module, __doc__)
+
diff --git a/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/__init__.py b/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/__init__.py
new file mode 100644
index 00000000..ba844705
--- /dev/null
+++ b/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/__init__.py
@@ -0,0 +1,3 @@
+
+from . service import *
+
diff --git a/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/__main__.py b/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/__main__.py
new file mode 100755
index 00000000..89684e3e
--- /dev/null
+++ b/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/__main__.py
@@ -0,0 +1,7 @@
+#!/usr/bin/env python3
+
+# Entry point: the implementation lives in service.py (there is no "hf"
+# module in this package; importing it would raise ImportError).
+from . service import run
+
+if __name__ == '__main__':
+    run()
+
diff --git a/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py b/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py
new file mode 100755
index 00000000..64ae4d32
--- /dev/null
+++ b/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py
@@ -0,0 +1,156 @@
+
+"""
+Graph embeddings query service. Input is vector, output is list of
+entities. Pinecone implementation.
+"""
+
+from pinecone import Pinecone, ServerlessSpec
+from pinecone.grpc import PineconeGRPC, GRPCClientConfig
+
+import uuid
+import os
+
+from .... schema import GraphEmbeddingsRequest, GraphEmbeddingsResponse
+from .... schema import Error, Value
+from .... 
schema import graph_embeddings_request_queue +from .... schema import graph_embeddings_response_queue +from .... base import ConsumerProducer + +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = graph_embeddings_request_queue +default_output_queue = graph_embeddings_response_queue +default_subscriber = module +default_api_key = os.getenv("PINECONE_API_KEY", "not-specified") + +class Processor(ConsumerProducer): + + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + + self.url = params.get("url", None) + self.api_key = params.get("api_key", default_api_key) + + if self.url: + + self.pinecone = PineconeGRPC( + api_key = self.api_key, + host = self.url + ) + + else: + + self.pinecone = Pinecone(api_key = self.api_key) + + super(Processor, self).__init__( + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": GraphEmbeddingsRequest, + "output_schema": GraphEmbeddingsResponse, + "url": self.url, + } + ) + + def create_value(self, ent): + if ent.startswith("http://") or ent.startswith("https://"): + return Value(value=ent, is_uri=True) + else: + return Value(value=ent, is_uri=False) + + def handle(self, msg): + + try: + + v = msg.value() + + # Sender-produced ID + id = msg.properties()["id"] + + print(f"Handling input {id}...", flush=True) + + entities = set() + + for vec in v.vectors: + + dim = len(vec) + + index_name = ( + "t-" + v.user + "-" + str(dim) + ) + + index = self.pinecone.Index(index_name) + + results = index.query( + namespace=v.collection, + vector=vec, + top_k=v.limit, + include_values=False, + include_metadata=True + ) + + for r in results.matches: + ent = r.metadata["entity"] + entities.add(ent) + + # Convert set to list + entities = list(entities) + + ents2 = [] + + for ent in entities: + 
ents2.append(self.create_value(ent)) + + entities = ents2 + + print("Send response...", flush=True) + r = GraphEmbeddingsResponse(entities=entities, error=None) + self.producer.send(r, properties={"id": id}) + + print("Done.", flush=True) + + except Exception as e: + + print(f"Exception: {e}") + + print("Send error response...", flush=True) + + r = GraphEmbeddingsResponse( + error=Error( + type = "llm-error", + message = str(e), + ), + entities=None, + ) + + self.producer.send(r, properties={"id": id}) + + self.consumer.acknowledge(msg) + + @staticmethod + def add_args(parser): + + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) + + parser.add_argument( + '-a', '--api-key', + default=default_api_key, + help='Pinecone API key. (default from PINECONE_API_KEY)' + ) + + parser.add_argument( + '-u', '--url', + help='Pinecone URL. If unspecified, serverless is used' + ) + +def run(): + + Processor.start(module, __doc__) + diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/__init__.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/__init__.py new file mode 100644 index 00000000..d891d55f --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/__init__.py @@ -0,0 +1,3 @@ + +from . write import * + diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/__main__.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/__main__.py new file mode 100644 index 00000000..c05d8c6d --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . 
write import run
+
+if __name__ == '__main__':
+    run()
+
diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py
new file mode 100644
index 00000000..24cfcb78
--- /dev/null
+++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py
@@ -0,0 +1,167 @@
+
+"""
+Accepts chunk/vector pairs and writes them to a Pinecone store.
+"""
+
+# This module uses the Pinecone client, not Qdrant: Pinecone,
+# ServerlessSpec and PineconeGRPC are referenced below and must be
+# imported here (the copied Qdrant imports were unused and left those
+# names undefined).
+from pinecone import Pinecone, ServerlessSpec
+from pinecone.grpc import PineconeGRPC, GRPCClientConfig
+
+import time
+import uuid
+import os
+
+from .... schema import ChunkEmbeddings
+from .... schema import chunk_embeddings_ingest_queue
+from .... log_level import LogLevel
+from .... base import Consumer
+
+module = ".".join(__name__.split(".")[1:-1])
+
+default_input_queue = chunk_embeddings_ingest_queue
+default_subscriber = module
+default_api_key = os.getenv("PINECONE_API_KEY", "not-specified")
+default_cloud = "aws"
+default_region = "us-east-1"
+
+class Processor(Consumer):
+
+    def __init__(self, **params):
+
+        input_queue = params.get("input_queue", default_input_queue)
+        subscriber = params.get("subscriber", default_subscriber)
+
+        self.url = params.get("url", None)
+        self.cloud = params.get("cloud", default_cloud)
+        self.region = params.get("region", default_region)
+        self.api_key = params.get("api_key", default_api_key)
+
+        if self.api_key is None:
+            raise RuntimeError("Pinecone API key must be specified")
+
+        # A URL selects a dedicated (gRPC) endpoint; otherwise serverless.
+        if self.url:
+
+            self.pinecone = PineconeGRPC(
+                api_key = self.api_key,
+                host = self.url
+            )
+
+        else:
+
+            self.pinecone = Pinecone(api_key = self.api_key)
+
+        super(Processor, self).__init__(
+            **params | {
+                "input_queue": input_queue,
+                "subscriber": subscriber,
+                "input_schema": ChunkEmbeddings,
+                "url": self.url,
+            }
+        )
+
+        # Cache of the most recently used index name, to avoid
+        # re-checking index existence on every message.
+        self.last_index_name = None
+
+    def handle(self, msg):
+
+        v = msg.value()
+
+        chunk = v.chunk.decode("utf-8")
+
+        # Nothing to store for an empty chunk.
+        if chunk == "": return
+
+        for 
vec in v.vectors:
+
+            dim = len(vec)
+
+            # Index name encodes user and dimension, matching the query
+            # service's naming scheme. (Was mistakenly assigned to
+            # "collection", leaving index_name undefined below.)
+            index_name = (
+                "d-" + v.metadata.user + "-" + str(dim)
+            )
+
+            if index_name != self.last_index_name:
+
+                if not self.pinecone.has_index(index_name):
+
+                    try:
+
+                        self.pinecone.create_index(
+                            name = index_name,
+                            dimension = dim,
+                            metric = "cosine",
+                            spec = ServerlessSpec(
+                                cloud = self.cloud,
+                                region = self.region,
+                            )
+                        )
+
+                        # Poll until the new index reports ready.
+                        for i in range(0, 1000):
+
+                            if self.pinecone.describe_index(
+                                index_name
+                            ).status["ready"]:
+                                break
+
+                            time.sleep(1)
+
+                        if not self.pinecone.describe_index(
+                            index_name
+                        ).status["ready"]:
+                            raise RuntimeError(
+                                "Gave up waiting for index creation"
+                            )
+
+                    except Exception as e:
+                        print("Pinecone index creation failed")
+                        raise e
+
+                    print(f"Index {index_name} created", flush=True)
+
+                self.last_index_name = index_name
+
+            index = self.pinecone.Index(index_name)
+
+            # Each record needs its own ID; previously the Python builtin
+            # id function was passed here by mistake.
+            id = str(uuid.uuid4())
+
+            records = [
+                {
+                    "id": id,
+                    "values": vec,
+                    "metadata": { "doc": chunk },
+                }
+            ]
+
+            index.upsert(
+                vectors = records,
+                namespace = v.metadata.collection,
+            )
+
+    @staticmethod
+    def add_args(parser):
+
+        Consumer.add_args(
+            parser, default_input_queue, default_subscriber,
+        )
+
+        parser.add_argument(
+            '-a', '--api-key',
+            default=default_api_key,
+            help='Pinecone API key. (default from PINECONE_API_KEY)'
+        )
+
+        parser.add_argument(
+            '-u', '--url',
+            help='Pinecone URL. 
If unspecified, serverless is used' + ) + + parser.add_argument( + '--cloud', + default=default_cloud, + help=f'Pinecone cloud, (default: {default_cloud}' + ) + + parser.add_argument( + '--region', + default=default_region, + help=f'Pinecone region, (default: {default_region}' + ) + +def run(): + + Processor.start(module, __doc__) + diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/__init__.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/__init__.py new file mode 100644 index 00000000..d891d55f --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/__init__.py @@ -0,0 +1,3 @@ + +from . write import * + diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/__main__.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/__main__.py new file mode 100755 index 00000000..c05d8c6d --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . write import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py new file mode 100755 index 00000000..b918c10b --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py @@ -0,0 +1,167 @@ + +""" +Accepts entity/vector pairs and writes them to a Pinecone store. +""" + +from pinecone import Pinecone, ServerlessSpec +from pinecone.grpc import PineconeGRPC, GRPCClientConfig + +import time +import uuid +import os + +from .... schema import GraphEmbeddings +from .... schema import graph_embeddings_store_queue +from .... log_level import LogLevel +from .... 
base import Consumer + +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = graph_embeddings_store_queue +default_subscriber = module +default_api_key = os.getenv("PINECONE_API_KEY", "not-specified") +default_cloud = "aws" +default_region = "us-east-1" + +class Processor(Consumer): + + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + subscriber = params.get("subscriber", default_subscriber) + + self.url = params.get("url", None) + self.cloud = params.get("cloud", default_cloud) + self.region = params.get("region", default_region) + self.api_key = params.get("api_key", default_api_key) + + if self.api_key is None: + raise RuntimeError("Pinecone API key must be specified") + + if self.url: + + self.pinecone = PineconeGRPC( + api_key = self.api_key, + host = self.url + ) + + else: + + self.pinecone = Pinecone(api_key = self.api_key) + + super(Processor, self).__init__( + **params | { + "input_queue": input_queue, + "subscriber": subscriber, + "input_schema": GraphEmbeddings, + "url": self.url, + } + ) + + self.last_index_name = None + + def handle(self, msg): + + v = msg.value() + + id = str(uuid.uuid4()) + + if v.entity.value == "" or v.entity.value is None: return + + for vec in v.vectors: + + dim = len(vec) + + index_name = ( + "t-" + v.metadata.user + "-" + str(dim) + ) + + if index_name != self.last_index_name: + + if not self.pinecone.has_index(index_name): + + try: + + self.pinecone.create_index( + name = index_name, + dimension = dim, + metric = "cosine", + spec = ServerlessSpec( + cloud = self.cloud, + region = self.region, + ) + ) + + for i in range(0, 1000): + + if self.pinecone.describe_index( + index_name + ).status["ready"]: + break + + time.sleep(1) + + if not self.pinecone.describe_index( + index_name + ).status["ready"]: + raise RuntimeError( + "Gave up waiting for index creation" + ) + + except Exception as e: + print("Pinecone index creation failed") + raise e + + print(f"Index 
{index_name} created", flush=True) + + self.last_index_name = index_name + + index = self.pinecone.Index(index_name) + + records = [ + { + "id": id, + "values": vec, + "metadata": { "entity": v.entity.value }, + } + ] + + index.upsert( + vectors = records, + namespace = v.metadata.collection, + ) + + @staticmethod + def add_args(parser): + + Consumer.add_args( + parser, default_input_queue, default_subscriber, + ) + + parser.add_argument( + '-a', '--api-key', + default=default_api_key, + help='Pinecone API key. (default from PINECONE_API_KEY)' + ) + + parser.add_argument( + '-u', '--url', + help='Pinecone URL. If unspecified, serverless is used' + ) + + parser.add_argument( + '--cloud', + default=default_cloud, + help=f'Pinecone cloud, (default: {default_cloud}' + ) + + parser.add_argument( + '--region', + default=default_region, + help=f'Pinecone region, (default: {default_region}' + ) + +def run(): + + Processor.start(module, __doc__) +