From d6cdce83917a95bc5ab65faf7163f944dbed6796 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Tue, 10 Dec 2024 22:13:10 +0000 Subject: [PATCH 01/13] Open 0.18 branch --- .github/workflows/release.yaml | 2 +- trustgraph-bedrock/setup.py | 2 +- trustgraph-cli/setup.py | 2 +- trustgraph-embeddings-hf/setup.py | 4 ++-- trustgraph-flow/setup.py | 2 +- trustgraph-vertexai/setup.py | 2 +- trustgraph/setup.py | 13 ++++++------- 7 files changed, 13 insertions(+), 14 deletions(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 30fc70ff..88b13db4 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -5,7 +5,7 @@ on: workflow_dispatch: push: tags: - - v0.17.* + - v0.18.* permissions: contents: read diff --git a/trustgraph-bedrock/setup.py b/trustgraph-bedrock/setup.py index d92cc9c7..b8dd36bd 100644 --- a/trustgraph-bedrock/setup.py +++ b/trustgraph-bedrock/setup.py @@ -34,7 +34,7 @@ setuptools.setup( python_requires='>=3.8', download_url = "https://github.com/trustgraph-ai/trustgraph/archive/refs/tags/v" + version + ".tar.gz", install_requires=[ - "trustgraph-base>=0.17,<0.18", + "trustgraph-base>=0.18,<0.19", "pulsar-client", "prometheus-client", "boto3", diff --git a/trustgraph-cli/setup.py b/trustgraph-cli/setup.py index e9de429a..8217346f 100644 --- a/trustgraph-cli/setup.py +++ b/trustgraph-cli/setup.py @@ -34,7 +34,7 @@ setuptools.setup( python_requires='>=3.8', download_url = "https://github.com/trustgraph-ai/trustgraph/archive/refs/tags/v" + version + ".tar.gz", install_requires=[ - "trustgraph-base>=0.17,<0.18", + "trustgraph-base>=0.18,<0.19", "requests", "pulsar-client", "rdflib", diff --git a/trustgraph-embeddings-hf/setup.py b/trustgraph-embeddings-hf/setup.py index 25ccfeab..8febd59b 100644 --- a/trustgraph-embeddings-hf/setup.py +++ b/trustgraph-embeddings-hf/setup.py @@ -34,8 +34,8 @@ setuptools.setup( python_requires='>=3.8', download_url = 
"https://github.com/trustgraph-ai/trustgraph/archive/refs/tags/v" + version + ".tar.gz", install_requires=[ - "trustgraph-base>=0.17,<0.18", - "trustgraph-flow>=0.17,<0.18", + "trustgraph-base>=0.18,<0.19", + "trustgraph-flow>=0.18,<0.19", "torch", "urllib3", "transformers", diff --git a/trustgraph-flow/setup.py b/trustgraph-flow/setup.py index c53f96e7..a345df5e 100644 --- a/trustgraph-flow/setup.py +++ b/trustgraph-flow/setup.py @@ -34,7 +34,7 @@ setuptools.setup( python_requires='>=3.8', download_url = "https://github.com/trustgraph-ai/trustgraph/archive/refs/tags/v" + version + ".tar.gz", install_requires=[ - "trustgraph-base>=0.17,<0.18", + "trustgraph-base>=0.18,<0.19", "urllib3", "rdflib", "pymilvus", diff --git a/trustgraph-vertexai/setup.py b/trustgraph-vertexai/setup.py index 3ce10305..7f9c2923 100644 --- a/trustgraph-vertexai/setup.py +++ b/trustgraph-vertexai/setup.py @@ -34,7 +34,7 @@ setuptools.setup( python_requires='>=3.8', download_url = "https://github.com/trustgraph-ai/trustgraph/archive/refs/tags/v" + version + ".tar.gz", install_requires=[ - "trustgraph-base>=0.17,<0.18", + "trustgraph-base>=0.18,<0.19", "pulsar-client", "google-cloud-aiplatform", "prometheus-client", diff --git a/trustgraph/setup.py b/trustgraph/setup.py index 5f9f1f2c..a964ff06 100644 --- a/trustgraph/setup.py +++ b/trustgraph/setup.py @@ -34,13 +34,12 @@ setuptools.setup( python_requires='>=3.8', download_url = "https://github.com/trustgraph-ai/trustgraph/archive/refs/tags/v" + version + ".tar.gz", install_requires=[ - "trustgraph-base>=0.17,<0.18", - "trustgraph-bedrock>=0.17,<0.18", - "trustgraph-cli>=0.17,<0.18", - "trustgraph-embeddings-hf>=0.17,<0.18", - "trustgraph-flow>=0.17,<0.18", - "trustgraph-parquet>=0.17,<0.18", - "trustgraph-vertexai>=0.17,<0.18", + "trustgraph-base>=0.18,<0.19", + "trustgraph-bedrock>=0.18,<0.19", + "trustgraph-cli>=0.18,<0.19", + "trustgraph-embeddings-hf>=0.18,<0.19", + "trustgraph-flow>=0.18,<0.19", + "trustgraph-vertexai>=0.18,<0.19", ], 
scripts=[ ] From 8d326d34b3230c0c6ce4f684c52f59f6de401a0d Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 10 Dec 2024 22:15:42 +0000 Subject: [PATCH 02/13] Use Cosine similarity (#209) --- .../trustgraph/storage/graph_embeddings/qdrant/write.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py index e27c2516..47b53979 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py @@ -62,7 +62,7 @@ class Processor(Consumer): self.client.create_collection( collection_name=collection, vectors_config=VectorParams( - size=dim, distance=Distance.DOT + size=dim, distance=Distance.COSINE ), ) except Exception as e: From cd8d0c8cbc3e1bc51f859d807c50ecd5863681c4 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 10 Dec 2024 22:15:56 +0000 Subject: [PATCH 03/13] Graph embedding query exposed through gateway (#208) --- .../gateway/graph_embeddings_query.py | 40 +++++++++++++++++++ trustgraph-flow/trustgraph/gateway/service.py | 10 +++++ 2 files changed, 50 insertions(+) create mode 100644 trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py new file mode 100644 index 00000000..5e3c0ce9 --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py @@ -0,0 +1,40 @@ + +from .. schema import GraphEmbeddingsRequest, GraphEmbeddingsResponse +from .. schema import graph_embeddings_request_queue +from .. schema import graph_embeddings_response_queue + +from . endpoint import ServiceEndpoint +from . requestor import ServiceRequestor +from . 
serialize import serialize_value + +class GraphEmbeddingsQueryRequestor(ServiceRequestor): + def __init__(self, pulsar_host, timeout, auth): + + super(GraphEmbeddingsQueryRequestor, self).__init__( + pulsar_host=pulsar_host, + request_queue=graph_embeddings_request_queue, + response_queue=graph_embeddings_response_queue, + request_schema=GraphEmbeddingsRequest, + response_schema=GraphEmbeddingsResponse, + timeout=timeout, + ) + + def to_request(self, body): + + limit = int(body.get("limit", 20)) + + return GraphEmbeddingsRequest( + vectors = body["vectors"], + limit = limit, + user = body.get("user", "trustgraph"), + collection = body.get("collection", "default"), + ) + + def from_response(self, message): + + return { + "entities": [ + serialize_value(ent) for ent in message.entities + ] + }, True + diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index 6a8a62eb..af15e981 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -35,6 +35,7 @@ from . text_completion import TextCompletionRequestor from . prompt import PromptRequestor from . graph_rag import GraphRagRequestor from . triples_query import TriplesQueryRequestor +from . graph_embeddings_query import GraphEmbeddingsQueryRequestor from . embeddings import EmbeddingsRequestor from . encyclopedia import EncyclopediaRequestor from . 
agent import AgentRequestor @@ -95,6 +96,10 @@ class Api: pulsar_host=self.pulsar_host, timeout=self.timeout, auth = self.auth, ), + "graph-embeddings-query": GraphEmbeddingsQueryRequestor( + pulsar_host=self.pulsar_host, timeout=self.timeout, + auth = self.auth, + ), "embeddings": EmbeddingsRequestor( pulsar_host=self.pulsar_host, timeout=self.timeout, auth = self.auth, @@ -134,6 +139,11 @@ class Api: endpoint_path = "/api/v1/triples-query", auth=self.auth, requestor = self.services["triples-query"], ), + ServiceEndpoint( + endpoint_path = "/api/v1/graph-embeddings-query", + auth=self.auth, + requestor = self.services["graph-embeddings-query"], + ), ServiceEndpoint( endpoint_path = "/api/v1/embeddings", auth=self.auth, requestor = self.services["embeddings"], From 07f9b1f24436753d47ae2093df0e9eb8eea6624b Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 10 Dec 2024 22:37:54 +0000 Subject: [PATCH 04/13] From vector DB, often get dupes, which means when end up returning (#210) less then top_k elements. 
So, fetch top_k=(2 * limit) and limit to just (limit) --- .../graph_embeddings/pinecone/service.py | 23 ++++++++++++++----- .../query/graph_embeddings/qdrant/service.py | 20 ++++++++++++---- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py b/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py index 64ae4d32..2534d278 100755 --- a/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py +++ b/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py @@ -73,7 +73,8 @@ class Processor(ConsumerProducer): print(f"Handling input {id}...", flush=True) - entities = set() + entity_set = set() + entities = [] for vec in v.vectors: @@ -85,20 +86,30 @@ class Processor(ConsumerProducer): index = self.pinecone.Index(index_name) + # Heuristic hack, get (2*limit), so that we have more chance + # of getting (limit) entities results = index.query( namespace=v.collection, vector=vec, - top_k=v.limit, + top_k=v.limit * 2, include_values=False, include_metadata=True ) for r in results.matches: - ent = r.metadata["entity"] - entities.add(ent) - # Convert set to list - entities = list(entities) + ent = r.metadata["entity"] + + # De-dupe entities + if ent not in entity_set: + entity_set.add(ent) + entities.append(ent) + + # Keep adding entities until limit + if len(entity_set) >= v.limit: break + + # Keep adding entities until limit + if len(entity_set) >= v.limit: break ents2 = [] diff --git a/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py b/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py index 8991f9ea..c2dcaa4c 100755 --- a/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py +++ b/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py @@ -61,7 +61,8 @@ class Processor(ConsumerProducer): print(f"Handling input {id}...", flush=True) - entities = set() + entity_set = set() + entities = [] for 
vec in v.vectors: @@ -71,19 +72,28 @@ class Processor(ConsumerProducer): str(dim) ) + # Heuristic hack, get (2*limit), so that we have more chance + # of getting (limit) entities search_result = self.client.query_points( collection_name=collection, query=vec, - limit=v.limit, + limit=v.limit * 2, with_payload=True, ).points for r in search_result: ent = r.payload["entity"] - entities.add(ent) - # Convert set to list - entities = list(entities) + # De-dupe entities + if ent not in entity_set: + entity_set.add(ent) + entities.append(ent) + + # Keep adding entities until limit + if len(entity_set) >= v.limit: break + + # Keep adding entities until limit + if len(entity_set) >= v.limit: break ents2 = [] From 8c1b468eb0d4b0d41de4162cc9a84ce69f206ba5 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 17 Dec 2024 12:43:16 +0000 Subject: [PATCH 05/13] Fix async error (#212) --- trustgraph-flow/trustgraph/gateway/endpoint.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trustgraph-flow/trustgraph/gateway/endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint.py index 6d6ca8d5..1f38c489 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint.py @@ -53,10 +53,10 @@ class ServiceEndpoint: print(data) - def responder(x, fin): + async def responder(x, fin): print(x) - resp, fin = await self.requestor.process(data, responder) + resp = await self.requestor.process(data, responder) return web.json_response(resp) From a4afff59a04cd7a8bbd13ca1a706eeae18093a87 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Thu, 19 Dec 2024 16:17:07 +0000 Subject: [PATCH 06/13] wip integrate falkordb (#211) (#213) Co-authored-by: Avi Avni --- Containerfile | 2 +- Makefile | 2 +- docs/README.quickstart-docker-compose.md | 2 +- templates/all-patterns.jsonnet | 1 + templates/components.jsonnet | 2 + templates/components/falkordb.jsonnet | 76 ++++ templates/generate-all | 2 +- 
.../patterns/triple-store-falkordb.jsonnet | 13 + templates/patterns/triple-store-neo4j.jsonnet | 2 +- templates/stores/falkordb.jsonnet | 39 ++ templates/values/images.jsonnet | 1 + .../scripts/triples-query-falkordb | 6 + .../scripts/triples-write-falkordb | 6 + trustgraph-flow/setup.py | 3 + .../query/triples/falkordb/__init__.py | 3 + .../query/triples/falkordb/__main__.py | 7 + .../query/triples/falkordb/service.py | 341 ++++++++++++++++++ .../storage/triples/falkordb/__init__.py | 3 + .../storage/triples/falkordb/__main__.py | 7 + .../storage/triples/falkordb/write.py | 150 ++++++++ .../trustgraph/storage/triples/neo4j/write.py | 2 +- 21 files changed, 664 insertions(+), 6 deletions(-) create mode 100644 templates/components/falkordb.jsonnet create mode 100644 templates/patterns/triple-store-falkordb.jsonnet create mode 100644 templates/stores/falkordb.jsonnet create mode 100755 trustgraph-flow/scripts/triples-query-falkordb create mode 100755 trustgraph-flow/scripts/triples-write-falkordb create mode 100644 trustgraph-flow/trustgraph/query/triples/falkordb/__init__.py create mode 100755 trustgraph-flow/trustgraph/query/triples/falkordb/__main__.py create mode 100755 trustgraph-flow/trustgraph/query/triples/falkordb/service.py create mode 100644 trustgraph-flow/trustgraph/storage/triples/falkordb/__init__.py create mode 100755 trustgraph-flow/trustgraph/storage/triples/falkordb/__main__.py create mode 100755 trustgraph-flow/trustgraph/storage/triples/falkordb/write.py diff --git a/Containerfile b/Containerfile index c2735feb..73c9285f 100644 --- a/Containerfile +++ b/Containerfile @@ -17,7 +17,7 @@ RUN pip3 install anthropic boto3 cohere openai google-cloud-aiplatform ollama go langchain langchain-core langchain-huggingface langchain-text-splitters \ langchain-community pymilvus sentence-transformers transformers \ huggingface-hub pulsar-client cassandra-driver pyyaml \ - neo4j tiktoken && \ + neo4j tiktoken falkordb && \ pip3 cache purge # 
---------------------------------------------------------------------------- diff --git a/Makefile b/Makefile index 72d144a9..67094a90 100644 --- a/Makefile +++ b/Makefile @@ -62,7 +62,7 @@ TEMPLATES=azure bedrock claude cohere mix llamafile ollama openai vertexai \ DCS=$(foreach template,${TEMPLATES},${template:%=tg-launch-%.yaml}) MODELS=azure bedrock claude cohere llamafile ollama openai vertexai -GRAPHS=cassandra neo4j +GRAPHS=cassandra neo4j falkordb # tg-launch-%.yaml: templates/%.jsonnet templates/components/version.jsonnet # jsonnet -Jtemplates \ diff --git a/docs/README.quickstart-docker-compose.md b/docs/README.quickstart-docker-compose.md index a81da9bc..12cb8cf0 100644 --- a/docs/README.quickstart-docker-compose.md +++ b/docs/README.quickstart-docker-compose.md @@ -13,7 +13,7 @@ > [!TIP] > If using `Podman`, the only change will be to substitute `podman` instead of `docker` in all commands. -All `TrustGraph` components are deployed through a `Docker Compose` file. There are **16** `Docker Compose` files to choose from, depending on the desired model deployment and choosing between the graph stores `Cassandra` or `Neo4j`: +All `TrustGraph` components are deployed through a `Docker Compose` file. 
There are **16** `Docker Compose` files to choose from, depending on the desired model deployment and choosing between the graph stores `Cassandra` or `Neo4j` or `FalkorDB`: - `AzureAI` serverless endpoint for deployed models in Azure - `Bedrock` API for models deployed in AWS Bedrock diff --git a/templates/all-patterns.jsonnet b/templates/all-patterns.jsonnet index 47622939..f68f307d 100644 --- a/templates/all-patterns.jsonnet +++ b/templates/all-patterns.jsonnet @@ -5,6 +5,7 @@ import "patterns/grafana.jsonnet", import "patterns/triple-store-cassandra.jsonnet", import "patterns/triple-store-neo4j.jsonnet", + import "patterns/triple-store-falkordb.jsonnet", import "patterns/graph-rag.jsonnet", import "patterns/llm-azure.jsonnet", import "patterns/llm-azure-openai.jsonnet", diff --git a/templates/components.jsonnet b/templates/components.jsonnet index 1abf44a4..d3a4a112 100644 --- a/templates/components.jsonnet +++ b/templates/components.jsonnet @@ -12,6 +12,7 @@ "graph-rag": import "components/graph-rag.jsonnet", "triple-store-cassandra": import "components/cassandra.jsonnet", "triple-store-neo4j": import "components/neo4j.jsonnet", + "triple-store-falkordb": import "components/falkordb.jsonnet", "triple-store-memgraph": import "components/memgraph.jsonnet", "llamafile": import "components/llamafile.jsonnet", "ollama": import "components/ollama.jsonnet", @@ -39,6 +40,7 @@ "qdrant": import "components/qdrant.jsonnet", "pinecone": import "components/pinecone.jsonnet", "milvus": import "components/milvus.jsonnet", + "falkordb": import "components/falkordb.jsonnet", "trustgraph": import "components/trustgraph.jsonnet", } diff --git a/templates/components/falkordb.jsonnet b/templates/components/falkordb.jsonnet new file mode 100644 index 00000000..e238cebe --- /dev/null +++ b/templates/components/falkordb.jsonnet @@ -0,0 +1,76 @@ +local base = import "base/base.jsonnet"; +local images = import "values/images.jsonnet"; +local url = import "values/url.jsonnet"; +local 
falkordb = import "stores/falkordb.jsonnet"; + +falkordb + { + + "falkordb-url":: "falkor://falkordb:6379", + + "store-triples" +: { + + create:: function(engine) + + local container = + engine.container("store-triples") + .with_image(images.trustgraph) + .with_command([ + "triples-write-falkordb", + "-p", + url.pulsar, + "-g", + $["falkordb-url"], + ]) + .with_limits("0.5", "128M") + .with_reservations("0.1", "128M"); + + local containerSet = engine.containers( + "store-triples", [ container ] + ); + + local service = + engine.internalService(containerSet) + .with_port(8080, 8080, "metrics"); + + engine.resources([ + containerSet, + service, + ]) + + }, + + "query-triples" +: { + + create:: function(engine) + + local container = + engine.container("query-triples") + .with_image(images.trustgraph) + .with_command([ + "triples-query-falkordb", + "-p", + url.pulsar, + "-g", + $["falkordb-url"], + ]) + .with_limits("0.5", "128M") + .with_reservations("0.1", "128M"); + + local containerSet = engine.containers( + "query-triples", [ container ] + ); + + local service = + engine.internalService(containerSet) + .with_port(8080, 8080, "metrics"); + + engine.resources([ + containerSet, + service, + ]) + + + } + +} + diff --git a/templates/generate-all b/templates/generate-all index 0b403620..70aa6436 100755 --- a/templates/generate-all +++ b/templates/generate-all @@ -125,7 +125,7 @@ def generate_all(output, version): "azure", "azure-openai", "bedrock", "claude", "cohere", "googleaistudio", "llamafile", "ollama", "openai", "vertexai", ]: - for graph in [ "cassandra", "neo4j" ]: + for graph in [ "cassandra", "neo4j", "falkordb" ]: y = generate_config( llm=model, graph_store=graph, platform=platform, diff --git a/templates/patterns/triple-store-falkordb.jsonnet b/templates/patterns/triple-store-falkordb.jsonnet new file mode 100644 index 00000000..40ef02e2 --- /dev/null +++ b/templates/patterns/triple-store-falkordb.jsonnet @@ -0,0 +1,13 @@ +{ + pattern: { + name: 
"triple-store-falkordb", + icon: "🖇️🙋‍♀️", + title: "Adds a FalkorDB store configured to act as a triple store.", + description: "GraphRAG processing needs a triple store. This pattern adds a FalkorDB store, along with plumbing so that FalkorDB is integrated with GraphRag indexing and querying.", + requires: ["pulsar", "trustgraph"], + features: ["falkordb", "triple-store"], + args: [], + category: [ "knowledge-graph" ], + }, + module: "components/falkordb.jsonnet", +} diff --git a/templates/patterns/triple-store-neo4j.jsonnet b/templates/patterns/triple-store-neo4j.jsonnet index b8a93e31..b111ebe3 100644 --- a/templates/patterns/triple-store-neo4j.jsonnet +++ b/templates/patterns/triple-store-neo4j.jsonnet @@ -3,7 +3,7 @@ name: "triple-store-neo4j", icon: "🖇️🙋‍♀️", title: "Adds a Neo4j store configured to act as a triple store.", - description: "GraphRAG processing needs a triple store. This pattern adds a Cassandra store, along with plumbing so that Cassandra is integrated with GraphRag indexing and querying.", + description: "GraphRAG processing needs a triple store. 
This pattern adds a Neo4j store, along with plumbing so that Neo4j is integrated with GraphRag indexing and querying.", requires: ["pulsar", "trustgraph"], features: ["neo4j", "triple-store"], args: [], diff --git a/templates/stores/falkordb.jsonnet b/templates/stores/falkordb.jsonnet new file mode 100644 index 00000000..1c7924a9 --- /dev/null +++ b/templates/stores/falkordb.jsonnet @@ -0,0 +1,39 @@ +local base = import "base/base.jsonnet"; +local images = import "values/images.jsonnet"; + +{ + + "falkordb" +: { + + create:: function(engine) + + local vol = engine.volume("falkordb").with_size("20G"); + + local container = + engine.container("falkordb") + .with_image(images.falkordb) + .with_limits("1.0", "768M") + .with_reservations("0.5", "768M") + .with_port(6379, 6379, "api") + .with_port(3000, 3000, "ui") + .with_volume_mount(vol, "/data"); + + local containerSet = engine.containers( + "falkordb", [ container ] + ); + + local service = + engine.service(containerSet) + .with_port(6379, 6379, "api") + .with_port(3000, 3000, "ui"); + + engine.resources([ + vol, + containerSet, + service, + ]) + + }, + +} + diff --git a/templates/values/images.jsonnet b/templates/values/images.jsonnet index c583815b..7a4ddba7 100644 --- a/templates/values/images.jsonnet +++ b/templates/values/images.jsonnet @@ -13,4 +13,5 @@ local version = import "version.jsonnet"; qdrant: "docker.io/qdrant/qdrant:v1.11.1", memgraph_mage: "docker.io/memgraph/memgraph-mage:1.22-memgraph-2.22", memgraph_lab: "docker.io/memgraph/lab:2.19.1", + falkordb: "falkordb/falkordb:latest" } diff --git a/trustgraph-flow/scripts/triples-query-falkordb b/trustgraph-flow/scripts/triples-query-falkordb new file mode 100755 index 00000000..7f9ab74c --- /dev/null +++ b/trustgraph-flow/scripts/triples-query-falkordb @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.query.triples.falkordb import run + +run() + diff --git a/trustgraph-flow/scripts/triples-write-falkordb 
b/trustgraph-flow/scripts/triples-write-falkordb new file mode 100755 index 00000000..916ee352 --- /dev/null +++ b/trustgraph-flow/scripts/triples-write-falkordb @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.storage.triples.falkordb import run + +run() + diff --git a/trustgraph-flow/setup.py b/trustgraph-flow/setup.py index a345df5e..30ec0170 100644 --- a/trustgraph-flow/setup.py +++ b/trustgraph-flow/setup.py @@ -60,6 +60,7 @@ setuptools.setup( "jsonschema", "aiohttp", "pinecone[grpc]", + "falkordb", ], scripts=[ "scripts/api-gateway", @@ -104,9 +105,11 @@ setuptools.setup( "scripts/triples-query-cassandra", "scripts/triples-query-neo4j", "scripts/triples-query-memgraph", + "scripts/triples-query-falkordb", "scripts/triples-write-cassandra", "scripts/triples-write-neo4j", "scripts/triples-write-memgraph", + "scripts/triples-write-falkordb", "scripts/wikipedia-lookup", ] ) diff --git a/trustgraph-flow/trustgraph/query/triples/falkordb/__init__.py b/trustgraph-flow/trustgraph/query/triples/falkordb/__init__.py new file mode 100644 index 00000000..ba844705 --- /dev/null +++ b/trustgraph-flow/trustgraph/query/triples/falkordb/__init__.py @@ -0,0 +1,3 @@ + +from . service import * + diff --git a/trustgraph-flow/trustgraph/query/triples/falkordb/__main__.py b/trustgraph-flow/trustgraph/query/triples/falkordb/__main__.py new file mode 100755 index 00000000..89684e3e --- /dev/null +++ b/trustgraph-flow/trustgraph/query/triples/falkordb/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . hf import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph-flow/trustgraph/query/triples/falkordb/service.py b/trustgraph-flow/trustgraph/query/triples/falkordb/service.py new file mode 100755 index 00000000..43083832 --- /dev/null +++ b/trustgraph-flow/trustgraph/query/triples/falkordb/service.py @@ -0,0 +1,341 @@ + +""" +Triples query service for FalkorDB. +Input is a (s, p, o) triple, some values may be null. Output is a list of +triples. 
+""" + +from falkordb import FalkorDB + +from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error +from .... schema import Value, Triple +from .... schema import triples_request_queue +from .... schema import triples_response_queue +from .... base import ConsumerProducer + +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = triples_request_queue +default_output_queue = triples_response_queue +default_subscriber = module + +default_graph_url = 'falkor://falkordb:6379' +default_database = 'falkordb' + +class Processor(ConsumerProducer): + + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + graph_url = params.get("graph_host", default_graph_url) + database = params.get("database", default_database) + + super(Processor, self).__init__( + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": TriplesQueryRequest, + "output_schema": TriplesQueryResponse, + "graph_url": graph_url, + } + ) + + self.db = database + + self.io = FalkorDB.from_url(graph_url).select_graph(database) + + def create_value(self, ent): + + if ent.startswith("http://") or ent.startswith("https://"): + return Value(value=ent, is_uri=True) + else: + return Value(value=ent, is_uri=False) + + def handle(self, msg): + + try: + + v = msg.value() + + # Sender-produced ID + id = msg.properties()["id"] + + print(f"Handling input {id}...", flush=True) + + triples = [] + + if v.s is not None: + if v.p is not None: + if v.o is not None: + + # SPO + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal {value: $value}) " + "RETURN $src as src", + src=v.s.value, rel=v.p.value, value=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + triples.append((v.s.value, v.p.value, v.o.value)) 
+ + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node {uri: $uri}) " + "RETURN $src as src", + src=v.s.value, rel=v.p.value, uri=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + triples.append((v.s.value, v.p.value, v.o.value)) + + else: + + # SP + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal) " + "RETURN dest.value as dest", + src=v.s.value, rel=v.p.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, v.p.value, data["dest"])) + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node) " + "RETURN dest.uri as dest", + src=v.s.value, rel=v.p.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, v.p.value, data["dest"])) + + else: + + if v.o is not None: + + # SO + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal {value: $value}) " + "RETURN rel.uri as rel", + src=v.s.value, value=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, data["rel"], v.o.value)) + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node {uri: $uri}) " + "RETURN rel.uri as rel", + src=v.s.value, uri=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, data["rel"], v.o.value)) + + else: + + # S + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal) " + "RETURN rel.uri as rel, dest.value as dest", + src=v.s.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, data["rel"], data["dest"])) + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node) " + "RETURN rel.uri as rel, dest.uri as dest", + src=v.s.value, + 
database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, data["rel"], data["dest"])) + + + else: + + if v.p is not None: + + if v.o is not None: + + # PO + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal {value: $value}) " + "RETURN src.uri as src", + uri=v.p.value, value=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], v.p.value, v.o.value)) + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $uri}) " + "RETURN src.uri as src", + uri=v.p.value, dest=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], v.p.value, v.o.value)) + + else: + + # P + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal) " + "RETURN src.uri as src, dest.value as dest", + uri=v.p.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], v.p.value, data["dest"])) + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node) " + "RETURN src.uri as src, dest.uri as dest", + uri=v.p.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], v.p.value, data["dest"])) + + else: + + if v.o is not None: + + # O + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel]->(dest:Literal {value: $value}) " + "RETURN src.uri as src, rel.uri as rel", + value=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], data["rel"], v.o.value)) + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel]->(dest:Node {uri: $uri}) " + "RETURN src.uri as src, rel.uri as rel", + uri=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], data["rel"], 
v.o.value)) + + else: + + # * + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel]->(dest:Literal) " + "RETURN src.uri as src, rel.uri as rel, dest.value as dest", + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], data["rel"], data["dest"])) + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel]->(dest:Node) " + "RETURN src.uri as src, rel.uri as rel, dest.uri as dest", + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], data["rel"], data["dest"])) + + triples = [ + Triple( + s=self.create_value(t[0]), + p=self.create_value(t[1]), + o=self.create_value(t[2]) + ) + for t in triples + ] + + print("Send response...", flush=True) + r = TriplesQueryResponse(triples=triples, error=None) + self.producer.send(r, properties={"id": id}) + + print("Done.", flush=True) + + except Exception as e: + + print(f"Exception: {e}") + + print("Send error response...", flush=True) + + r = TriplesQueryResponse( + error=Error( + type = "llm-error", + message = str(e), + ), + response=None, + ) + + self.producer.send(r, properties={"id": id}) + + self.consumer.acknowledge(msg) + + @staticmethod + def add_args(parser): + + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) + + parser.add_argument( + '-g', '--graph-url', + default=default_graph_url, + help=f'Graph url (default: {default_graph_url})' + ) + + parser.add_argument( + '--database', + default=default_database, + help=f'FalkorDB database (default: {default_database})' + ) + +def run(): + + Processor.start(module, __doc__) + diff --git a/trustgraph-flow/trustgraph/storage/triples/falkordb/__init__.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/__init__.py new file mode 100644 index 00000000..d891d55f --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/triples/falkordb/__init__.py @@ -0,0 +1,3 @@ + +from . 
write import * + diff --git a/trustgraph-flow/trustgraph/storage/triples/falkordb/__main__.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/__main__.py new file mode 100755 index 00000000..c05d8c6d --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/triples/falkordb/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . write import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py new file mode 100755 index 00000000..9fb1e0ff --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py @@ -0,0 +1,150 @@ + +""" +Graph writer. Input is graph edge. Writes edges to FalkorDB graph. +""" + +import pulsar +import base64 +import os +import argparse +import time + +from falkordb import FalkorDB + +from .... schema import Triples +from .... schema import triples_store_queue +from .... log_level import LogLevel +from .... base import Consumer + +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = triples_store_queue +default_subscriber = module + +default_graph_url = 'falkor://falkordb:6379' +default_database = 'falkordb' + +class Processor(Consumer): + + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + subscriber = params.get("subscriber", default_subscriber) + graph_url = params.get("graph_host", default_graph_url) + database = params.get("database", default_database) + + super(Processor, self).__init__( + **params | { + "input_queue": input_queue, + "subscriber": subscriber, + "input_schema": Triples, + "graph_url": graph_url, + } + ) + + self.db = database + + self.io = FalkorDB.from_url(graph_url).select_graph(database) + + def create_node(self, uri): + + print("Create node", uri) + + res = self.io.query( + "MERGE (n:Node {uri: $uri})", + uri=uri, + database_=self.db, + ) + + print("Created {nodes_created} nodes in {time} ms.".format( + 
nodes_created=res.nodes_created, + time=res.run_time_ms + )) + + def create_literal(self, value): + + print("Create literal", value) + + res = self.io.query( + "MERGE (n:Literal {value: $value})", + value=value, + database_=self.db, + ) + + print("Created {nodes_created} nodes in {time} ms.".format( + nodes_created=res.nodes_created, + time=res.run_time_ms + )) + + def relate_node(self, src, uri, dest): + + print("Create node rel", src, uri, dest) + + res = self.io.query( + "MATCH (src:Node {uri: $src}) " + "MATCH (dest:Node {uri: $dest}) " + "MERGE (src)-[:Rel {uri: $uri}]->(dest)", + src=src, dest=dest, uri=uri, + database_=self.db, + ) + + print("Created {nodes_created} nodes in {time} ms.".format( + nodes_created=res.nodes_created, + time=res.run_time_ms + )) + + def relate_literal(self, src, uri, dest): + + print("Create literal rel", src, uri, dest) + + res = self.io.query( + "MATCH (src:Node {uri: $src}) " + "MATCH (dest:Literal {value: $dest}) " + "MERGE (src)-[:Rel {uri: $uri}]->(dest)", + src=src, dest=dest, uri=uri, + database_=self.db, + ) + + print("Created {nodes_created} nodes in {time} ms.".format( + nodes_created=res.nodes_created, + time=res.run_time_ms + )) + + def handle(self, msg): + + v = msg.value() + + for t in v.triples: + + self.create_node(t.s.value) + + if t.o.is_uri: + self.create_node(t.o.value) + self.relate_node(t.s.value, t.p.value, t.o.value) + else: + self.create_literal(t.o.value) + self.relate_literal(t.s.value, t.p.value, t.o.value) + + @staticmethod + def add_args(parser): + + Consumer.add_args( + parser, default_input_queue, default_subscriber, + ) + + parser.add_argument( + '-g', '--graph_host', + default=default_graph_url, + help=f'Graph host (default: {default_graph_url})' + ) + + parser.add_argument( + '--database', + default=default_database, + help=f'FalkorDB database (default: {default_database})' + ) + +def run(): + + Processor.start(module, __doc__) + diff --git 
a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py index 929333e5..1aa25aa8 100755 --- a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py @@ -1,6 +1,6 @@ """ -Graph writer. Input is graph edge. Writes edges to Cassandra graph. +Graph writer. Input is graph edge. Writes edges to Neo4j graph. """ import pulsar From 03b6b457252b06c3229ea8fbc61f7fe819152ed1 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Thu, 19 Dec 2024 17:32:05 +0000 Subject: [PATCH 07/13] - Fix FalkorDB query API invocations (#214) - Shift FalkorDB internal web manager to be port 3010 so doesn't clash with Grafana. --- templates/stores/falkordb.jsonnet | 4 +- templates/values/images.jsonnet | 2 +- .../query/triples/falkordb/service.py | 130 ++++++++++-------- .../storage/triples/falkordb/write.py | 24 ++-- 4 files changed, 88 insertions(+), 72 deletions(-) diff --git a/templates/stores/falkordb.jsonnet b/templates/stores/falkordb.jsonnet index 1c7924a9..78509a43 100644 --- a/templates/stores/falkordb.jsonnet +++ b/templates/stores/falkordb.jsonnet @@ -15,7 +15,7 @@ local images = import "values/images.jsonnet"; .with_limits("1.0", "768M") .with_reservations("0.5", "768M") .with_port(6379, 6379, "api") - .with_port(3000, 3000, "ui") + .with_port(3010, 3000, "ui") .with_volume_mount(vol, "/data"); local containerSet = engine.containers( @@ -25,7 +25,7 @@ local images = import "values/images.jsonnet"; local service = engine.service(containerSet) .with_port(6379, 6379, "api") - .with_port(3000, 3000, "ui"); + .with_port(3010, 3010, "ui"); engine.resources([ vol, diff --git a/templates/values/images.jsonnet b/templates/values/images.jsonnet index 7a4ddba7..7fdf4f4a 100644 --- a/templates/values/images.jsonnet +++ b/templates/values/images.jsonnet @@ -13,5 +13,5 @@ local version = import "version.jsonnet"; qdrant: "docker.io/qdrant/qdrant:v1.11.1", memgraph_mage: 
"docker.io/memgraph/memgraph-mage:1.22-memgraph-2.22", memgraph_lab: "docker.io/memgraph/lab:2.19.1", - falkordb: "falkordb/falkordb:latest" + falkordb: "docker.io/falkordb/falkordb:latest" } diff --git a/trustgraph-flow/trustgraph/query/triples/falkordb/service.py b/trustgraph-flow/trustgraph/query/triples/falkordb/service.py index 43083832..1d77bb15 100755 --- a/trustgraph-flow/trustgraph/query/triples/falkordb/service.py +++ b/trustgraph-flow/trustgraph/query/triples/falkordb/service.py @@ -76,8 +76,11 @@ class Processor(ConsumerProducer): records = self.io.query( "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal {value: $value}) " "RETURN $src as src", - src=v.s.value, rel=v.p.value, value=v.o.value, - database_=self.db, + params={ + "src": v.s.value, + "rel": v.p.value, + "value": v.o.value, + }, ).result_set for rec in records: @@ -86,8 +89,11 @@ class Processor(ConsumerProducer): records = self.io.query( "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node {uri: $uri}) " "RETURN $src as src", - src=v.s.value, rel=v.p.value, uri=v.o.value, - database_=self.db, + params={ + "src": v.s.value, + "rel": v.p.value, + "uri": v.o.value, + }, ).result_set for rec in records: @@ -100,24 +106,26 @@ class Processor(ConsumerProducer): records = self.io.query( "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal) " "RETURN dest.value as dest", - src=v.s.value, rel=v.p.value, - database_=self.db, + params={ + "src": v.s.value, + "rel": v.p.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((v.s.value, v.p.value, data["dest"])) + triples.append((v.s.value, v.p.value, rec[0])) records = self.io.query( "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node) " "RETURN dest.uri as dest", - src=v.s.value, rel=v.p.value, - database_=self.db, + params={ + "src": v.s.value, + "rel": v.p.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((v.s.value, v.p.value, 
data["dest"])) + triples.append((v.s.value, v.p.value, rec[0])) else: @@ -128,50 +136,52 @@ class Processor(ConsumerProducer): records = self.io.query( "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal {value: $value}) " "RETURN rel.uri as rel", - src=v.s.value, value=v.o.value, - database_=self.db, + params={ + "src": v.s.value, + "value": v.o.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((v.s.value, data["rel"], v.o.value)) + triples.append((v.s.value, rec[0], v.o.value)) records = self.io.query( "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node {uri: $uri}) " "RETURN rel.uri as rel", - src=v.s.value, uri=v.o.value, - database_=self.db, + params={ + "src": v.s.value, + "uri": v.o.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((v.s.value, data["rel"], v.o.value)) + triples.append((v.s.value, rec[0], v.o.value)) else: - # S + # s records = self.io.query( - "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal) " - "RETURN rel.uri as rel, dest.value as dest", - src=v.s.value, - database_=self.db, + "match (src:node {uri: $src})-[rel:rel]->(dest:literal) " + "return rel.uri as rel, dest.value as dest", + params={ + "src": v.s.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((v.s.value, data["rel"], data["dest"])) + triples.append((v.s.value, rec[0], rec[1])) records = self.io.query( "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node) " "RETURN rel.uri as rel, dest.uri as dest", - src=v.s.value, - database_=self.db, + params={ + "src": v.s.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((v.s.value, data["rel"], data["dest"])) + triples.append((v.s.value, rec[0], rec[1])) else: @@ -185,24 +195,26 @@ class Processor(ConsumerProducer): records = self.io.query( "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal {value: $value}) " "RETURN src.uri as src", - uri=v.p.value, value=v.o.value, - database_=self.db, + params={ + 
"uri": v.p.value, + "value": v.o.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((data["src"], v.p.value, v.o.value)) + triples.append((rec[0], v.p.value, v.o.value)) records = self.io.query( "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $uri}) " "RETURN src.uri as src", - uri=v.p.value, dest=v.o.value, - database_=self.db, + params={ + "uri": v.p.value, + "dest": v.o.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((data["src"], v.p.value, v.o.value)) + triples.append((rec[0], v.p.value, v.o.value)) else: @@ -211,24 +223,24 @@ class Processor(ConsumerProducer): records = self.io.query( "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal) " "RETURN src.uri as src, dest.value as dest", - uri=v.p.value, - database_=self.db, + params={ + "uri": v.p.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((data["src"], v.p.value, data["dest"])) + triples.append((rec[0], v.p.value, rec[1])) records = self.io.query( "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node) " "RETURN src.uri as src, dest.uri as dest", - uri=v.p.value, - database_=self.db, + params={ + "uri": v.p.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((data["src"], v.p.value, data["dest"])) + triples.append((rec[0], v.p.value, rec[1])) else: @@ -239,24 +251,24 @@ class Processor(ConsumerProducer): records = self.io.query( "MATCH (src:Node)-[rel:Rel]->(dest:Literal {value: $value}) " "RETURN src.uri as src, rel.uri as rel", - value=v.o.value, - database_=self.db, + params={ + "value": v.o.value, + }, ).result_set for rec in records: - data = rec.data() - triples.append((data["src"], data["rel"], v.o.value)) + triples.append((rec[0], rec[1], v.o.value)) records = self.io.query( "MATCH (src:Node)-[rel:Rel]->(dest:Node {uri: $uri}) " "RETURN src.uri as src, rel.uri as rel", - uri=v.o.value, - database_=self.db, + params={ + "uri": v.o.value, + }, ).result_set for rec 
in records: - data = rec.data() - triples.append((data["src"], data["rel"], v.o.value)) + triples.append((rec[0], rec[1], v.o.value)) else: @@ -265,22 +277,18 @@ class Processor(ConsumerProducer): records = self.io.query( "MATCH (src:Node)-[rel:Rel]->(dest:Literal) " "RETURN src.uri as src, rel.uri as rel, dest.value as dest", - database_=self.db, ).result_set for rec in records: - data = rec.data() - triples.append((data["src"], data["rel"], data["dest"])) + triples.append((rec[0], rec[1], rec[2])) records = self.io.query( "MATCH (src:Node)-[rel:Rel]->(dest:Node) " "RETURN src.uri as src, rel.uri as rel, dest.uri as dest", - database_=self.db, ).result_set for rec in records: - data = rec.data() - triples.append((data["src"], data["rel"], data["dest"])) + triples.append((rec[0], rec[1], rec[2])) triples = [ Triple( diff --git a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py index 9fb1e0ff..3c7d1660 100755 --- a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py @@ -52,8 +52,9 @@ class Processor(Consumer): res = self.io.query( "MERGE (n:Node {uri: $uri})", - uri=uri, - database_=self.db, + params={ + "uri": uri, + }, ) print("Created {nodes_created} nodes in {time} ms.".format( @@ -67,8 +68,9 @@ class Processor(Consumer): res = self.io.query( "MERGE (n:Literal {value: $value})", - value=value, - database_=self.db, + params={ + "value": value, + }, ) print("Created {nodes_created} nodes in {time} ms.".format( @@ -84,8 +86,11 @@ class Processor(Consumer): "MATCH (src:Node {uri: $src}) " "MATCH (dest:Node {uri: $dest}) " "MERGE (src)-[:Rel {uri: $uri}]->(dest)", - src=src, dest=dest, uri=uri, - database_=self.db, + params={ + "src": src, + "dest": dest, + "uri": uri, + }, ) print("Created {nodes_created} nodes in {time} ms.".format( @@ -101,8 +106,11 @@ class Processor(Consumer): "MATCH (src:Node {uri: $src}) " 
"MATCH (dest:Literal {value: $dest}) " "MERGE (src)-[:Rel {uri: $uri}]->(dest)", - src=src, dest=dest, uri=uri, - database_=self.db, + params={ + "src": src, + "dest": dest, + "uri": uri, + }, ) print("Created {nodes_created} nodes in {time} ms.".format( From 317ae3186ae498719b487227dea584b357cb5bca Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Thu, 19 Dec 2024 21:29:53 +0000 Subject: [PATCH 08/13] Rename /api/v1/mux to /api/v1/socket (#215) --- trustgraph-flow/trustgraph/gateway/mux.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trustgraph-flow/trustgraph/gateway/mux.py b/trustgraph-flow/trustgraph/gateway/mux.py index cd5ddfba..74797069 100644 --- a/trustgraph-flow/trustgraph/gateway/mux.py +++ b/trustgraph-flow/trustgraph/gateway/mux.py @@ -13,7 +13,7 @@ class MuxEndpoint(SocketEndpoint): def __init__( self, pulsar_host, auth, services, - path="/api/v1/mux", + path="/api/v1/socket", ): super(MuxEndpoint, self).__init__( From f145d5c3241d4ddb663e909b4af4558d264b773e Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Fri, 20 Dec 2024 00:45:34 +0000 Subject: [PATCH 09/13] Add workbench-ui (#216) --- templates/components.jsonnet | 1 + templates/components/workbench-ui.jsonnet | 32 +++++++++++++++++++ templates/values/images.jsonnet | 3 +- .../storage/triples/memgraph/write.py | 2 +- 4 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 templates/components/workbench-ui.jsonnet diff --git a/templates/components.jsonnet b/templates/components.jsonnet index d3a4a112..b14665d6 100644 --- a/templates/components.jsonnet +++ b/templates/components.jsonnet @@ -29,6 +29,7 @@ "vector-store-qdrant": import "components/qdrant.jsonnet", "vector-store-pinecone": import "components/pinecone.jsonnet", "vertexai": import "components/vertexai.jsonnet", + "workbench-ui": import "components/workbench-ui.jsonnet", "null": {}, "agent-manager-react": import "components/agent-manager-react.jsonnet", diff --git 
a/templates/components/workbench-ui.jsonnet b/templates/components/workbench-ui.jsonnet new file mode 100644 index 00000000..f2048e47 --- /dev/null +++ b/templates/components/workbench-ui.jsonnet @@ -0,0 +1,32 @@ +local images = import "values/images.jsonnet"; + +{ + + "workbench-ui" +: { + + create:: function(engine) + + local container = + engine.container("workbench-ui") + .with_image(images["workbench-ui"]) + .with_limits("0.1", "256M") + .with_reservations("0.1", "256M") + .with_port(8888, 8888, "ui"); + + local containerSet = engine.containers( + "workbench-ui", [ container ] + ); + + local service = + engine.internalService(containerSet) + .with_port(8888, 8888, "ui"); + + engine.resources([ + containerSet, + service, + ]) + + }, + +} + diff --git a/templates/values/images.jsonnet b/templates/values/images.jsonnet index 7fdf4f4a..df724938 100644 --- a/templates/values/images.jsonnet +++ b/templates/values/images.jsonnet @@ -13,5 +13,6 @@ local version = import "version.jsonnet"; qdrant: "docker.io/qdrant/qdrant:v1.11.1", memgraph_mage: "docker.io/memgraph/memgraph-mage:1.22-memgraph-2.22", memgraph_lab: "docker.io/memgraph/lab:2.19.1", - falkordb: "docker.io/falkordb/falkordb:latest" + falkordb: "docker.io/falkordb/falkordb:latest", + "workbench-ui": "docker.io/trustgraph/workbench-ui:0.1.4", } diff --git a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py index 17e8c67e..f106170a 100755 --- a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py @@ -1,6 +1,6 @@ """ -Graph writer. Input is graph edge. Writes edges to Cassandra graph. +Graph writer. Input is graph edge. Writes edges to Memgraph. 
""" import pulsar From 62d25effd5f49178be4f9befc4a6670bc0ddbf35 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Fri, 20 Dec 2024 10:16:25 +0000 Subject: [PATCH 10/13] Fix pipeline --- .github/workflows/release.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 88b13db4..1b6dc177 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -30,7 +30,7 @@ jobs: - name: Log in to Docker Hub uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a with: - username: ${{ secrets.DOCKER_USERNAME }} + username: ${{ vars.DOCKER_USERNAME }} password: ${{ secrets.DOCKER_SECRET }} - name: Install build dependencies From 7f5296feca7024d649b853cbf1a951aabd78f4d0 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Fri, 27 Dec 2024 10:34:16 +0000 Subject: [PATCH 11/13] Refactor socket threading (#219) * Multiple requests can be handled in parallel. * Refactor to fix timeout issue. --- trustgraph-flow/trustgraph/gateway/mux.py | 119 +++++++++++++++---- trustgraph-flow/trustgraph/gateway/socket.py | 29 ++--- 2 files changed, 104 insertions(+), 44 deletions(-) diff --git a/trustgraph-flow/trustgraph/gateway/mux.py b/trustgraph-flow/trustgraph/gateway/mux.py index 74797069..ae699ae6 100644 --- a/trustgraph-flow/trustgraph/gateway/mux.py +++ b/trustgraph-flow/trustgraph/gateway/mux.py @@ -8,6 +8,13 @@ from aiohttp import web, WSMsgType from . socket import SocketEndpoint from . 
text_completion import TextCompletionRequestor +MAX_OUTSTANDING_REQUESTS = 15 +WORKER_CLOSE_WAIT = 0.01 +START_REQUEST_WAIT = 0.1 + +# This buffers requests until task start, so short-lived +MAX_QUEUE_SIZE = 10 + class MuxEndpoint(SocketEndpoint): def __init__( @@ -20,53 +27,113 @@ class MuxEndpoint(SocketEndpoint): endpoint_path=path, auth=auth, ) - self.q = asyncio.Queue(maxsize=10) - self.services = services async def start(self): pass - async def async_thread(self, ws, running): + async def maybe_tidy_workers(self, workers): + + while True: + + try: + + await asyncio.wait_for( + asyncio.shield(workers[0]), + WORKER_CLOSE_WAIT + ) + + # worker[0] now stopped + # FIXME: Delete reference??? + + workers.pop(0) + + if len(workers) == 0: + break + + # Loop iterates to try the next worker + + except TimeoutError: + # worker[0] still running, move on + break + + async def start_request_task(self, ws, id, svc, request, workers): + + if svc not in self.services: + await ws.send_json({"id": id, "error": "Service not recognised"}) + return + + requestor = self.services[svc] + + async def responder(resp, fin): + await ws.send_json({ + "id": id, + "response": resp, + "complete": fin, + }) + + # Wait for outstanding requests to go below MAX_OUTSTANDING_REQUESTS + while len(workers) > MAX_OUTSTANDING_REQUESTS: + + # Fixes deadlock + # FIXME: Put it in its own loop + await asyncio.sleep(START_REQUEST_WAIT) + + await self.maybe_tidy_workers(workers) + + worker = asyncio.create_task( + requestor.process(request, responder) + ) + + workers.append(worker) + + async def async_thread(self, ws, running, q): + + # Worker threads, servicing + workers = [] while running.get(): try: - id, svc, request = await asyncio.wait_for(self.q.get(), 1) + + if len(workers) > 0: + await self.maybe_tidy_workers(workers) + + # Get next request on queue + id, svc, request = await asyncio.wait_for(q.get(), 1) + except TimeoutError: continue + except Exception as e: + # This is an internal working error, 
may not be recoverable + print("Exception:", e) await ws.send_json({"id": id, "error": str(e)}) + break try: - - print(svc, request) - - requestor = self.services[svc] - - async def responder(resp, fin): - await ws.send_json({ - "id": id, - "response": resp, - "complete": fin, - }) - - resp = await requestor.process(request, responder) + print(id, svc, request) + await self.start_request_task(ws, id, svc, request, workers) except Exception as e: - + print("Exception2:", e) await ws.send_json({"error": str(e)}) running.stop() async def listener(self, ws, running): + + # The outstanding request queue, max size is MAX_QUEUE_SIZE + q = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) + + async_task = asyncio.create_task(self.async_thread( + ws, running, q + )) async for msg in ws: # On error, finish - if msg.type == WSMsgType.ERROR: - break - else: + if msg.type == WSMsgType.TEXT: try: @@ -81,7 +148,7 @@ class MuxEndpoint(SocketEndpoint): if "id" not in data: raise RuntimeError("Bad message") - await self.q.put( + await q.put( (data["id"], data["service"], data["request"]) ) @@ -90,5 +157,13 @@ class MuxEndpoint(SocketEndpoint): await ws.send_json({"error": str(e)}) continue + elif msg.type == WSMsgType.ERROR: + break + elif msg.type == WSMsgType.CLOSE: + break + else: + break + running.stop() + await async_task diff --git a/trustgraph-flow/trustgraph/gateway/socket.py b/trustgraph-flow/trustgraph/gateway/socket.py index 869792b7..fd408d7b 100644 --- a/trustgraph-flow/trustgraph/gateway/socket.py +++ b/trustgraph-flow/trustgraph/gateway/socket.py @@ -22,25 +22,16 @@ class SocketEndpoint: async for msg in ws: # On error, finish - if msg.type == WSMsgType.ERROR: - break + if msg.type == WSMsgType.TEXT: + # Ignore incoming message + continue + elif msg.type == WSMsgType.BINARY: + # Ignore incoming message + continue else: - # Ignore incoming messages - pass + break running.stop() - - async def async_thread(self, ws, running): - - while running.get(): - try: - await asyncio.sleep(1) 
- - except TimeoutError: - continue - - except Exception as e: - print(f"Exception: {str(e)}", flush=True) async def handle(self, request): @@ -56,12 +47,8 @@ class SocketEndpoint: ws = web.WebSocketResponse() await ws.prepare(request) - task = asyncio.create_task(self.async_thread(ws, running)) - try: - await self.listener(ws, running) - except Exception as e: print(e, flush=True) @@ -69,8 +56,6 @@ class SocketEndpoint: await ws.close() - await task - return ws async def start(self): From 74ea3b8b965048b1cff9d90bb1ab7818e12127c1 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Sat, 28 Dec 2024 11:33:35 +0000 Subject: [PATCH 12/13] Upgrade workbench-ui to 0.1.6 (#220) --- templates/values/images.jsonnet | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/templates/values/images.jsonnet b/templates/values/images.jsonnet index df724938..40954289 100644 --- a/templates/values/images.jsonnet +++ b/templates/values/images.jsonnet @@ -14,5 +14,5 @@ local version = import "version.jsonnet"; memgraph_mage: "docker.io/memgraph/memgraph-mage:1.22-memgraph-2.22", memgraph_lab: "docker.io/memgraph/lab:2.19.1", falkordb: "docker.io/falkordb/falkordb:latest", - "workbench-ui": "docker.io/trustgraph/workbench-ui:0.1.4", + "workbench-ui": "docker.io/trustgraph/workbench-ui:0.1.6", } From 2d3802e001b7733fb41b014af6e858d7fa833e79 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Sat, 28 Dec 2024 16:59:11 +0000 Subject: [PATCH 13/13] API documentation (#221) --- README.md | 6 +- docs/apis/README.md | 72 ++++++++++++ docs/apis/api-agent.md | 135 +++++++++++++++++++++ docs/apis/api-document-load.md | 3 + docs/apis/api-embeddings.md | 106 +++++++++++++++++ docs/apis/api-graph-embeddings.md | 155 +++++++++++++++++++++++++ docs/apis/api-graph-rag.md | 97 ++++++++++++++++ docs/apis/api-prompt.md | 140 ++++++++++++++++++++++ docs/apis/api-text-completion.md | 105 +++++++++++++++++ docs/apis/api-triples-query.md | 187 ++++++++++++++++++++++++++++++ docs/apis/pulsar.md | 3 + 
docs/apis/websocket.md | 136 ++++++++++++++++++++++ prometheus/prometheus.yml | 38 +++--- 13 files changed, 1163 insertions(+), 20 deletions(-) create mode 100644 docs/apis/README.md create mode 100644 docs/apis/api-agent.md create mode 100644 docs/apis/api-document-load.md create mode 100644 docs/apis/api-embeddings.md create mode 100644 docs/apis/api-graph-embeddings.md create mode 100644 docs/apis/api-graph-rag.md create mode 100644 docs/apis/api-prompt.md create mode 100644 docs/apis/api-text-completion.md create mode 100644 docs/apis/api-triples-query.md create mode 100644 docs/apis/pulsar.md create mode 100644 docs/apis/websocket.md diff --git a/README.md b/README.md index 7f83b8ee..5c8ec2eb 100644 --- a/README.md +++ b/README.md @@ -125,9 +125,13 @@ Once the knowledge graph has been built or a knowledge core has been loaded, Gra tg-query-graph-rag -q "Write a blog post about the 5 key takeaways from SB1047 and how they will impact AI development." ``` +## API documentation + +[Developing on TrustGraph using APIs](docs/apis/README.md) + ## Deploy and Manage TrustGraph -[🚀 Full Deployment Guide 🚀](https://trustgraph.ai/docs/getstarted) +[🚀🙏 Full Deployment Guide 🚀🙏](https://trustgraph.ai/docs/getstarted) ## TrustGraph Developer's Guide diff --git a/docs/apis/README.md b/docs/apis/README.md new file mode 100644 index 00000000..ea14926a --- /dev/null +++ b/docs/apis/README.md @@ -0,0 +1,72 @@ + +# TrustGraph APIs + +## Overview + +If you want to interact with TrustGraph through APIs, there are 3 +forms of API which may be of interest to you: + +### Pulsar APIs + +Apache Pulsar is a pub/sub system used to deliver messages between TrustGraph +components. Using Pulsar, you can communicate with TrustGraph components.
+ +Pros: + - Provides complete access to all TrustGraph functionality + - Simple integration with metrics and observability + +Cons: + - Integration is non-trivial, requires a special-purpose Pulsar client + library + - The Pulsar interfaces are likely something that you would not want to + expose outside of the processing cluster in a production or well-secured + deployment + +### REST APIs + +A component, `api-gateway`, provides a bridge between Pulsar internals and +the REST API which allows many services to be invoked using REST APIs. + +Pros: + - Uses a standard REST approach, which can be easily integrated into many kinds + of technology + - Can be easily protected with authentication and TLS for production-grade + or secure deployments + +Cons: + - For a complex application, a long series of REST invocations has + latency and performance overheads - HTTP has limits on the number + of concurrent service invocations + - Lower coverage of functionality - service interfaces need to be added to + `api-gateway` to permit REST invocation + +### Websocket API + +The `api-gateway` component also provides access to services through a +websocket API.
+ +Pros: + - Usable through a standard websocket library + - Can be easily protected with authentication and TLS for production-grade + or secure deployments + - Supports concurrent service invocations + +Cons: + - Websocket service invocation is a little more complex to develop than + using a basic REST API, particularly if you want to cover all of the error + scenarios well + +## See also + +- [TrustGraph websocket overview](websocket.md) +- [TrustGraph Pulsar overview](pulsar.md) +- API details + - [Text completion](api-text-completion.md) + - [Prompt completion](api-prompt.md) + - [Graph RAG](api-graph-rag.md) + - [Agent](api-agent.md) + - [Embeddings](api-embeddings.md) + - [Graph embeddings](api-graph-embeddings.md) + - [Triples query](api-triples-query.md) + - [Document load](api-document-load.md) + diff --git a/docs/apis/api-agent.md b/docs/apis/api-agent.md new file mode 100644 index 00000000..99e28a26 --- /dev/null +++ b/docs/apis/api-agent.md @@ -0,0 +1,135 @@ + +# TrustGraph Agent API + +The REST service provides incomplete functionality: The agent service +is able to provide multi-part responses containing 'thought' and +'observation' messages as the agent manager iterates over resolution of the +question. These responses are provided in the websocket, but not the REST +API. + +## Request/response + +### Request + +The request contains the following fields: +- `question`: A string, the question which the agent API must resolve +- `plan`: Optional, not used +- `state`: Optional, not used + +### Response + +The response contains the following fields: +- `thought`: Optional, a string, provides an interim agent thought +- `observation`: Optional, a string, provides an interim agent observation +- `answer`: Optional, a string, provides the final answer + +## REST service + +The REST service accepts a request object containing the question field. +The response is a JSON object containing the `answer` field. Interim +responses are not provided. + +e.g.
+ +Request: +``` +{ + "question": "What does NASA stand for?" +} +``` + +Response: + +``` +{ + "answer": "National Aeronautics and Space Administration" +} +``` + +## Websocket + +Agent requests have a `request` object containing the `question` field. +Responses have a `response` object containing `thought`, `observation` +and `answer` fields in multi-part responses. The final `answer` response +has `complete` set to `true`. + +e.g. + +Request: + +``` +{ + "id": "blrqotfefnmnh7de-20", + "service": "agent", + "request": { + "question": "What does NASA stand for?" + } +} +``` + +Responses: + +``` +{ + "id": "blrqotfefnmnh7de-20", + "response": { + "thought": "I need to query a knowledge base" + }, + "complete": false +} +``` + +``` +{ + "id": "blrqotfefnmnh7de-20", + "response": { + "observation": "National Aeronautics and Space Administration." + }, + "complete": false +} +``` + +``` +{ + "id": "blrqotfefnmnh7de-20", + "response": { + "thought": "I now know the final answer" + }, + "complete": false +} +``` + +``` +{ + "id": "blrqotfefnmnh7de-20", + "response": { + "answer": "National Aeronautics and Space Administration" + }, + "complete": true +} +``` + +## Pulsar + +The Pulsar schema for the Agent API is defined in Python code here: + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/schema/agent.py + +Default request queue: +`non-persistent://tg/request/agent` + +Default response queue: +`non-persistent://tg/response/agent` + +Request schema: +`trustgraph.schema.AgentRequest` + +Response schema: +`trustgraph.schema.AgentResponse` + +## Pulsar Python client + +The client class is +`trustgraph.clients.AgentClient` + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/clients/agent_client.py diff --git a/docs/apis/api-document-load.md b/docs/apis/api-document-load.md new file mode 100644 index 00000000..dfc6a87a --- /dev/null +++ b/docs/apis/api-document-load.md @@ -0,0 +1,3 @@ + +Coming soon + diff 
--git a/docs/apis/api-embeddings.md b/docs/apis/api-embeddings.md new file mode 100644 index 00000000..b66280cb --- /dev/null +++ b/docs/apis/api-embeddings.md @@ -0,0 +1,106 @@ + +# TrustGraph Embeddings API + +## Request/response + +### Request + +The request contains the following fields: +- `text`: A string, the text to apply the embedding to + +### Response + +The response contains the following fields: +- `vectors`: Embeddings response, an array of arrays. An embedding is + an array of floating-point numbers. As multiple embeddings may be + returned, an array of embeddings is returned, hence an array + of arrays. + +## REST service + +The REST service accepts a request object containing the `text` field. +The response is a JSON object containing the `vectors` field. + +e.g. + +Request: +``` +{ + "text": "What does NASA stand for?" +} +``` + +Response: + +``` +{ + "vectors": [ [ 0.231341245, ... ] ] +} +``` + +## Websocket + +Embeddings requests have a `request` object containing the `text` field. +Responses have a `response` object containing the `vectors` field. + +e.g. + +Request: + +``` +{ + "id": "qgzw1287vfjc8wsk-2", + "service": "embeddings", + "request": { + "text": "What is a cat?" + } +} +``` + +Responses: + +``` + + +{ + "id": "qgzw1287vfjc8wsk-2", + "response": { + "vectors": [ + [ + 0.04013510048389435, + 0.07536131888628006, + ...
+ -0.023531345650553703, + 0.03591292351484299 + ] + ] + }, + "complete": true +} +``` + +## Pulsar + +The Pulsar schema for the Embeddings API is defined in Python code here: + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/schema/models.py + +Default request queue: +`non-persistent://tg/request/embeddings` + +Default response queue: +`non-persistent://tg/response/embeddings` + +Request schema: +`trustgraph.schema.EmbeddingsRequest` + +Response schema: +`trustgraph.schema.EmbeddingsResponse` + +## Pulsar Python client + +The client class is +`trustgraph.clients.EmbeddingsClient` + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/clients/embeddings_client.py + diff --git a/docs/apis/api-graph-embeddings.md b/docs/apis/api-graph-embeddings.md new file mode 100644 index 00000000..9af8b6f9 --- /dev/null +++ b/docs/apis/api-graph-embeddings.md @@ -0,0 +1,155 @@ + +# TrustGraph Graph Embeddings API + +The purpose of this API is to search for knowledge graph entities +by embeddings. The request is a list of embeddings, the response is +a list of knowledge graph entities. The search is performed using a +vector store. + +## Request/response + +### Request + +The request contains the following fields: +- `vectors`: An array of embeddings. Each embedding is itself an array + of numbers. +- `limit`: Optional: a limit on the number of graph entities to return. + +### Response + +The response contains the following fields: +- `entities`: An array of graph entities. The entity type is described here: + +TrustGraph uses the same schema for knowledge graph elements: +- `value`: the entity URI or literal value depending on whether this is + a graph entity or a literal value. +- `is_uri`: A boolean value which is true if this is a graph entity i.e. + `value` is a URI, not a literal value. + +## REST service + +The REST service accepts a request object containing the `vectors` field.
+The response is a JSON object containing the `entities` field. + +To reduce the size of the JSON, the graph entities are encoded as an +object with `value` and `is_uri` mapped to `v` and `e` respectively. + +e.g. + +Request: +``` +{ + "vectors": [ + [ + 0.04013510048389435, + 0.07536131888628006, + ... + -0.10790473222732544, + 0.03591292351484299 + ] + ], + "limit": 15 +} +``` + +Response: + +``` +{ + "entities": [ + { + "v": "http://trustgraph.ai/e/space-station-modules", + "e": true + }, + { + "v": "http://trustgraph.ai/e/rocket-propellants", + "e": true + } + ] +} +``` + +## Websocket + +The websocket service accepts a request object containing the `vectors` field. +The response is a JSON object containing the `entities` field. + +To reduce the size of the JSON, the graph entities are encoded as an +object with `value` and `is_uri` mapped to `v` and `e` respectively. + +e.g. + +Request: + +``` +{ + "id": "qgzw1287vfjc8wsk-3", + "service": "graph-embeddings-query", + "request": { + "vectors": [ + [ + 0.04013510048389435, + 0.07536131888628006, + ...
+ -0.10790473222732544, + 0.03591292351484299 + ] + ], + "limit": 15 + } +} +``` + +Response: + +``` +{ + "id": "qgzw1287vfjc8wsk-3", + "response": { + "entities": [ + { + "v": "http://trustgraph.ai/e/space-station-modules", + "e": true + }, + { + "v": "http://trustgraph.ai/e/rocket-propellants", + "e": true + } + ] + }, + "complete": true +} +``` + +## Pulsar + +The Pulsar schema for the Graph Embeddings API is defined in Python code here: + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/schema/graph.py + +Default request queue: +`non-persistent://tg/request/graph-embeddings` + +Default response queue: +`non-persistent://tg/response/graph-embeddings` + +Request schema: +`trustgraph.schema.GraphEmbeddingsRequest` + +Response schema: +`trustgraph.schema.GraphEmbeddingsResponse` + +## Pulsar Python client + +The client class is +`trustgraph.clients.GraphEmbeddingsClient` + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/clients/graph_embeddings.py + + + + + + + + diff --git a/docs/apis/api-graph-rag.md b/docs/apis/api-graph-rag.md new file mode 100644 index 00000000..96821a38 --- /dev/null +++ b/docs/apis/api-graph-rag.md @@ -0,0 +1,97 @@ + +# TrustGraph Graph RAG API + +This presents a prompt to the Graph RAG service and retrieves the answer. +This makes use of a number of the other APIs behind the scenes: +Embeddings, Graph Embeddings, Prompt, TextCompletion, Triples Query. + +## Request/response + +### Request + +The request contains the following fields: +- `query`: The question to answer + +### Response + +The response contains the following fields: +- `response`: LLM response + +## REST service + +The REST service accepts a request object containing the `query` field. +The response is a JSON object containing the `response` field. + +e.g. + +Request: +``` +{ + "query": "What does NASA stand for?"
+} +``` + +Response: + +``` +{ + "response": "National Aeronautics and Space Administration" +} +``` + +## Websocket + +Requests have a `request` object containing the `query` field. +Responses have a `response` object containing the `response` field. + +e.g. + +Request: + +``` +{ + "id": "blrqotfefnmnh7de-14", + "service": "graph-rag", + "request": { + "query": "What does NASA stand for?" + } +} +``` + +Response: + +``` +{ + "id": "blrqotfefnmnh7de-14", + "response": { + "response": "National Aeronautics and Space Administration" + }, + "complete": true +} +``` + +## Pulsar + +The Pulsar schema for the Graph RAG API is defined in Python code here: + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/schema/retrieval.py + +Default request queue: +`non-persistent://tg/request/graph-rag` + +Default response queue: +`non-persistent://tg/response/graph-rag` + +Request schema: +`trustgraph.schema.GraphRagRequest` + +Response schema: +`trustgraph.schema.GraphRagResponse` + +## Pulsar Python client + +The client class is +`trustgraph.clients.GraphRagClient` + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/clients/graph_rag_client.py + diff --git a/docs/apis/api-prompt.md b/docs/apis/api-prompt.md new file mode 100644 index 00000000..9bb0cb49 --- /dev/null +++ b/docs/apis/api-prompt.md @@ -0,0 +1,140 @@ + +# TrustGraph Prompt API + +This is a higher-level interface to the LLM service. The input +specifies a prompt template by ID and some variables to include in the +template. + +## Request/response + +### Request + +The request contains the following fields: +- `id`: A prompt template ID +- `variables`: A set of key/values describing the variables + +### Response + +The response contains either of these fields: +- `text`: A plain text response +- `object`: A structured object, JSON-encoded + +## REST service + +The REST service accepts `id` and `variables` fields, the variables are +encoded as a JSON object.
+ +e.g. + +In this example, the template takes a `text` variable and returns an +array of entity definitions in the `object` field. The value is +JSON-encoded. + +Request: +``` +{ + "id": "extract-definitions", + "variables": { + "text": "A cat is a domesticated Felidae animal" + } +} +``` + +Response: + +``` +{ + "object": "[{\"entity\": \"cat\", \"definition\": \"a domesticated Felidae animal\"}]" +} +``` + +## Websocket + +Requests have `id` and `variables` fields. + +e.g. + +Request: + +``` +{ + "id": "akshfkiehfkseffh-142", + "service": "prompt", + "request": { + "id": "extract-definitions", + "variables": { + "text": "A cat is a domesticated Felidae animal" + } + } +} +``` + +Responses: + +``` +{ + "id": "akshfkiehfkseffh-142", + "response": { + "object": "[{\"entity\": \"cat\", \"definition\": \"a domesticated Felidae animal\"}]" + }, + "complete": true +} +``` + +e.g. + +An example which returns plain text + +Request: + +``` +{ + "id": "akshfkiehfkseffh-141", + "service": "prompt", + "request": { + "id": "question", + "variables": { + "question": "What is 2 + 2?"
+ } + } +} +``` + +Response: + +``` +{ + "id": "akshfkiehfkseffh-141", + "response": { + "text": "2 + 2 = 4" + }, + "complete": true +} +``` + + +## Pulsar + +The Pulsar schema for the Prompt API is defined in Python code here: + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/schema/prompt.py + +Default request queue: +`non-persistent://tg/request/prompt` + +Default response queue: +`non-persistent://tg/response/prompt` + +Request schema: +`trustgraph.schema.PromptRequest` + +Response schema: +`trustgraph.schema.PromptResponse` + +## Pulsar Python client + +The client class is +`trustgraph.clients.PromptClient` + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/clients/prompt_client.py + diff --git a/docs/apis/api-text-completion.md b/docs/apis/api-text-completion.md new file mode 100644 index 00000000..b93c4c8a --- /dev/null +++ b/docs/apis/api-text-completion.md @@ -0,0 +1,105 @@ + +# TrustGraph Text Completion API + +This is a low-level interface to the LLM service. For a higher-level +interface with template management, consider the +[Prompt API](api-prompt.md). + +## Request/response + +### Request + +Some LLM systems permit specifying a separate `system` prompt. When +the same system prompt is used repeatedly, this can result in lower +token costs for the system part or quicker LLM response. + +The request contains the following fields: +- `system`: A string, the system part +- `prompt`: A string, the user part + +### Response + +The response contains the following fields: +- `response`: LLM response + +## REST service + +The REST service accepts a request object containing the `system` and `prompt` fields. +The response is a JSON object containing the `response` field. + +e.g. + +Request: +``` +{ + "system": "You are a helpful agent", + "prompt": "What does NASA stand for?"
+} +``` + +Response: + +``` +{ + "response": "National Aeronautics and Space Administration" +} +``` + +## Websocket + +Requests have a `request` object containing the `system` and +`prompt` fields. +Responses have a `response` object containing the `response` field. + +e.g. + +Request: + +``` +{ + "id": "blrqotfefnmnh7de-1", + "service": "text-completion", + "request": { + "system": "You are a helpful agent", + "prompt": "What does NASA stand for?" + } +} +``` + +Response: + +``` +{ + "id": "blrqotfefnmnh7de-1", + "response": { + "response": "National Aeronautics and Space Administration" + }, + "complete": true +} +``` + +## Pulsar + +The Pulsar schema for the Text Completion API is defined in Python code here: + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/schema/models.py + +Default request queue: +`non-persistent://tg/request/text-completion` + +Default response queue: +`non-persistent://tg/response/text-completion` + +Request schema: +`trustgraph.schema.TextCompletionRequest` + +Response schema: +`trustgraph.schema.TextCompletionResponse` + +## Pulsar Python client + +The client class is +`trustgraph.clients.LlmClient` + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/clients/llm_client.py + diff --git a/docs/apis/api-triples-query.md b/docs/apis/api-triples-query.md new file mode 100644 index 00000000..6e096a56 --- /dev/null +++ b/docs/apis/api-triples-query.md @@ -0,0 +1,187 @@ + +# TrustGraph Triples Query API + +This is a service which queries the knowledge graph for triples ("facts"). + +## Request/response + +### Request + +The request contains the following fields: +- `s`: Optional, if included specifies a match for the subject part of a + triple. +- `p`: Optional, if included specifies a match for the predicate part of a + triple. +- `o`: Optional, if included specifies a match for the object part of a + triple.
+- `limit`: Optional, if included specifies the maximum number of triples to + return. If not specified, an arbitrary value is used. + +Returned triples will match all of `s`, `p` and `o` where provided. + +### Response + +The response contains the following fields: +- `response`: A list of triples. + +Each triple contains `s`, `p` and `o` fields describing the +subject, predicate and object parts of each triple. + +Each triple element uses the same schema: +- `value`: the entity URI or literal value depending on whether this is + a graph entity or a literal value. +- `is_uri`: A boolean value which is true if this is a graph entity i.e. + `value` is a URI, not a literal value. + +## REST service + +The REST service accepts a request object containing the `s`, `p`, `o` +and `limit` fields. +The response is a JSON object containing the `response` field. + +To reduce the size of the JSON, the graph entities are encoded as an +object with `value` and `is_uri` mapped to `v` and `e` respectively. + +e.g. + +This example query matches triples with a subject of +`http://trustgraph.ai/e/space-station-modules` and a predicate of +`http://www.w3.org/2000/01/rdf-schema#label`. This predicate +represents the RDF schema 'label' relationship. + +The response is a single triple - the `o` element contains the +literal "space station modules" which is the label for +`http://trustgraph.ai/e/space-station-modules`.
+ +Request: +``` +{ + "id": "qgzw1287vfjc8wsk-4", + "service": "triples-query", + "request": { + "s": { + "v": "http://trustgraph.ai/e/space-station-modules", + "e": true + }, + "p": { + "v": "http://www.w3.org/2000/01/rdf-schema#label", + "e": true + }, + "limit": 5 + } +} +``` + +Response: + +``` +{ + "response": [ + { + "s": { + "v": "http://trustgraph.ai/e/space-station-modules", + "e": true + }, + "p": { + "v": "http://www.w3.org/2000/01/rdf-schema#label", + "e": true + }, + "o": { + "v": "space station modules", + "e": false + } + } + ] +} +``` + +## Websocket + +Requests have a `request` object containing the `s`, `p`, `o` +and `limit` fields. +Responses have a `response` object containing the `response` field. + +To reduce the size of the JSON, the graph entities are encoded as an +object with `value` and `is_uri` mapped to `v` and `e` respectively. + +e.g. + +Request: + +``` +{ + "id": "qgzw1287vfjc8wsk-4", + "service": "triples-query", + "request": { + "s": { + "v": "http://trustgraph.ai/e/space-station-modules", + "e": true + }, + "p": { + "v": "http://www.w3.org/2000/01/rdf-schema#label", + "e": true + }, + "limit": 5 + } +} +``` + +Responses: + +``` +{ + "id": "qgzw1287vfjc8wsk-4", + "response": { + "response": [ + { + "s": { + "v": "http://trustgraph.ai/e/space-station-modules", + "e": true + }, + "p": { + "v": "http://www.w3.org/2000/01/rdf-schema#label", + "e": true + }, + "o": { + "v": "space station modules", + "e": false + } + } + ] + }, + "complete": true +} +``` + +## Pulsar + +The Pulsar schema for the Triples Query API is defined in Python code here: + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/schema/graph.py + +Default request queue: +`non-persistent://tg/request/triples-query` + +Default response queue: +`non-persistent://tg/response/triples-query` + +Request schema: +`trustgraph.schema.TriplesQueryRequest` + +Response schema: +`trustgraph.schema.TriplesQueryResponse` + +## Pulsar Python client + +The
client class is +`trustgraph.clients.TriplesQueryClient` + +https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/clients/triples_query_client.py + + + + + + + + diff --git a/docs/apis/pulsar.md b/docs/apis/pulsar.md new file mode 100644 index 00000000..dfc6a87a --- /dev/null +++ b/docs/apis/pulsar.md @@ -0,0 +1,3 @@ + +Coming soon + diff --git a/docs/apis/websocket.md b/docs/apis/websocket.md new file mode 100644 index 00000000..1895646c --- /dev/null +++ b/docs/apis/websocket.md @@ -0,0 +1,136 @@ + +# TrustGraph websocket overview + +The websocket service is provided by the `api-gateway` service on port +8088. + +## URL + +Depending on how the service is hosted, the websocket is invoked on this +URL on `api-gateway`: + +``` +/api/v1/socket +``` + +When hosted using docker compose, you can access the service at +`ws://localhost:8088/api/v1/socket` + +## Request + +A request message is a JSON message containing 3 fields: + +- `id`: A unique ID which is used to correlate requests and responses. + You should make sure it is unique. +- `service`: The name of the service to invoke. +- `request`: The request body which is passed to the service - this is + defined in the API documentation for that service. + +e.g. + +``` +{ + "id": "qgzw1287vfjc8wsk-1", + "service": "graph-rag", + "request": { + "query": "What does NASA stand for?" + } +} +``` + +## Response + +A response message is JSON encoded, and may contain the following fields: + +- `id`: This is the same value provided on the request and shows which + request this response is returned for. +- `error`: If an error occurred, this field is provided, and provides an + error message. +- `response`: For a non-error case, this provides a response from the + service - the response structure depends on the service invoked. It is + not provided if the `error` field is provided. +- `complete`: A boolean value indicating whether this response is the + final response from the service.
If set to false, the response values + are intermediate values. It is not provided if the `error` field is + provided. + +An error response completes a request - no further responses +will be provided. + +e.g. + +``` +{ + "id": "qgzw1287vfjc8wsk-1", + "response": { + "response": "National Aeronautics and Space Administration." + }, + "complete": true +} +``` + +## Multi-part response + +For a multi-part response, a number of responses are provided with the +same ID until the final message which has the `complete` field set to +true. + +Note that multi-part responses are a feature of the websocket API which +the request/response nature of the REST API is not able to provide. + +e.g. + +Request: + +``` +{ + "id": "blrqotfefnmnh7de-20", + "service": "agent", + "request": { + "question": "What does NASA stand for?" + } +} +``` + +Responses: + +``` +{ + "id": "blrqotfefnmnh7de-20", + "response": { + "thought": "I need to query a knowledge base" + }, + "complete": false +} +``` + +``` +{ + "id": "blrqotfefnmnh7de-20", + "response": { + "observation": "National Aeronautics and Space Administration." + }, + "complete": false +} +``` + +``` +{ + "id": "blrqotfefnmnh7de-20", + "response": { + "thought": "I now know the final answer" + }, + "complete": false +} +``` + +``` +{ + "id": "blrqotfefnmnh7de-20", + "response": { + "answer": "National Aeronautics and Space Administration" + }, + "complete": true +} +``` + diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml index c74f5df3..24102a23 100644 --- a/prometheus/prometheus.yml +++ b/prometheus/prometheus.yml @@ -14,6 +14,12 @@ scrape_configs: # The job name is added as a label `job=` to any timeseries # scraped from this config. 
+ - job_name: 'pulsar' + scrape_interval: 5s + static_configs: + - targets: + - 'pulsar:8080' + - job_name: 'pdf-decoder' scrape_interval: 5s static_configs: @@ -26,122 +32,116 @@ scrape_configs: - targets: - 'chunker:8000' - - job_name: 'vectorize' scrape_interval: 5s static_configs: - targets: - 'vectorize:8000' - - job_name: 'embeddings' scrape_interval: 5s static_configs: - targets: - 'embeddings:8000' - - job_name: 'kg-extract-definitions' scrape_interval: 5s static_configs: - targets: - 'kg-extract-definitions:8000' - - job_name: 'kg-extract-topics' scrape_interval: 5s static_configs: - targets: - 'kg-extract-topics:8000' - - job_name: 'kg-extract-relationships' scrape_interval: 5s static_configs: - targets: - 'kg-extract-relationships:8000' - - job_name: 'metering' scrape_interval: 5s static_configs: - targets: - 'metering:8000' - - job_name: 'metering-rag' scrape_interval: 5s static_configs: - targets: - 'metering-rag:8000' - - job_name: 'store-graph-embeddings' scrape_interval: 5s static_configs: - targets: - 'store-graph-embeddings:8000' - - job_name: 'store-triples' scrape_interval: 5s static_configs: - targets: - 'store-triples:8000' - - job_name: 'text-completion' scrape_interval: 5s static_configs: - targets: - 'text-completion:8000' - - job_name: 'text-completion-rag' scrape_interval: 5s static_configs: - targets: - 'text-completion-rag:8000' - - job_name: 'graph-rag' scrape_interval: 5s static_configs: - targets: - 'graph-rag:8000' - - job_name: 'prompt' scrape_interval: 5s static_configs: - targets: - 'prompt:8000' - - job_name: 'prompt-rag' scrape_interval: 5s static_configs: - targets: - 'prompt-rag:8000' - - job_name: 'query-graph-embeddings' scrape_interval: 5s static_configs: - targets: - 'query-graph-embeddings:8000' - - job_name: 'query-triples' scrape_interval: 5s static_configs: - targets: - 'query-triples:8000' - - - job_name: 'pulsar' + - job_name: 'agent-manager' scrape_interval: 5s static_configs: - targets: - - 'pulsar:8080' + - 
'agent-manager:8000' + - job_name: 'api-gateway' + scrape_interval: 5s + static_configs: + - targets: + - 'api-gateway:8000' + + - job_name: 'workbench-ui' + scrape_interval: 5s + static_configs: + - targets: + - 'workbench-ui:8000'