From 3adb3cf59c12d2e1bb5c548c01cf2ca658a58d87 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Fri, 25 Apr 2025 19:02:08 +0100 Subject: [PATCH] Revert "Feature/flow management cli (#346)" This reverts commit 3b021720c55a77f5a2e7030af36db5ca29990fbf. --- test-api/test-library-add-doc | 78 ----- test-api/test-library-add-doc2 | 90 ----- test-api/test-library-list | 39 --- tests/test-config | 2 - tests/test-flow | 92 ----- tests/test-flow-get-class | 19 -- tests/test-flow-put-class | 22 -- tests/test-flow-start-flow | 23 -- tests/test-flow-stop-flow | 22 -- tests/test-load-pdf | 36 -- tests/test-load-text | 37 -- trustgraph-base/trustgraph/api/api.py | 230 ------------- .../trustgraph/base/async_processor.py | 27 +- trustgraph-base/trustgraph/base/consumer.py | 2 +- .../trustgraph/base/consumer_spec.py | 4 +- trustgraph-base/trustgraph/base/metrics.py | 98 ++---- .../trustgraph/base/producer_spec.py | 4 +- .../trustgraph/base/request_response_spec.py | 17 +- trustgraph-base/trustgraph/base/subscriber.py | 18 +- .../trustgraph/base/subscriber_spec.py | 10 +- trustgraph-base/trustgraph/schema/__init__.py | 2 +- trustgraph-base/trustgraph/schema/flows.py | 8 +- trustgraph-base/trustgraph/schema/graph.py | 19 ++ trustgraph-base/trustgraph/schema/lookup.py | 21 ++ trustgraph-base/trustgraph/schema/models.py | 13 + trustgraph-base/trustgraph/schema/object.py | 4 +- trustgraph-base/trustgraph/schema/prompt.py | 7 + .../trustgraph/schema/retrieval.py | 13 + trustgraph-cli/scripts/tg-delete-flow-class | 52 --- trustgraph-cli/scripts/tg-get-flow-class | 56 --- trustgraph-cli/scripts/tg-put-flow-class | 59 ---- trustgraph-cli/scripts/tg-show-flow-classes | 64 ---- trustgraph-cli/scripts/tg-show-flows | 65 ---- trustgraph-cli/setup.py | 7 - .../trustgraph/config/service/flow.py | 210 ------------ .../trustgraph/config/service/service.py | 199 +++-------- trustgraph-flow/trustgraph/gateway/flow.py | 51 --- trustgraph-flow/trustgraph/gateway/mux.py | 1 + trustgraph-flow/trustgraph/gateway/service.py | 320 +++++++++--------- 39 files changed, 335 insertions(+), 1706 deletions(-) delete mode 100755 test-api/test-library-add-doc delete mode 100755 test-api/test-library-add-doc2 delete mode 100755 test-api/test-library-list delete mode 100644 tests/test-config delete mode 100755 tests/test-flow delete mode 100755 tests/test-flow-get-class delete mode 100755 tests/test-flow-put-class delete mode 100755 tests/test-flow-start-flow delete mode 100755 tests/test-flow-stop-flow delete mode 100755 tests/test-load-pdf delete mode 100755 tests/test-load-text delete mode 100755 trustgraph-cli/scripts/tg-delete-flow-class delete mode 100755 trustgraph-cli/scripts/tg-get-flow-class delete mode 100755 trustgraph-cli/scripts/tg-put-flow-class delete mode 100755 trustgraph-cli/scripts/tg-show-flow-classes delete mode 100755 trustgraph-cli/scripts/tg-show-flows delete mode 100644 trustgraph-flow/trustgraph/config/service/flow.py delete mode 100644 trustgraph-flow/trustgraph/gateway/flow.py diff --git a/test-api/test-library-add-doc b/test-api/test-library-add-doc deleted file mode 100755 index bd927367..00000000 --- a/test-api/test-library-add-doc +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/env python3 - -import requests -import json -import sys -import base64 - -url = "http://localhost:8088/api/v1/" - -############################################################################ - -id = "http://trustgraph.ai/doc/12345678" - -with open("docs/README.cats") as f: - doc = base64.b64encode(f.read().encode("utf-8")).decode("utf-8") - -input = { - "operation": "add", - "document": { - "id": id, - "metadata": [ - { - "s": { - "v": id, - "e": True, - }, - "p": { - "v": "http://www.w3.org/2000/01/rdf-schema#label", - "e": True, - }, - "o": { - "v": "Mark's pets", "e": False, - }, - }, - { - "s": { - "v": id, - "e": True, - }, - "p": { - "v": 'https://schema.org/keywords', - "e": True, - }, - "o": { - "v": "cats", "e": False, - }, - }, - ], - "document": doc, - "kind": "text/plain", - "user": "trustgraph", - "collection": "default", - "title": "Mark's cats", - "comments": "Test doc taken from the TrustGraph repo", - } -} - -resp = requests.post( - f"{url}librarian", - json=input, -) - -print(resp.text) -resp = resp.json() - -print(resp) - -if "error" in resp: - print(f"Error: {resp['error']}") - sys.exit(1) - -# print(resp["response"]) -print(resp) - -sys.exit(0) - -############################################################################ - diff --git a/test-api/test-library-add-doc2 b/test-api/test-library-add-doc2 deleted file mode 100755 index 0c0856f9..00000000 --- a/test-api/test-library-add-doc2 +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env python3 - -import requests -import json -import sys -import base64 - -url = "http://localhost:8088/api/v1/" - -############################################################################ - -id = "http://trustgraph.ai/doc/12345678" - -source = "../sources/20160001634.pdf" - -with open(source, "rb") as f: - doc = base64.b64encode(f.read()).decode("utf-8") - -input = { - "operation": "add", - "id": id, - "document": { - "metadata": [ - { - "s": { - "v": id, - "e": True, - }, - "p": { - "v": "http://www.w3.org/2000/01/rdf-schema#label", - "e": True, - }, - "o": { - "v": "Challenger report volume 1", "e": False, - }, - }, - { - "s": { - "v": id, - "e": True, - }, - "p": { - "v": 'https://schema.org/keywords', - "e": True, - }, - "o": { - "v": "space shuttle", "e": False, - }, - }, - { - "s": { - "v": id, - "e": True, - }, - "p": { - "v": 'https://schema.org/keywords', - "e": True, - }, - "o": { - "v": "nasa", "e": False, - }, - }, - ], - "document": doc, - "kind": "application/pdf", - "user": "trustgraph", - "collection": "default", - } -} - -resp = requests.post( - f"{url}librarian", - json=input, -) - -print(resp.text) -resp = resp.json() - -print(resp) - -if "error" in resp: - print(f"Error: {resp['error']}") - sys.exit(1) - -print(resp) - -sys.exit(0) - -############################################################################ - diff --git a/test-api/test-library-list b/test-api/test-library-list deleted file mode 100755 index 72ea4478..00000000 --- a/test-api/test-library-list +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env python3 - -import requests -import json -import sys -import base64 - -url = "http://localhost:8088/api/v1/" - -############################################################################ - -user = "trustgraph" - -input = { - "operation": "list", - "user": user, -} - -resp = requests.post( - f"{url}librarian", - json=input, -) - -print(resp.text) -resp = resp.json() - -print(resp) - -if "error" in resp: - print(f"Error: {resp['error']}") - sys.exit(1) - -# print(resp["response"]) -print(resp) - -sys.exit(0) - -############################################################################ - diff --git a/tests/test-config b/tests/test-config deleted file mode 100644 index 63f77b6b..00000000 --- a/tests/test-config +++ /dev/null @@ -1,2 +0,0 @@ -#!/usr/bin/env python3 - diff --git a/tests/test-flow b/tests/test-flow deleted file mode 100755 index 87a349af..00000000 --- a/tests/test-flow +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python3 - -import requests - -url = "http://localhost:8088/" - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "list-classes", - } -) - -print(resp) -print(resp.text) - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "get-class", - "class-name": "default", - } -) - -print(resp) -print(resp.text) - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "put-class", - "class-name": "bunch", - "class-definition": "{}", - } -) - -print(resp) -print(resp.text) - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "get-class", - "class-name": "bunch", - } -) - -print(resp) -print(resp.text) - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "list-classes", - } -) - -print(resp) -print(resp.text) - - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "delete-class", - "class-name": "bunch", - } -) - -print(resp) -print(resp.text) - - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "list-classes", - } -) - -print(resp) -print(resp.text) - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "list-flows", - } -) - -print(resp) -print(resp.text) diff --git a/tests/test-flow-get-class b/tests/test-flow-get-class deleted file mode 100755 index 20707b51..00000000 --- a/tests/test-flow-get-class +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python3 - -import requests - -url = "http://localhost:8088/" - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "get-class", - "class-name": "default", - } -) - -resp = resp.json() - -print(resp["class-definition"]) - - diff --git a/tests/test-flow-put-class b/tests/test-flow-put-class deleted file mode 100755 index 8fd4d9f2..00000000 --- a/tests/test-flow-put-class +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env python3 - -import requests -import json - -url = "http://localhost:8088/" - -defn = {"class": {"de-query:{class}": {"request": "non-persistent://tg/request/document-embeddings:{class}", "response": "non-persistent://tg/response/document-embeddings:{class}"}, "document-rag:{class}": {"document-embeddings-request": "non-persistent://tg/request/document-embeddings:{class}", "document-embeddings-response": "non-persistent://tg/response/document-embeddings:{class}", "embeddings-request": "non-persistent://tg/request/embeddings:{class}", "embeddings-response": "non-persistent://tg/response/embeddings:{class}", "prompt-request": "non-persistent://tg/request/prompt-rag:{class}", "prompt-response": "non-persistent://tg/response/prompt-rag:{class}", "request": "non-persistent://tg/request/document-rag:{class}", "response": "non-persistent://tg/response/document-rag:{class}"}, "embeddings:{class}": {"request": "non-persistent://tg/request/embeddings:{class}", "response": "non-persistent://tg/response/embeddings:{class}"}, "ge-query:{class}": {"request": "non-persistent://tg/request/graph-embeddings:{class}", "response": "non-persistent://tg/response/graph-embeddings:{class}"}, "graph-rag:{class}": {"embeddings-request": "non-persistent://tg/request/embeddings:{class}", "embeddings-response": "non-persistent://tg/response/embeddings:{class}", "graph-embeddings-request": "non-persistent://tg/request/graph-embeddings:{class}", "graph-embeddings-response": "non-persistent://tg/response/graph-embeddings:{class}", "prompt-request": "non-persistent://tg/request/prompt-rag:{class}", "prompt-response": "non-persistent://tg/response/prompt-rag:{class}", "request": "non-persistent://tg/request/graph-rag:{class}", "response": "non-persistent://tg/response/graph-rag:{class}", "triples-request": "non-persistent://tg/request/triples:{class}", "triples-response": "non-persistent://tg/response/triples:{class}"}, "metering-rag:{class}": {"input": "non-persistent://tg/response/text-completion-rag:{class}"}, "metering:{class}": {"input": "non-persistent://tg/response/text-completion:{class}"}, "prompt-rag:{class}": {"request": "non-persistent://tg/request/prompt-rag:{class}", "response": "non-persistent://tg/response/prompt-rag:{class}", "text-completion-request": "non-persistent://tg/request/text-completion-rag:{class}", "text-completion-response": "non-persistent://tg/response/text-completion-rag:{class}"}, "prompt:{class}": {"request": "non-persistent://tg/request/prompt:{class}", "response": "non-persistent://tg/response/prompt:{class}", "text-completion-request": "non-persistent://tg/request/text-completion:{class}", "text-completion-response": "non-persistent://tg/response/text-completion:{class}"}, "text-completion-rag:{class}": {"request": "non-persistent://tg/request/text-completion-rag:{class}", "response": "non-persistent://tg/response/text-completion-rag:{class}"}, "text-completion:{class}": {"request": "non-persistent://tg/request/text-completion:{class}", "response": "non-persistent://tg/response/text-completion:{class}"}, "triples-query:{class}": {"request": "non-persistent://tg/request/triples:{class}", "response": "non-persistent://tg/response/triples:{class}"}}, "description": "Default flow class, supports GraphRAG and document RAG", "flow": {"agent-manager:{id}": {"graph-rag-request": "non-persistent://tg/request/graph-rag:{class}", "graph-rag-response": "non-persistent://tg/response/graph-rag:{class}", "next": "non-persistent://tg/request/agent:{id}", "prompt-request": "non-persistent://tg/request/prompt:{class}", "prompt-response": "non-persistent://tg/response/prompt:{class}", "request": "non-persistent://tg/request/agent:{id}", "response": "non-persistent://tg/response/agent:{id}", "text-completion-request": "non-persistent://tg/request/text-completion:{class}", "text-completion-response": "non-persistent://tg/response/text-completion:{class}"}, "chunker:{id}": {"input": "persistent://tg/flow/text-document-load:{id}", "output": "persistent://tg/flow/chunk-load:{id}"}, "de-write:{id}": {"input": "persistent://tg/flow/document-embeddings-store:{id}"}, "document-embeddings:{id}": {"embeddings-request": "non-persistent://tg/request/embeddings:{class}", "embeddings-response": "non-persistent://tg/response/embeddings:{class}", "input": "persistent://tg/flow/chunk-load:{id}", "output": "persistent://tg/flow/document-embeddings-store:{id}"}, "ge-write:{id}": {"input": "persistent://tg/flow/graph-embeddings-store:{id}"}, "graph-embeddings:{id}": {"embeddings-request": "non-persistent://tg/request/embeddings:{class}", "embeddings-response": "non-persistent://tg/response/embeddings:{class}", "input": "persistent://tg/flow/entity-contexts-load:{id}", "output": "persistent://tg/flow/graph-embeddings-store:{id}"}, "kg-extract-definitions:{id}": {"entity-contexts": "persistent://tg/flow/entity-contexts-load:{id}", "input": "persistent://tg/flow/chunk-load:{id}", "prompt-request": "non-persistent://tg/request/prompt:{class}", "prompt-response": "non-persistent://tg/response/prompt:{class}", "triples": "persistent://tg/flow/triples-store:{id}"}, "kg-extract-relationships:{id}": {"input": "persistent://tg/flow/chunk-load:{id}", "prompt-request": "non-persistent://tg/request/prompt:{class}", "prompt-response": "non-persistent://tg/response/prompt:{class}", "triples": "persistent://tg/flow/triples-store:{id}"}, "pdf-decoder:{id}": {"input": "persistent://tg/flow/document-load:{id}", "output": "persistent://tg/flow/text-document-load:{id}"}, "triples-write:{id}": {"input": "persistent://tg/flow/triples-store:{id}"}}, "tags": ["document-rag", "graph-rag", "knowledge-extraction"]} - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "put-class", - "class-name": "default", - "class-definition": json.dumps(defn), - } -) - -resp = resp.json() - -print(resp) - diff --git a/tests/test-flow-start-flow b/tests/test-flow-start-flow deleted file mode 100755 index 15a3c0cc..00000000 --- a/tests/test-flow-start-flow +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env python3 - -import requests -import json - -url = "http://localhost:8088/" - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "start-flow", - "flow-id": "0003", - "class-name": "default", - } -) - -print(resp) -print(resp.text) -resp = resp.json() - - -print(resp) - diff --git a/tests/test-flow-stop-flow b/tests/test-flow-stop-flow deleted file mode 100755 index 62ea1aa9..00000000 --- a/tests/test-flow-stop-flow +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env python3 - -import requests -import json - -url = "http://localhost:8088/" - -resp = requests.post( - f"{url}/api/v1/flow", - json={ - "operation": "stop-flow", - "flow-id": "0003", - } -) - -print(resp) -print(resp.text) -resp = resp.json() - - -print(resp) - diff --git a/tests/test-load-pdf b/tests/test-load-pdf deleted file mode 100755 index 838a57ce..00000000 --- a/tests/test-load-pdf +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python3 - -import pulsar -from pulsar.schema import JsonSchema -import base64 - -from trustgraph.schema import Document, Metadata - -client = pulsar.Client("pulsar://localhost:6650", listener_name="localhost") - -prod = client.create_producer( - topic="persistent://tg/flow/document-load:0000", - schema=JsonSchema(Document), - chunking_enabled=True, -) - -path = "../sources/Challenger-Report-Vol1.pdf" - -with open(path, "rb") as f: - blob = base64.b64encode(f.read()).decode("utf-8") - -message = Document( - metadata = Metadata( - id = "00001", - metadata = [], - user="trustgraph", - collection="default", - ), - data=blob -) - -prod.send(message) - -prod.close() -client.close() - diff --git a/tests/test-load-text b/tests/test-load-text deleted file mode 100755 index 754458aa..00000000 --- a/tests/test-load-text +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env python3 - -import pulsar -from pulsar.schema import JsonSchema -import base64 - -from trustgraph.schema import TextDocument, Metadata - -client = pulsar.Client("pulsar://localhost:6650", listener_name="localhost") - -prod = client.create_producer( - topic="persistent://tg/flow/text-document-load:0000", - schema=JsonSchema(TextDocument), - chunking_enabled=True, -) - -path = "docs/README.cats" - -with open(path, "r") as f: -# blob = base64.b64encode(f.read()).decode("utf-8") - blob = f.read() - -message = TextDocument( - metadata = Metadata( - id = "00001", - metadata = [], - user="trustgraph", - collection="default", - ), - text=blob -) - -prod.send(message) - -prod.close() -client.close() - diff --git a/trustgraph-base/trustgraph/api/api.py b/trustgraph-base/trustgraph/api/api.py index fbff8d34..ddc6b2c3 100644 --- a/trustgraph-base/trustgraph/api/api.py +++ b/trustgraph-base/trustgraph/api/api.py @@ -562,233 +562,3 @@ class Api: except: raise ProtocolException(f"Response not formatted correctly") - def flow_list_classes(self): - - # The input consists of system and prompt strings - input = { - "operation": "list-classes", - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["class-names"] - except: - raise ProtocolException(f"Response not formatted correctly") - - def flow_get_class(self, class_name): - - # The input consists of system and prompt strings - input = { - "operation": "get-class", - "class-name": class_name, - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return json.loads(object["class-definition"]) - except Exception as e: - print(e) - raise ProtocolException(f"Response not formatted correctly") - - def flow_put_class(self, class_name, definition): - - # The input consists of system and prompt strings - input = { - "operation": "put-class", - "class-name": class_name, - "class-definition": json.dumps(definition), - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - return - - def flow_delete_class(self, class_name): - - # The input consists of system and prompt strings - input = { - "operation": "delete-class", - "class-name": class_name, - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - return - - def flow_list(self): - - # The input consists of system and prompt strings - input = { - "operation": "list-flows", - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["flow-ids"] - except: - raise ProtocolException(f"Response not formatted correctly") - - def flow_get(self, id): - - # The input consists of system and prompt strings - input = { - "operation": "get-flow", - "flow-id": id, - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return json.loads(object["flow"]) - except: - raise ProtocolException(f"Response not formatted correctly") - - def flow_start(self, class_name, id, description): - - # The input consists of system and prompt strings - input = { - "operation": "start-flow", - "flow-id": id, - "class-name": class_name, - "description": description, - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - return - - def flow_stop(self, id): - - # The input consists of system and prompt strings - input = { - "operation": "stop-flow", - "flow-id": id, - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - return - diff --git a/trustgraph-base/trustgraph/base/async_processor.py b/trustgraph-base/trustgraph/base/async_processor.py index bdf9a0bb..80440b36 100644 --- a/trustgraph-base/trustgraph/base/async_processor.py +++ b/trustgraph-base/trustgraph/base/async_processor.py @@ -17,7 +17,7 @@ from .. exceptions import TooManyRequests from . pubsub import PulsarClient from . producer import Producer from . consumer import Consumer -from . metrics import ProcessorMetrics, ConsumerMetrics +from . metrics import ProcessorMetrics default_config_queue = config_push_queue @@ -30,10 +30,10 @@ class AsyncProcessor: self.id = params.get("id") # Register a pulsar client - self.pulsar_client_object = PulsarClient(**params) + self.pulsar_client = PulsarClient(**params) # Initialise metrics, records the parameters - ProcessorMetrics(processor = self.id).info({ + ProcessorMetrics(id=self.id).info({ k: str(params[k]) for k in params if k != "id" @@ -57,15 +57,11 @@ class AsyncProcessor: # service config_subscriber_id = str(uuid.uuid4()) - config_consumer_metrics = ConsumerMetrics( - processor = self.id, flow = None, name = "config", - ) - # Subscribe to config queue self.config_sub_task = Consumer( taskgroup = self.taskgroup, - client = self.pulsar_client, + client = self.client, subscriber = config_subscriber_id, flow = None, @@ -74,8 +70,6 @@ class AsyncProcessor: handler = self.on_config_change, - metrics = config_consumer_metrics, - # This causes new subscriptions to view the entire history of # configuration start_of_messages = True @@ -91,28 +85,31 @@ class AsyncProcessor: # This is called to stop all threads. An over-ride point for extra # functionality def stop(self): - self.pulsar_client.close() + self.client.close() self.running = False # Returns the pulsar host @property - def pulsar_host(self): return self.pulsar_client_object.pulsar_host + def pulsar_host(self): return self.client.pulsar_host # Returns the pulsar client @property - def pulsar_client(self): return self.pulsar_client_object.client + def client(self): return self.pulsar_client.client # Register a new event handler for configuration change def register_config_handler(self, handler): self.config_handlers.append(handler) # Called when a new configuration message push occurs - async def on_config_change(self, message, consumer, flow): + async def on_config_change(self, message, consumer): # Get configuration data and version number config = message.value().config version = message.value().version + # Acknowledge the message + consumer.acknowledge(message) + # Invoke message handlers print("Config change event", config, version, flush=True) for ch in self.config_handlers: @@ -237,7 +234,7 @@ class AsyncProcessor: PulsarClient.add_args(parser) parser.add_argument( - '--config-queue', + '--config-push-queue', default=default_config_queue, help=f'Config push queue {default_config_queue}', ) diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 8f262b83..57b940ac 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -156,7 +156,7 @@ class Consumer: await self.handler(msg, self, self.flow) else: - await self.handler(msg, self, self.flow) + await self.handler(msg, self.consumer) print("Handled.", flush=True) diff --git a/trustgraph-base/trustgraph/base/consumer_spec.py b/trustgraph-base/trustgraph/base/consumer_spec.py index 21497dc5..aaeca677 100644 --- a/trustgraph-base/trustgraph/base/consumer_spec.py +++ b/trustgraph-base/trustgraph/base/consumer_spec.py @@ -12,13 +12,13 @@ class ConsumerSpec(Spec): def add(self, flow, processor, definition): consumer_metrics = ConsumerMetrics( - processor = flow.id, flow = flow.name, name = self.name, + flow.id, f"{flow.name}-{self.name}" ) consumer = Consumer( taskgroup = processor.taskgroup, flow = flow, - client = processor.pulsar_client, + client = processor.client, topic = definition[self.name], subscriber = processor.id + "--" + self.name, schema = self.schema, diff --git a/trustgraph-base/trustgraph/base/metrics.py b/trustgraph-base/trustgraph/base/metrics.py index 4ffbac9c..5d87849f 100644 --- a/trustgraph-base/trustgraph/base/metrics.py +++ b/trustgraph-base/trustgraph/base/metrics.py @@ -4,133 +4,79 @@ from prometheus_client import Counter class ConsumerMetrics: - def __init__(self, processor, flow, name): + def __init__(self, id, flow=None): - self.processor = processor + self.id = id self.flow = flow - self.name = name if not hasattr(__class__, "state_metric"): __class__.state_metric = Enum( 'consumer_state', 'Consumer state', - ["processor", "flow", "name"], + ["id", "flow"], states=['stopped', 'running'] ) - if not hasattr(__class__, "request_metric"): __class__.request_metric = Histogram( 'request_latency', 'Request latency (seconds)', - ["processor", "flow", "name"], + ["id", "flow"], ) - if not hasattr(__class__, "processing_metric"): __class__.processing_metric = Counter( 'processing_count', 'Processing count', - ["processor", "flow", "name", "status"], + ["id", "flow", "status"] ) - if not hasattr(__class__, "rate_limit_metric"): __class__.rate_limit_metric = Counter( 'rate_limit_count', 'Rate limit event count', - ["processor", "flow", "name"], + ["id", "flow"] ) def process(self, status): __class__.processing_metric.labels( - processor = self.processor, flow = self.flow, name = self.name, - status=status + id=self.id, flow=self.flow, status=status ).inc() def rate_limit(self): __class__.rate_limit_metric.labels( - processor = self.processor, flow = self.flow, name = self.name, + id=self.id, flow=self.flow ).inc() def state(self, state): __class__.state_metric.labels( - processor = self.processor, flow = self.flow, name = self.name, + id=self.id, flow=self.flow ).state(state) def record_time(self): return __class__.request_metric.labels( - processor = self.processor, flow = self.flow, name = self.name, + id=self.id, flow=self.flow ).time() class ProducerMetrics: + def __init__(self, id, flow=None): - def __init__(self, processor, flow, name): - - self.processor = processor + self.id = id self.flow = flow - self.name = name - if not hasattr(__class__, "producer_metric"): - __class__.producer_metric = Counter( - 'producer_count', 'Output items produced', - ["processor", "flow", "name"], + if not hasattr(__class__, "output_metric"): + __class__.output_metric = Counter( + 'output_count', 'Output items created', + ["id", "flow"] ) def inc(self): - __class__.producer_metric.labels( - processor = self.processor, flow = self.flow, name = self.name - ).inc() + __class__.output_metric.labels(id=self.id, flow=self.flow).inc() class ProcessorMetrics: - def __init__(self, processor): + def __init__(self, id): - self.processor = processor + self.id = id if not hasattr(__class__, "processor_metric"): __class__.processor_metric = Info( 'processor', 'Processor configuration', - ["processor"] + ["id"] ) def info(self, info): - __class__.processor_metric.labels( - processor = self.processor - ).info(info) - -class SubscriberMetrics: - - def __init__(self, processor, flow, name): - - self.processor = processor - self.flow = flow - self.name = name - - if not hasattr(__class__, "state_metric"): - __class__.state_metric = Enum( - 'subscriber_state', 'Subscriber state', - ["processor", "flow", "name"], - states=['stopped', 'running'] - ) - - if not hasattr(__class__, "received_metric"): - __class__.received_metric = Counter( - 'received_count', 'Received count', - ["processor", "flow", "name"], - ) - - if not hasattr(__class__, "dropped_metric"): - __class__.dropped_metric = Counter( - 'dropped_count', 'Dropped messages count', - ["processor", "flow", "name"], - ) - - def received(self): - __class__.received_metric.labels( - processor = self.processor, flow = self.flow, name = self.name, - ).inc() - - def state(self, state): - - __class__.state_metric.labels( - processor = self.processor, flow = self.flow, name = self.name, - ).state(state) - - def dropped(self, state): - __class__.dropped_metric.labels( - processor = self.processor, flow = self.flow, name = self.name, - ).inc() - + __class__.processor_metric.labels(id=self.id).info(info) + diff --git a/trustgraph-base/trustgraph/base/producer_spec.py b/trustgraph-base/trustgraph/base/producer_spec.py index 9c8bbc6a..9007f48b 100644 --- a/trustgraph-base/trustgraph/base/producer_spec.py +++ b/trustgraph-base/trustgraph/base/producer_spec.py @@ -11,11 +11,11 @@ class ProducerSpec(Spec): def add(self, flow, processor, definition): producer_metrics = ProducerMetrics( - processor = flow.id, flow = flow.name, name = self.name + flow.id, f"{flow.name}-{self.name}" ) producer = Producer( - client = processor.pulsar_client, + client = processor.client, topic = definition[self.name], schema = self.schema, metrics = producer_metrics, diff --git a/trustgraph-base/trustgraph/base/request_response_spec.py b/trustgraph-base/trustgraph/base/request_response_spec.py index 88ee4563..dcfcbf9b 100644 --- a/trustgraph-base/trustgraph/base/request_response_spec.py +++ b/trustgraph-base/trustgraph/base/request_response_spec.py @@ -5,7 +5,7 @@ import asyncio from . subscriber import Subscriber from . producer import Producer from . spec import Spec -from . metrics import ConsumerMetrics, ProducerMetrics, SubscriberMetrics +from . metrics import ConsumerMetrics, ProducerMetrics class RequestResponse(Subscriber): @@ -23,7 +23,6 @@ class RequestResponse(Subscriber): consumer_name = consumer_name, topic = response_topic, schema = response_schema, - metrics = response_metrics, ) self.producer = Producer( @@ -117,24 +116,20 @@ class RequestResponseSpec(Spec): def add(self, flow, processor, definition): - request_metrics = ProducerMetrics( - processor = flow.id, flow = flow.name, name = self.request_name - ) - - response_metrics = SubscriberMetrics( - processor = flow.id, flow = flow.name, name = self.request_name + producer_metrics = ProducerMetrics( + flow.id, f"{flow.name}-{self.response_name}" ) rr = self.impl( - client = processor.pulsar_client, + client = processor.client, subscription = flow.id, consumer_name = flow.id, request_topic = definition[self.request_name], request_schema = self.request_schema, - request_metrics = request_metrics, + request_metrics = producer_metrics, response_topic = definition[self.response_name], response_schema = self.response_schema, - response_metrics = response_metrics, + response_metrics = None, ) flow.consumer[self.request_name] = rr diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index 1cf263d4..a8ff58f7 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -7,7 +7,7 @@ import time class Subscriber: def __init__(self, client, topic, subscription, consumer_name, - schema=None, max_size=100, metrics=None): + schema=None, max_size=100): self.client = client self.topic = topic self.subscription = subscription @@ -18,7 +18,6 @@ class Subscriber: self.max_size = max_size self.lock = asyncio.Lock() self.running = True - self.metrics = metrics async def __del__(self): self.running = False @@ -37,9 +36,6 @@ class Subscriber: while self.running: - if self.metrics: - self.metrics.state("stopped") - try: consumer = self.client.subscribe( @@ -49,9 +45,6 @@ class Subscriber: schema = JsonSchema(self.schema), ) - if self.metrics: - self.metrics.state("running") - print("Subscriber running...", flush=True) while self.running: @@ -68,9 +61,6 @@ class Subscriber: print(type(e)) raise e - if self.metrics: - self.metrics.received() - # Acknowledge successful reception of the message consumer.acknowledge(msg) @@ -93,9 +83,7 @@ class Subscriber: self.q[id].put(value), timeout=2 ) - except Exception as e: - self.metrics.dropped() print("Q Put:", e, flush=True) for q in self.full.values(): @@ -106,7 +94,6 @@ class Subscriber: timeout=2 ) except Exception as e: - self.metrics.dropped() print("Q Put:", e, flush=True) except Exception as e: @@ -114,9 +101,6 @@ class Subscriber: consumer.close() - if self.metrics: - self.metrics.state("stopped") - # If handler drops out, sleep a retry time.sleep(2) diff --git a/trustgraph-base/trustgraph/base/subscriber_spec.py b/trustgraph-base/trustgraph/base/subscriber_spec.py index 7dca09db..2f89290b 100644 --- a/trustgraph-base/trustgraph/base/subscriber_spec.py +++ b/trustgraph-base/trustgraph/base/subscriber_spec.py @@ -1,5 +1,5 @@ -from . metrics import SubscriberMetrics +from . metrics import ConsumerMetrics from . subscriber import Subscriber from . spec import Spec @@ -11,17 +11,17 @@ class SubscriberSpec(Spec): def add(self, flow, processor, definition): - subscriber_metrics = SubscriberMetrics( - processor = flow.id, flow = flow.name, name = self.name + # FIXME: Metrics not used + subscriber_metrics = ConsumerMetrics( + flow.id, f"{flow.name}-{self.name}" ) subscriber = Subscriber( - client = processor.pulsar_client, + client = processor.client, topic = definition[self.name], subscription = flow.id, consumer_name = flow.id, schema = self.schema, - metrics = subscriber_metrics, ) # Put it in the consumer map, does that work? diff --git a/trustgraph-base/trustgraph/schema/__init__.py b/trustgraph-base/trustgraph/schema/__init__.py index a9bb30a6..28e1a879 100644 --- a/trustgraph-base/trustgraph/schema/__init__.py +++ b/trustgraph-base/trustgraph/schema/__init__.py @@ -12,5 +12,5 @@ from . agent import * from . lookup import * from . library import * from . config import * -from . flows import * + diff --git a/trustgraph-base/trustgraph/schema/flows.py b/trustgraph-base/trustgraph/schema/flows.py index 28b90f5d..5ac51a37 100644 --- a/trustgraph-base/trustgraph/schema/flows.py +++ b/trustgraph-base/trustgraph/schema/flows.py @@ -20,14 +20,14 @@ from . types import Error # Prompt services, abstract the prompt generation class FlowRequest(Record): - operation = String() # list-classes, get-class, put-class, delete-class - # list-flows, get-flow, start-flow, stop-flow + operation = String() # list_classes, get_class, put_class, delete_class + # list_flows, get_flow, start_flow, stop_flow # get_class, put_class, delete_class, start_flow class_name = String() # put_class - class_definition = String() + class = String() # start_flow description = String() @@ -44,7 +44,7 @@ class FlowResponse(Record): flow_ids = Array(String()) # get_class - class_definition = String() + class = String() # get_flow flow = String() diff --git a/trustgraph-base/trustgraph/schema/graph.py b/trustgraph-base/trustgraph/schema/graph.py index 97a99fbd..7c304e1d 100644 --- a/trustgraph-base/trustgraph/schema/graph.py +++ b/trustgraph-base/trustgraph/schema/graph.py @@ -18,6 +18,8 @@ class EntityContexts(Record): metadata = Metadata() entities = Array(EntityContext()) +entity_contexts_ingest_queue = topic('entity-contexts-load') + ############################################################################ # Graph embeddings are embeddings associated with a graph entity @@ -31,6 +33,8 @@ class GraphEmbeddings(Record): metadata = Metadata() entities = Array(EntityEmbeddings()) +graph_embeddings_store_queue = topic('graph-embeddings-store') + ############################################################################ # Graph embeddings query @@ -45,6 +49,13 @@ class GraphEmbeddingsResponse(Record): error = Error() entities = Array(Value()) +graph_embeddings_request_queue = topic( + 'graph-embeddings', kind='non-persistent', namespace='request' +) +graph_embeddings_response_queue = topic( + 'graph-embeddings', kind='non-persistent', namespace='response' +) + ############################################################################ # Graph triples @@ -53,6 +64,8 @@ class Triples(Record): metadata = Metadata() triples = Array(Triple()) +triples_store_queue = topic('triples-store') + ############################################################################ # Triples query @@ -69,3 +82,9 @@ class TriplesQueryResponse(Record): error = Error() triples = Array(Triple()) +triples_request_queue = topic( + 'triples', kind='non-persistent', namespace='request' +) +triples_response_queue = topic( + 'triples', kind='non-persistent', namespace='response' +) diff --git a/trustgraph-base/trustgraph/schema/lookup.py b/trustgraph-base/trustgraph/schema/lookup.py index a88d188e..d0a0517c 100644 --- a/trustgraph-base/trustgraph/schema/lookup.py +++ b/trustgraph-base/trustgraph/schema/lookup.py @@ -17,5 +17,26 @@ class LookupResponse(Record): text = String() error = Error() +encyclopedia_lookup_request_queue = topic( + 'encyclopedia', kind='non-persistent', namespace='request' +) +encyclopedia_lookup_response_queue = topic( + 'encyclopedia', kind='non-persistent', namespace='response', +) + +dbpedia_lookup_request_queue = topic( + 'dbpedia', kind='non-persistent', namespace='request' +) +dbpedia_lookup_response_queue = topic( + 'dbpedia', kind='non-persistent', namespace='response', +) + +internet_search_request_queue = topic( + 'internet-search', kind='non-persistent', namespace='request' +) +internet_search_response_queue = topic( + 'internet-search', kind='non-persistent', namespace='response', +) + ############################################################################ diff --git a/trustgraph-base/trustgraph/schema/models.py b/trustgraph-base/trustgraph/schema/models.py index ea3b9128..a634e1c4 100644 --- a/trustgraph-base/trustgraph/schema/models.py +++ b/trustgraph-base/trustgraph/schema/models.py @@ -19,6 +19,13 @@ class TextCompletionResponse(Record): out_token = Integer() model = String() +text_completion_request_queue = topic( + 'text-completion', kind='non-persistent', namespace='request' +) +text_completion_response_queue = topic( + 'text-completion', kind='non-persistent', namespace='response' +) + ############################################################################ # Embeddings @@ -30,3 +37,9 @@ class EmbeddingsResponse(Record): error = Error() vectors = Array(Array(Double())) +embeddings_request_queue = topic( + 'embeddings', kind='non-persistent', namespace='request' +) +embeddings_response_queue = topic( + 'embeddings', kind='non-persistent', namespace='response' +) diff --git a/trustgraph-base/trustgraph/schema/object.py b/trustgraph-base/trustgraph/schema/object.py index 6667fdf3..60c2bdc3 100644 --- a/trustgraph-base/trustgraph/schema/object.py +++ b/trustgraph-base/trustgraph/schema/object.py @@ -18,6 +18,8 @@ class ObjectEmbeddings(Record): key_name = String() id = String() +object_embeddings_store_queue = topic('object-embeddings-store') + ############################################################################ # Stores rows of information @@ -27,5 +29,5 @@ class Rows(Record): row_schema = RowSchema() rows = Array(Map(String())) - +rows_store_queue = topic('rows-store') diff --git a/trustgraph-base/trustgraph/schema/prompt.py b/trustgraph-base/trustgraph/schema/prompt.py index 369ace53..15eddea8 100644 --- a/trustgraph-base/trustgraph/schema/prompt.py +++ b/trustgraph-base/trustgraph/schema/prompt.py @@ -55,5 +55,12 @@ class PromptResponse(Record): # JSON encoded object = String() +prompt_request_queue = topic( + 'prompt', kind='non-persistent', namespace='request' +) +prompt_response_queue = topic( + 'prompt', kind='non-persistent', namespace='response' +) + ############################################################################ diff --git a/trustgraph-base/trustgraph/schema/retrieval.py b/trustgraph-base/trustgraph/schema/retrieval.py index 1077e4f9..caeb8e67 100644 --- a/trustgraph-base/trustgraph/schema/retrieval.py +++ b/trustgraph-base/trustgraph/schema/retrieval.py @@ -20,6 +20,13 @@ class GraphRagResponse(Record): error = Error() response = String() +graph_rag_request_queue = topic( + 'graph-rag', kind='non-persistent', namespace='request' +) +graph_rag_response_queue = topic( + 'graph-rag', kind='non-persistent', namespace='response' +) + ############################################################################ # Document RAG text retrieval @@ -34,3 +41,9 @@ class DocumentRagResponse(Record): error = Error() response = String() +document_rag_request_queue = topic( + 'doc-rag', kind='non-persistent', namespace='request' +) +document_rag_response_queue = topic( + 'doc-rag', kind='non-persistent', namespace='response' +) diff --git a/trustgraph-cli/scripts/tg-delete-flow-class b/trustgraph-cli/scripts/tg-delete-flow-class deleted file mode 100755 index 345fe00f..00000000 --- a/trustgraph-cli/scripts/tg-delete-flow-class +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python3 - -""" -""" - -import argparse -import os -import tabulate -from trustgraph.api import Api -import json - -default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') - -def delete_flow_class(url, class_name): - - api = Api(url) - - class_names = api.flow_delete_class(class_name) - -def main(): - - parser = argparse.ArgumentParser( - prog='tg-delete-flow-class', - description=__doc__, - ) - - parser.add_argument( - '-u', '--api-url', - default=default_url, - help=f'API URL (default: {default_url})', - ) - - parser.add_argument( - '-n', '--class-name', - help=f'Flow class name', - ) - - args = parser.parse_args() - - try: - - delete_flow_class( - url=args.api_url, - class_name=args.class_name, - ) - - except Exception as e: - - print("Exception:", e, flush=True) - -main() - diff --git a/trustgraph-cli/scripts/tg-get-flow-class b/trustgraph-cli/scripts/tg-get-flow-class deleted file mode 100755 index 450f1df7..00000000 --- a/trustgraph-cli/scripts/tg-get-flow-class +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python3 - -""" -Dumps out the current configuration -""" - -import argparse -import os -import tabulate -from trustgraph.api import Api -import json - -default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') - -def get_flow_class(url, class_name): - - api = Api(url) - - cls = api.flow_get_class(class_name) - - print(json.dumps(cls, indent=4)) - -def main(): - - parser = argparse.ArgumentParser( - prog='tg-get-flow-class', - description=__doc__, - ) - - parser.add_argument( - '-u', '--api-url', - default=default_url, - help=f'API URL (default: {default_url})', - ) - - parser.add_argument( - '-n', '--class-name', - required=True, - help=f'Flow class name', - ) - - args = parser.parse_args() - - try: - - get_flow_class( - url=args.api_url, - class_name=args.class_name, - ) - - except Exception as e: - - print("Exception:", e, flush=True) - -main() - diff --git a/trustgraph-cli/scripts/tg-put-flow-class b/trustgraph-cli/scripts/tg-put-flow-class deleted file mode 100755 index ca048e1f..00000000 --- a/trustgraph-cli/scripts/tg-put-flow-class +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env python3 - -""" -Dumps out the current configuration -""" - -import argparse -import os -import tabulate -from trustgraph.api import Api -import json - -default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') - -def put_flow_class(url, class_name, config): - - api = Api(url) - - class_names = api.flow_put_class(class_name, config) - -def main(): - - parser = argparse.ArgumentParser( - prog='tg-put-flow-class', - description=__doc__, - ) - - parser.add_argument( - '-u', '--api-url', - default=default_url, - help=f'API URL (default: {default_url})', - ) - - parser.add_argument( - '-n', '--class-name', - help=f'Flow class name', - ) - - parser.add_argument( - '-c', '--config', - help=f'Initial configuration to load', - ) - - args = parser.parse_args() - - try: - - put_flow_class( - url=args.api_url, - class_name=args.class_name, - config=json.loads(args.config), - ) - - except Exception as e: - - print("Exception:", e, flush=True) - -main() - diff --git a/trustgraph-cli/scripts/tg-show-flow-classes b/trustgraph-cli/scripts/tg-show-flow-classes deleted file mode 100755 index 7a133dc4..00000000 --- a/trustgraph-cli/scripts/tg-show-flow-classes +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env python3 - -""" -""" - -import argparse -import os -import tabulate -from trustgraph.api import Api -import json - -default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') - -def show_flow_classes(url): - - api = Api(url) - - class_names = api.flow_list_classes() - - classes = [] - - for class_name in class_names: - cls = api.flow_get_class(class_name) - classes.append(( - class_name, - cls.get("description", ""), - ", ".join(cls.get("tags", [])), - )) - - print(tabulate.tabulate( - classes, - tablefmt="pretty", - maxcolwidths=[None, 40, 20], - stralign="left", - headers = ["flow class", "description", "tags"], - )) - -def main(): - - parser = argparse.ArgumentParser( - prog='tg-show-flow-classes', - description=__doc__, - ) - - parser.add_argument( - '-u', '--api-url', - default=default_url, - help=f'API URL (default: {default_url})', - ) - - args = parser.parse_args() - - try: - - show_flow_classes( - url=args.api_url, - ) - - except Exception as e: - - print("Exception:", e, flush=True) - -main() - diff --git a/trustgraph-cli/scripts/tg-show-flows b/trustgraph-cli/scripts/tg-show-flows deleted file mode 100755 index e458d798..00000000 --- a/trustgraph-cli/scripts/tg-show-flows +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python3 - -""" -""" - -import argparse -import os -import tabulate -from trustgraph.api import Api -import json - -default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') - -def show_flows(url): - - api = Api(url) - - flow_ids = api.flow_list() - - print(flow_ids) - - flows = [] - - for id in flow_ids: - flow = api.flow_get(id) - flows.append(( - id, - flow.get("description", ""), - )) - - print(tabulate.tabulate( - flows, - tablefmt="pretty", - maxcolwidths=[None, 40], - stralign="left", - headers = ["id", "description"], - )) - -def main(): - - parser = argparse.ArgumentParser( - prog='tg-show-flows', - description=__doc__, - ) - - parser.add_argument( - '-u', '--api-url', - default=default_url, - help=f'API URL (default: {default_url})', - ) - - args = parser.parse_args() - - try: - - show_flows( - url=args.api_url, - ) - - except Exception as e: - - print("Exception:", e, flush=True) - -main() - diff --git a/trustgraph-cli/setup.py b/trustgraph-cli/setup.py index a16bd732..dda401ec 100644 --- a/trustgraph-cli/setup.py +++ b/trustgraph-cli/setup.py @@ -63,13 +63,6 @@ setuptools.setup( "scripts/tg-save-kg-core", "scripts/tg-save-doc-embeds", "scripts/tg-show-config", - "scripts/tg-show-flows", - "scripts/tg-show-flow-classes", - "scripts/tg-get-flow-class", - "scripts/tg-start-flow", - "scripts/tg-stop-flow", - "scripts/tg-delete-flow-class", - "scripts/tg-put-flow-class", "scripts/tg-set-prompt", "scripts/tg-show-tools", "scripts/tg-show-prompts", diff --git a/trustgraph-flow/trustgraph/config/service/flow.py b/trustgraph-flow/trustgraph/config/service/flow.py deleted file mode 100644 index beef679e..00000000 --- a/trustgraph-flow/trustgraph/config/service/flow.py +++ /dev/null @@ -1,210 +0,0 @@ - -from trustgraph.schema import FlowResponse, Error -import json - -class FlowConfig: - def __init__(self, config): - - self.config = config - - async def handle_list_classes(self, msg): - - names = list(self.config["flow-classes"].keys()) - - return FlowResponse( - error = None, - class_names = names, - ) - - async def handle_get_class(self, msg): - - return FlowResponse( - error = None, - class_definition = self.config["flow-classes"][msg.class_name], - ) - - async def handle_put_class(self, msg): - - self.config["flow-classes"][msg.class_name] = msg.class_definition - - await self.config.push() - - return FlowResponse( - error = None, - ) - - async def handle_delete_class(self, msg): - - print(msg) - - del self.config["flow-classes"][msg.class_name] - - await self.config.push() - - return FlowResponse( - error = None, - ) - - async def handle_list_flows(self, msg): - - names = list(self.config["flows"].keys()) - - return FlowResponse( - error = None, - flow_ids = names, - ) - - async def handle_get_flow(self, msg): - - flow = self.config["flows"][msg.flow_id] - - return FlowResponse( - error = None, - flow = flow, - ) - - async def handle_start_flow(self, msg): - - if msg.class_name is None: - raise RuntimeError("No class name") - - if msg.flow_id is None: - raise RuntimeError("No flow ID") - - if msg.flow_id in self.config["flows"]: - raise RuntimeError("Flow already exists") - - if msg.description is None: - raise RuntimeError("No description") - - if msg.class_name not in self.config["flow-classes"]: - raise RuntimeError("Class does not exist") - - def repl_template(tmp): - return tmp.replace( - "{class}", msg.class_name - ).replace( - "{id}", msg.flow_id - ) - - cls = json.loads(self.config["flow-classes"][msg.class_name]) - - for kind in ("class", "flow"): - - for k, v in cls[kind].items(): - - processor, variant = k.split(":", 1) - - variant = repl_template(variant) - - v = { - repl_template(k2): repl_template(v2) - for k2, v2 in v.items() - } - - if processor in self.config["flows-active"]: - target = json.loads(self.config["flows-active"][processor]) - else: - target = {} - - if variant not in target: - target[variant] = v - - self.config["flows-active"][processor] = json.dumps(target) - - self.config["flows"][msg.flow_id] = json.dumps({ - "description": msg.description, - "class-name": msg.class_name, - }) - - await self.config.push() - - return FlowResponse( - error = None, - ) - - async def handle_stop_flow(self, msg): - - if msg.flow_id is None: - raise RuntimeError("No flow ID") - - if msg.flow_id not in self.config["flows"]: - raise RuntimeError("Flow ID invalid") - - flow = json.loads(self.config["flows"][msg.flow_id]) - - if "class-name" not in flow: - raise RuntimeError("Internal error: flow has no flow class") - - class_name = flow["class-name"] - - cls = json.loads(self.config["flow-classes"][class_name]) - - def repl_template(tmp): - return tmp.replace( - "{class}", class_name - ).replace( - "{id}", msg.flow_id - ) - - for kind in ("flow",): - - for k, v in cls[kind].items(): - - processor, variant = k.split(":", 1) - - variant = repl_template(variant) - - if processor in self.config["flows-active"]: - target = json.loads(self.config["flows-active"][processor]) - else: - target = {} - - if variant in target: - del target[variant] - - self.config["flows-active"][processor] = json.dumps(target) - - if msg.flow_id in self.config["flows"]: - del self.config["flows"][msg.flow_id] - - await self.config.push() - - return FlowResponse( - error = None, - ) - - async def handle(self, msg): - - print("Handle message ", msg.operation) - - if msg.operation == "list-classes": - resp = await self.handle_list_classes(msg) - elif msg.operation == "get-class": - resp = await self.handle_get_class(msg) - elif msg.operation == "put-class": - resp = await self.handle_put_class(msg) - elif msg.operation == "delete-class": - resp = await self.handle_delete_class(msg) - elif msg.operation == "list-flows": - resp = await self.handle_list_flows(msg) - elif msg.operation == "get-flow": - resp = await self.handle_get_flow(msg) - elif msg.operation == "start-flow": - resp = await self.handle_start_flow(msg) - elif msg.operation == "stop-flow": - resp = await self.handle_stop_flow(msg) - else: - - resp = FlowResponse( - value=None, - directory=None, - values=None, - error=Error( - type = "bad-operation", - message = "Bad operation" - ) - ) - - return resp - diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index c0268389..47bac828 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -5,139 +5,81 @@ Config service. Manages system global configuration state from pulsar.schema import JsonSchema -from trustgraph.schema import Error - from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigPush +from trustgraph.schema import Error from trustgraph.schema import config_request_queue, config_response_queue from trustgraph.schema import config_push_queue - -from trustgraph.schema import FlowRequest, FlowResponse -from trustgraph.schema import flow_request_queue, flow_response_queue - from trustgraph.log_level import LogLevel from trustgraph.base import AsyncProcessor, Consumer, Producer from . config import Configuration -from . flow import FlowConfig - from ... base import ProcessorMetrics, ConsumerMetrics, ProducerMetrics from ... base import Consumer, Producer default_ident = "config-svc" -default_config_request_queue = config_request_queue -default_config_response_queue = config_response_queue -default_config_push_queue = config_push_queue - -default_flow_request_queue = flow_request_queue -default_flow_response_queue = flow_response_queue +default_request_queue = config_request_queue +default_response_queue = config_response_queue +default_push_queue = config_push_queue class Processor(AsyncProcessor): def __init__(self, **params): - config_request_queue = params.get( - "config_request_queue", default_config_request_queue - ) - config_response_queue = params.get( - "config_response_queue", default_config_response_queue - ) - config_push_queue = params.get( - "config_push_queue", default_config_push_queue - ) - - flow_request_queue = params.get( - "flow_request_queue", default_flow_request_queue - ) - flow_response_queue = params.get( - "flow_response_queue", default_flow_response_queue - ) - + request_queue = params.get("request_queue", default_request_queue) + response_queue = params.get("response_queue", default_response_queue) + push_queue = params.get("push_queue", default_push_queue) id = params.get("id") - flow_request_schema = FlowRequest - flow_response_schema = FlowResponse + request_schema = ConfigRequest + response_schema = ConfigResponse + push_schema = ConfigResponse super(Processor, self).__init__( **params | { - "config_request_schema": ConfigRequest.__name__, - "config_response_schema": ConfigResponse.__name__, - "config_push_schema": ConfigPush.__name__, - "flow_request_schema": FlowRequest.__name__, - "flow_response_schema": FlowResponse.__name__, + "request_schema": request_schema.__name__, + "response_schema": response_schema.__name__, + "push_schema": push_schema.__name__, } ) - config_request_metrics = ConsumerMetrics( - processor = self.id, flow = None, name = "config-request" - ) - config_response_metrics = ProducerMetrics( - processor = self.id, flow = None, name = "config-response" - ) - config_push_metrics = ProducerMetrics( - processor = self.id, flow = None, name = "config-push" - ) + request_metrics = ConsumerMetrics(id + "-request") + response_metrics = ProducerMetrics(id + "-response") + push_metrics = ProducerMetrics(id + "-push") - flow_request_metrics = ConsumerMetrics( - processor = self.id, flow = None, name = "flow-request" - ) - flow_response_metrics = ProducerMetrics( - processor = self.id, flow = None, name = "flow-response" - ) - - self.config_request_consumer = Consumer( - taskgroup = self.taskgroup, - client = self.pulsar_client, - flow = None, - topic = config_request_queue, - subscriber = id, - schema = ConfigRequest, - handler = self.on_config_request, - metrics = config_request_metrics, - ) - - self.config_response_producer = Producer( - client = self.pulsar_client, - topic = config_response_queue, - schema = ConfigResponse, - metrics = config_response_metrics, - ) - - self.config_push_producer = Producer( - client = self.pulsar_client, - topic = config_push_queue, + self.push_pub = Producer( + client = self.client, + topic = push_queue, schema = ConfigPush, - metrics = config_push_metrics, + metrics = push_metrics, ) - self.flow_request_consumer = Consumer( + self.response_pub = Producer( + client = self.client, + topic = response_queue, + schema = ConfigResponse, + metrics = response_metrics, + ) + + self.subs = Consumer( taskgroup = self.taskgroup, - client = self.pulsar_client, + client = self.client, flow = None, - topic = flow_request_queue, + topic = request_queue, subscriber = id, - schema = FlowRequest, - handler = self.on_flow_request, - metrics = flow_request_metrics, - ) - - self.flow_response_producer = Producer( - client = self.pulsar_client, - topic = flow_response_queue, - schema = FlowResponse, - metrics = flow_response_metrics, + schema = request_schema, + handler = self.on_message, + metrics = request_metrics, ) self.config = Configuration(self.push) - self.flow = FlowConfig(self.config) print("Service initialised.") async def start(self): await self.push() - await self.config_request_consumer.start() - await self.flow_request_consumer.start() + await self.subs.start() async def push(self): @@ -150,11 +92,11 @@ class Processor(AsyncProcessor): error = None, ) - await self.config_push_producer.send(resp) + await self.push_pub.send(resp) print("Pushed version ", self.config.version) - async def on_config_request(self, msg, consumer, flow): + async def on_message(self, msg, consumer, flow): try: @@ -167,54 +109,19 @@ class Processor(AsyncProcessor): resp = await self.config.handle(v) - await self.config_response_producer.send( - resp, properties={"id": id} - ) + await self.response_pub.send(resp, properties={"id": id}) except Exception as e: resp = ConfigResponse( error=Error( - type = "config-error", + type = "unexpected-error", message = str(e), ), text=None, ) - await self.config_response_producer.send( - resp, properties={"id": id} - ) - - async def on_flow_request(self, msg, consumer, flow): - - try: - - v = msg.value() - - # Sender-produced ID - id = msg.properties()["id"] - - print(f"Handling {id}...", flush=True) - - resp = await self.flow.handle(v) - - await self.flow_response_producer.send( - resp, properties={"id": id} - ) - - except Exception as e: - - resp = FlowResponse( - error=Error( - type = "flow-error", - message = str(e), - ), - text=None, - ) - - await self.flow_response_producer.send( - resp, properties={"id": id} - ) + await self.response_pub.send(resp, properties={"id": id}) @staticmethod def add_args(parser): @@ -222,33 +129,21 @@ class Processor(AsyncProcessor): AsyncProcessor.add_args(parser) parser.add_argument( - '--config-request-queue', - default=default_config_request_queue, - help=f'Config request queue (default: {default_config_request_queue})' + '-q', '--request-queue', + default=default_request_queue, + help=f'Request queue (default: {default_request_queue})' ) parser.add_argument( - '--config-response-queue', - default=default_config_response_queue, - help=f'Config response queue {default_config_response_queue}', + '-r', '--response-queue', + default=default_response_queue, + help=f'Response queue {default_response_queue}', ) parser.add_argument( '--push-queue', - default=default_config_push_queue, - help=f'Config push queue (default: {default_config_push_queue})' - ) - - parser.add_argument( - '--flow-request-queue', - default=default_flow_request_queue, - help=f'Flow request queue (default: {default_flow_request_queue})' - ) - - parser.add_argument( - '--flow-response-queue', - default=default_flow_response_queue, - help=f'Flow response queue {default_flow_response_queue}', + default=default_push_queue, + help=f'Config push queue (default: {default_push_queue})' ) def run(): diff --git a/trustgraph-flow/trustgraph/gateway/flow.py b/trustgraph-flow/trustgraph/gateway/flow.py deleted file mode 100644 index c666d99c..00000000 --- a/trustgraph-flow/trustgraph/gateway/flow.py +++ /dev/null @@ -1,51 +0,0 @@ - -from .. schema import FlowRequest, FlowResponse, ConfigKey, ConfigValue -from .. schema import flow_request_queue -from .. schema import flow_response_queue - -from . endpoint import ServiceEndpoint -from . requestor import ServiceRequestor - -class FlowRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout, auth): - - super(FlowRequestor, self).__init__( - pulsar_client=pulsar_client, - request_queue=flow_request_queue, - response_queue=flow_response_queue, - request_schema=FlowRequest, - response_schema=FlowResponse, - timeout=timeout, - ) - - def to_request(self, body): - - return FlowRequest( - operation = body.get("operation", None), - class_name = body.get("class-name", None), - class_definition = body.get("class-definition", None), - description = body.get("description", None), - flow_id = body.get("flow-id", None), - ) - - def from_response(self, message): - - response = { } - - if message.class_names is not None: - response["class-names"] = message.class_names - - if message.flow_ids is not None: - response["flow-ids"] = message.flow_ids - - if message.class_definition is not None: - response["class-definition"] = message.class_definition - - if message.flow is not None: - response["flow"] = message.flow - - if message.description is not None: - response["description"] = message.description - - return response, True - diff --git a/trustgraph-flow/trustgraph/gateway/mux.py b/trustgraph-flow/trustgraph/gateway/mux.py index 1afc3225..8195c542 100644 --- a/trustgraph-flow/trustgraph/gateway/mux.py +++ b/trustgraph-flow/trustgraph/gateway/mux.py @@ -5,6 +5,7 @@ import uuid from aiohttp import web, WSMsgType from . socket import SocketEndpoint +from . text_completion import TextCompletionRequestor MAX_OUTSTANDING_REQUESTS = 15 WORKER_CLOSE_WAIT = 0.01 diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index d7df3240..29b31483 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -25,30 +25,28 @@ from .. log_level import LogLevel from . serialize import to_subgraph from . running import Running - -#from . text_completion import TextCompletionRequestor -#from . prompt import PromptRequestor -#from . graph_rag import GraphRagRequestor -#from . document_rag import DocumentRagRequestor -#from . triples_query import TriplesQueryRequestor -#from . graph_embeddings_query import GraphEmbeddingsQueryRequestor -#from . embeddings import EmbeddingsRequestor -#from . encyclopedia import EncyclopediaRequestor -#from . agent import AgentRequestor -#from . dbpedia import DbpediaRequestor -#from . internet_search import InternetSearchRequestor -#from . librarian import LibrarianRequestor +from . text_completion import TextCompletionRequestor +from . prompt import PromptRequestor +from . graph_rag import GraphRagRequestor +from . document_rag import DocumentRagRequestor +from . triples_query import TriplesQueryRequestor +from . graph_embeddings_query import GraphEmbeddingsQueryRequestor +from . embeddings import EmbeddingsRequestor +from . encyclopedia import EncyclopediaRequestor +from . agent import AgentRequestor +from . dbpedia import DbpediaRequestor +from . internet_search import InternetSearchRequestor +from . librarian import LibrarianRequestor from . config import ConfigRequestor -from . flow import FlowRequestor -#from . triples_stream import TriplesStreamEndpoint -#from . graph_embeddings_stream import GraphEmbeddingsStreamEndpoint -#from . document_embeddings_stream import DocumentEmbeddingsStreamEndpoint -#from . triples_load import TriplesLoadEndpoint -#from . graph_embeddings_load import GraphEmbeddingsLoadEndpoint -#from . document_embeddings_load import DocumentEmbeddingsLoadEndpoint +from . triples_stream import TriplesStreamEndpoint +from . graph_embeddings_stream import GraphEmbeddingsStreamEndpoint +from . document_embeddings_stream import DocumentEmbeddingsStreamEndpoint +from . triples_load import TriplesLoadEndpoint +from . graph_embeddings_load import GraphEmbeddingsLoadEndpoint +from . document_embeddings_load import DocumentEmbeddingsLoadEndpoint from . mux import MuxEndpoint -#from . document_load import DocumentLoadSender -#from . text_load import TextLoadSender +from . document_load import DocumentLoadSender +from . text_load import TextLoadSender from . metrics import MetricsEndpoint from . endpoint import ServiceEndpoint @@ -107,165 +105,157 @@ class Api: self.auth = Authenticator(allow_all=True) self.services = { - # "text-completion": TextCompletionRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "prompt": PromptRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "graph-rag": GraphRagRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "document-rag": DocumentRagRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "triples-query": TriplesQueryRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "graph-embeddings-query": GraphEmbeddingsQueryRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "embeddings": EmbeddingsRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "agent": AgentRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "librarian": LibrarianRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), + "text-completion": TextCompletionRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), + "prompt": PromptRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), + "graph-rag": GraphRagRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), + "document-rag": DocumentRagRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), + "triples-query": TriplesQueryRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), + "graph-embeddings-query": GraphEmbeddingsQueryRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), + "embeddings": EmbeddingsRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), + "agent": AgentRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), + "librarian": LibrarianRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), "config": ConfigRequestor( pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), - "flow": FlowRequestor( + "encyclopedia": EncyclopediaRequestor( pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), - # "encyclopedia": EncyclopediaRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "dbpedia": DbpediaRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "internet-search": InternetSearchRequestor( - # pulsar_client=self.pulsar_client, timeout=self.timeout, - # auth = self.auth, - # ), - # "document-load": DocumentLoadSender( - # pulsar_client=self.pulsar_client, - # ), - # "text-load": TextLoadSender( - # pulsar_client=self.pulsar_client, - # ), + "dbpedia": DbpediaRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), + "internet-search": InternetSearchRequestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + auth = self.auth, + ), + "document-load": DocumentLoadSender( + pulsar_client=self.pulsar_client, + ), + "text-load": TextLoadSender( + pulsar_client=self.pulsar_client, + ), } self.endpoints = [ - # ServiceEndpoint( - # endpoint_path = "/api/v1/text-completion", auth=self.auth, - # requestor = self.services["text-completion"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/prompt", auth=self.auth, - # requestor = self.services["prompt"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/graph-rag", auth=self.auth, - # requestor = self.services["graph-rag"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/document-rag", auth=self.auth, - # requestor = self.services["document-rag"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/triples-query", auth=self.auth, - # requestor = self.services["triples-query"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/graph-embeddings-query", - # auth=self.auth, - # requestor = self.services["graph-embeddings-query"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/embeddings", auth=self.auth, - # requestor = self.services["embeddings"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/agent", auth=self.auth, - # requestor = self.services["agent"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/librarian", auth=self.auth, - # requestor = self.services["librarian"], - # ), + ServiceEndpoint( + endpoint_path = "/api/v1/text-completion", auth=self.auth, + requestor = self.services["text-completion"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/prompt", auth=self.auth, + requestor = self.services["prompt"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/graph-rag", auth=self.auth, + requestor = self.services["graph-rag"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/document-rag", auth=self.auth, + requestor = self.services["document-rag"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/triples-query", auth=self.auth, + requestor = self.services["triples-query"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/graph-embeddings-query", + auth=self.auth, + requestor = self.services["graph-embeddings-query"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/embeddings", auth=self.auth, + requestor = self.services["embeddings"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/agent", auth=self.auth, + requestor = self.services["agent"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/librarian", auth=self.auth, + requestor = self.services["librarian"], + ), ServiceEndpoint( endpoint_path = "/api/v1/config", auth=self.auth, requestor = self.services["config"], ), ServiceEndpoint( - endpoint_path = "/api/v1/flow", auth=self.auth, - requestor = self.services["flow"], + endpoint_path = "/api/v1/encyclopedia", auth=self.auth, + requestor = self.services["encyclopedia"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/dbpedia", auth=self.auth, + requestor = self.services["dbpedia"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/internet-search", auth=self.auth, + requestor = self.services["internet-search"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/load/document", auth=self.auth, + requestor = self.services["document-load"], + ), + ServiceEndpoint( + endpoint_path = "/api/v1/load/text", auth=self.auth, + requestor = self.services["text-load"], + ), + TriplesStreamEndpoint( + pulsar_client=self.pulsar_client, + auth = self.auth, + ), + GraphEmbeddingsStreamEndpoint( + pulsar_client=self.pulsar_client, + auth = self.auth, + ), + DocumentEmbeddingsStreamEndpoint( + pulsar_client=self.pulsar_client, + auth = self.auth, + ), + TriplesLoadEndpoint( + pulsar_client=self.pulsar_client, + auth = self.auth, + ), + GraphEmbeddingsLoadEndpoint( + pulsar_client=self.pulsar_client, + auth = self.auth, + ), + DocumentEmbeddingsLoadEndpoint( + pulsar_client=self.pulsar_client, + auth = self.auth, + ), + MuxEndpoint( + pulsar_client=self.pulsar_client, + auth = self.auth, + services = self.services, ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/encyclopedia", auth=self.auth, - # requestor = self.services["encyclopedia"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/dbpedia", auth=self.auth, - # requestor = self.services["dbpedia"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/internet-search", auth=self.auth, - # requestor = self.services["internet-search"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/load/document", auth=self.auth, - # requestor = self.services["document-load"], - # ), - # ServiceEndpoint( - # endpoint_path = "/api/v1/load/text", auth=self.auth, - # requestor = self.services["text-load"], - # ), - # TriplesStreamEndpoint( - # pulsar_client=self.pulsar_client, - # auth = self.auth, - # ), - # GraphEmbeddingsStreamEndpoint( - # pulsar_client=self.pulsar_client, - # auth = self.auth, - # ), - # DocumentEmbeddingsStreamEndpoint( - # pulsar_client=self.pulsar_client, - # auth = self.auth, - # ), - # TriplesLoadEndpoint( - # pulsar_client=self.pulsar_client, - # auth = self.auth, - # ), - # GraphEmbeddingsLoadEndpoint( - # pulsar_client=self.pulsar_client, - # auth = self.auth, - # ), - # DocumentEmbeddingsLoadEndpoint( - # pulsar_client=self.pulsar_client, - # auth = self.auth, - # ), - # MuxEndpoint( - # pulsar_client=self.pulsar_client, - # auth = self.auth, - # services = self.services, - # ), MetricsEndpoint( endpoint_path = "/api/v1/metrics", prometheus_url = self.prometheus_url,