diff --git a/test-api/test-library-add-doc b/test-api/test-library-add-doc new file mode 100755 index 00000000..bd927367 --- /dev/null +++ b/test-api/test-library-add-doc @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +id = "http://trustgraph.ai/doc/12345678" + +with open("docs/README.cats") as f: + doc = base64.b64encode(f.read().encode("utf-8")).decode("utf-8") + +input = { + "operation": "add", + "document": { + "id": id, + "metadata": [ + { + "s": { + "v": id, + "e": True, + }, + "p": { + "v": "http://www.w3.org/2000/01/rdf-schema#label", + "e": True, + }, + "o": { + "v": "Mark's pets", "e": False, + }, + }, + { + "s": { + "v": id, + "e": True, + }, + "p": { + "v": 'https://schema.org/keywords', + "e": True, + }, + "o": { + "v": "cats", "e": False, + }, + }, + ], + "document": doc, + "kind": "text/plain", + "user": "trustgraph", + "collection": "default", + "title": "Mark's cats", + "comments": "Test doc taken from the TrustGraph repo", + } +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-library-add-doc2 b/test-api/test-library-add-doc2 new file mode 100755 index 00000000..0c0856f9 --- /dev/null +++ b/test-api/test-library-add-doc2 @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +id = "http://trustgraph.ai/doc/12345678" + +source = "../sources/20160001634.pdf" + +with open(source, "rb") as f: + doc = base64.b64encode(f.read()).decode("utf-8") + +input = { + "operation": "add", + "id": id, + "document": { + "metadata": [ + { + "s": { + "v": id, + "e": True, + }, + "p": { + "v": "http://www.w3.org/2000/01/rdf-schema#label", + "e": True, + }, + "o": { + "v": "Challenger report volume 1", "e": False, + }, + }, + { + "s": { + "v": id, + "e": True, + }, + "p": { + "v": 'https://schema.org/keywords', + "e": True, + }, + "o": { + "v": "space shuttle", "e": False, + }, + }, + { + "s": { + "v": id, + "e": True, + }, + "p": { + "v": 'https://schema.org/keywords', + "e": True, + }, + "o": { + "v": "nasa", "e": False, + }, + }, + ], + "document": doc, + "kind": "application/pdf", + "user": "trustgraph", + "collection": "default", + } +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/test-api/test-library-list b/test-api/test-library-list new file mode 100755 index 00000000..72ea4478 --- /dev/null +++ b/test-api/test-library-list @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +import requests +import json +import sys +import base64 + +url = "http://localhost:8088/api/v1/" + +############################################################################ + +user = "trustgraph" + +input = { + "operation": "list", + "user": user, +} + +resp = requests.post( + f"{url}librarian", + json=input, +) + +print(resp.text) +resp = resp.json() + +print(resp) + +if "error" in resp: + print(f"Error: {resp['error']}") + sys.exit(1) + +# print(resp["response"]) +print(resp) + +sys.exit(0) + +############################################################################ + diff --git a/tests/test-config b/tests/test-config new file mode 100644 index 00000000..63f77b6b --- /dev/null +++ b/tests/test-config @@ -0,0 +1,2 @@ +#!/usr/bin/env python3 + diff --git a/tests/test-flow b/tests/test-flow new file mode 100755 index 00000000..87a349af --- /dev/null +++ b/tests/test-flow @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 + +import requests + +url = "http://localhost:8088/" + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "list-classes", + } +) + +print(resp) +print(resp.text) + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "get-class", + "class-name": "default", + } +) + +print(resp) +print(resp.text) + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "put-class", + "class-name": "bunch", + "class-definition": "{}", + } +) + +print(resp) +print(resp.text) + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "get-class", + "class-name": "bunch", + } +) + +print(resp) +print(resp.text) + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "list-classes", + } +) + +print(resp) +print(resp.text) + + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "delete-class", + "class-name": "bunch", + } +) + +print(resp) +print(resp.text) + + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "list-classes", + } +) + +print(resp) +print(resp.text) + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "list-flows", + } +) + +print(resp) +print(resp.text) diff --git a/tests/test-flow-get-class b/tests/test-flow-get-class new file mode 100755 index 00000000..20707b51 --- /dev/null +++ b/tests/test-flow-get-class @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + +import requests + +url = "http://localhost:8088/" + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "get-class", + "class-name": "default", + } +) + +resp = resp.json() + +print(resp["class-definition"]) + + diff --git a/tests/test-flow-put-class b/tests/test-flow-put-class new file mode 100755 index 00000000..8fd4d9f2 --- /dev/null +++ b/tests/test-flow-put-class @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 + +import requests +import json + +url = "http://localhost:8088/" + +defn = {"class": {"de-query:{class}": {"request": "non-persistent://tg/request/document-embeddings:{class}", "response": "non-persistent://tg/response/document-embeddings:{class}"}, "document-rag:{class}": {"document-embeddings-request": "non-persistent://tg/request/document-embeddings:{class}", "document-embeddings-response": "non-persistent://tg/response/document-embeddings:{class}", "embeddings-request": "non-persistent://tg/request/embeddings:{class}", "embeddings-response": "non-persistent://tg/response/embeddings:{class}", "prompt-request": "non-persistent://tg/request/prompt-rag:{class}", "prompt-response": "non-persistent://tg/response/prompt-rag:{class}", "request": "non-persistent://tg/request/document-rag:{class}", "response": "non-persistent://tg/response/document-rag:{class}"}, "embeddings:{class}": {"request": "non-persistent://tg/request/embeddings:{class}", "response": "non-persistent://tg/response/embeddings:{class}"}, "ge-query:{class}": {"request": "non-persistent://tg/request/graph-embeddings:{class}", "response": "non-persistent://tg/response/graph-embeddings:{class}"}, "graph-rag:{class}": {"embeddings-request": "non-persistent://tg/request/embeddings:{class}", "embeddings-response": "non-persistent://tg/response/embeddings:{class}", "graph-embeddings-request": "non-persistent://tg/request/graph-embeddings:{class}", "graph-embeddings-response": "non-persistent://tg/response/graph-embeddings:{class}", "prompt-request": "non-persistent://tg/request/prompt-rag:{class}", "prompt-response": "non-persistent://tg/response/prompt-rag:{class}", "request": "non-persistent://tg/request/graph-rag:{class}", "response": "non-persistent://tg/response/graph-rag:{class}", "triples-request": "non-persistent://tg/request/triples:{class}", "triples-response": "non-persistent://tg/response/triples:{class}"}, "metering-rag:{class}": {"input": "non-persistent://tg/response/text-completion-rag:{class}"}, "metering:{class}": {"input": "non-persistent://tg/response/text-completion:{class}"}, "prompt-rag:{class}": {"request": "non-persistent://tg/request/prompt-rag:{class}", "response": "non-persistent://tg/response/prompt-rag:{class}", "text-completion-request": "non-persistent://tg/request/text-completion-rag:{class}", "text-completion-response": "non-persistent://tg/response/text-completion-rag:{class}"}, "prompt:{class}": {"request": "non-persistent://tg/request/prompt:{class}", "response": "non-persistent://tg/response/prompt:{class}", "text-completion-request": "non-persistent://tg/request/text-completion:{class}", "text-completion-response": "non-persistent://tg/response/text-completion:{class}"}, "text-completion-rag:{class}": {"request": "non-persistent://tg/request/text-completion-rag:{class}", "response": "non-persistent://tg/response/text-completion-rag:{class}"}, "text-completion:{class}": {"request": "non-persistent://tg/request/text-completion:{class}", "response": "non-persistent://tg/response/text-completion:{class}"}, "triples-query:{class}": {"request": "non-persistent://tg/request/triples:{class}", "response": "non-persistent://tg/response/triples:{class}"}}, "description": "Default flow class, supports GraphRAG and document RAG", "flow": {"agent-manager:{id}": {"graph-rag-request": "non-persistent://tg/request/graph-rag:{class}", "graph-rag-response": "non-persistent://tg/response/graph-rag:{class}", "next": "non-persistent://tg/request/agent:{id}", "prompt-request": "non-persistent://tg/request/prompt:{class}", "prompt-response": "non-persistent://tg/response/prompt:{class}", "request": "non-persistent://tg/request/agent:{id}", "response": "non-persistent://tg/response/agent:{id}", "text-completion-request": "non-persistent://tg/request/text-completion:{class}", "text-completion-response": "non-persistent://tg/response/text-completion:{class}"}, "chunker:{id}": {"input": "persistent://tg/flow/text-document-load:{id}", "output": "persistent://tg/flow/chunk-load:{id}"}, "de-write:{id}": {"input": "persistent://tg/flow/document-embeddings-store:{id}"}, "document-embeddings:{id}": {"embeddings-request": "non-persistent://tg/request/embeddings:{class}", "embeddings-response": "non-persistent://tg/response/embeddings:{class}", "input": "persistent://tg/flow/chunk-load:{id}", "output": "persistent://tg/flow/document-embeddings-store:{id}"}, "ge-write:{id}": {"input": "persistent://tg/flow/graph-embeddings-store:{id}"}, "graph-embeddings:{id}": {"embeddings-request": "non-persistent://tg/request/embeddings:{class}", "embeddings-response": "non-persistent://tg/response/embeddings:{class}", "input": "persistent://tg/flow/entity-contexts-load:{id}", "output": "persistent://tg/flow/graph-embeddings-store:{id}"}, "kg-extract-definitions:{id}": {"entity-contexts": "persistent://tg/flow/entity-contexts-load:{id}", "input": "persistent://tg/flow/chunk-load:{id}", "prompt-request": "non-persistent://tg/request/prompt:{class}", "prompt-response": "non-persistent://tg/response/prompt:{class}", "triples": "persistent://tg/flow/triples-store:{id}"}, "kg-extract-relationships:{id}": {"input": "persistent://tg/flow/chunk-load:{id}", "prompt-request": "non-persistent://tg/request/prompt:{class}", "prompt-response": "non-persistent://tg/response/prompt:{class}", "triples": "persistent://tg/flow/triples-store:{id}"}, "pdf-decoder:{id}": {"input": "persistent://tg/flow/document-load:{id}", "output": "persistent://tg/flow/text-document-load:{id}"}, "triples-write:{id}": {"input": "persistent://tg/flow/triples-store:{id}"}}, "tags": ["document-rag", "graph-rag", "knowledge-extraction"]} + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "put-class", + "class-name": "default", + "class-definition": json.dumps(defn), + } +) + +resp = resp.json() + +print(resp) + diff --git a/tests/test-flow-start-flow b/tests/test-flow-start-flow new file mode 100755 index 00000000..15a3c0cc --- /dev/null +++ b/tests/test-flow-start-flow @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 + +import requests +import json + +url = "http://localhost:8088/" + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "start-flow", + "flow-id": "0003", + "class-name": "default", + } +) + +print(resp) +print(resp.text) +resp = resp.json() + + +print(resp) + diff --git a/tests/test-flow-stop-flow b/tests/test-flow-stop-flow new file mode 100755 index 00000000..62ea1aa9 --- /dev/null +++ b/tests/test-flow-stop-flow @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 + +import requests +import json + +url = "http://localhost:8088/" + +resp = requests.post( + f"{url}/api/v1/flow", + json={ + "operation": "stop-flow", + "flow-id": "0003", + } +) + +print(resp) +print(resp.text) +resp = resp.json() + + +print(resp) + diff --git a/tests/test-load-pdf b/tests/test-load-pdf new file mode 100755 index 00000000..838a57ce --- /dev/null +++ b/tests/test-load-pdf @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +import pulsar +from pulsar.schema import JsonSchema +import base64 + +from trustgraph.schema import Document, Metadata + +client = pulsar.Client("pulsar://localhost:6650", listener_name="localhost") + +prod = client.create_producer( + topic="persistent://tg/flow/document-load:0000", + schema=JsonSchema(Document), + chunking_enabled=True, +) + +path = "../sources/Challenger-Report-Vol1.pdf" + +with open(path, "rb") as f: + blob = base64.b64encode(f.read()).decode("utf-8") + +message = Document( + metadata = Metadata( + id = "00001", + metadata = [], + user="trustgraph", + collection="default", + ), + data=blob +) + +prod.send(message) + +prod.close() +client.close() + diff --git a/tests/test-load-text b/tests/test-load-text new file mode 100755 index 00000000..754458aa --- /dev/null +++ b/tests/test-load-text @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 + +import pulsar +from pulsar.schema import JsonSchema +import base64 + +from trustgraph.schema import TextDocument, Metadata + +client = pulsar.Client("pulsar://localhost:6650", listener_name="localhost") + +prod = client.create_producer( + topic="persistent://tg/flow/text-document-load:0000", + schema=JsonSchema(TextDocument), + chunking_enabled=True, +) + +path = "docs/README.cats" + +with open(path, "r") as f: +# blob = base64.b64encode(f.read()).decode("utf-8") + blob = f.read() + +message = TextDocument( + metadata = Metadata( + id = "00001", + metadata = [], + user="trustgraph", + collection="default", + ), + text=blob +) + +prod.send(message) + +prod.close() +client.close() + diff --git a/trustgraph-base/trustgraph/api/api.py b/trustgraph-base/trustgraph/api/api.py index ddc6b2c3..fbff8d34 100644 --- a/trustgraph-base/trustgraph/api/api.py +++ b/trustgraph-base/trustgraph/api/api.py @@ -562,3 +562,233 @@ class Api: except: raise ProtocolException(f"Response not formatted correctly") + def flow_list_classes(self): + + # The input consists of system and prompt strings + input = { + "operation": "list-classes", + } + + url = f"{self.url}flow" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + try: + return object["class-names"] + except: + raise ProtocolException(f"Response not formatted correctly") + + def flow_get_class(self, class_name): + + # The input consists of system and prompt strings + input = { + "operation": "get-class", + "class-name": class_name, + } + + url = f"{self.url}flow" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + try: + return json.loads(object["class-definition"]) + except Exception as e: + print(e) + raise ProtocolException(f"Response not formatted correctly") + + def flow_put_class(self, class_name, definition): + + # The input consists of system and prompt strings + input = { + "operation": "put-class", + "class-name": class_name, + "class-definition": json.dumps(definition), + } + + url = f"{self.url}flow" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + return + + def flow_delete_class(self, class_name): + + # The input consists of system and prompt strings + input = { + "operation": "delete-class", + "class-name": class_name, + } + + url = f"{self.url}flow" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + return + + def flow_list(self): + + # The input consists of system and prompt strings + input = { + "operation": "list-flows", + } + + url = f"{self.url}flow" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + try: + return object["flow-ids"] + except: + raise ProtocolException(f"Response not formatted correctly") + + def flow_get(self, id): + + # The input consists of system and prompt strings + input = { + "operation": "get-flow", + "flow-id": id, + } + + url = f"{self.url}flow" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + try: + return json.loads(object["flow"]) + except: + raise ProtocolException(f"Response not formatted correctly") + + def flow_start(self, class_name, id, description): + + # The input consists of system and prompt strings + input = { + "operation": "start-flow", + "flow-id": id, + "class-name": class_name, + "description": description, + } + + url = f"{self.url}flow" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + return + + def flow_stop(self, id): + + # The input consists of system and prompt strings + input = { + "operation": "stop-flow", + "flow-id": id, + } + + url = f"{self.url}flow" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + return + diff --git a/trustgraph-base/trustgraph/base/async_processor.py b/trustgraph-base/trustgraph/base/async_processor.py index 80440b36..bdf9a0bb 100644 --- a/trustgraph-base/trustgraph/base/async_processor.py +++ b/trustgraph-base/trustgraph/base/async_processor.py @@ -17,7 +17,7 @@ from .. exceptions import TooManyRequests from . pubsub import PulsarClient from . producer import Producer from . consumer import Consumer -from . metrics import ProcessorMetrics +from . metrics import ProcessorMetrics, ConsumerMetrics default_config_queue = config_push_queue @@ -30,10 +30,10 @@ class AsyncProcessor: self.id = params.get("id") # Register a pulsar client - self.pulsar_client = PulsarClient(**params) + self.pulsar_client_object = PulsarClient(**params) # Initialise metrics, records the parameters - ProcessorMetrics(id=self.id).info({ + ProcessorMetrics(processor = self.id).info({ k: str(params[k]) for k in params if k != "id" @@ -57,11 +57,15 @@ class AsyncProcessor: # service config_subscriber_id = str(uuid.uuid4()) + config_consumer_metrics = ConsumerMetrics( + processor = self.id, flow = None, name = "config", + ) + # Subscribe to config queue self.config_sub_task = Consumer( taskgroup = self.taskgroup, - client = self.client, + client = self.pulsar_client, subscriber = config_subscriber_id, flow = None, @@ -70,6 +74,8 @@ class AsyncProcessor: handler = self.on_config_change, + metrics = config_consumer_metrics, + # This causes new subscriptions to view the entire history of # configuration start_of_messages = True @@ -85,31 +91,28 @@ class AsyncProcessor: # This is called to stop all threads. An over-ride point for extra # functionality def stop(self): - self.client.close() + self.pulsar_client.close() self.running = False # Returns the pulsar host @property - def pulsar_host(self): return self.client.pulsar_host + def pulsar_host(self): return self.pulsar_client_object.pulsar_host # Returns the pulsar client @property - def client(self): return self.pulsar_client.client + def pulsar_client(self): return self.pulsar_client_object.client # Register a new event handler for configuration change def register_config_handler(self, handler): self.config_handlers.append(handler) # Called when a new configuration message push occurs - async def on_config_change(self, message, consumer): + async def on_config_change(self, message, consumer, flow): # Get configuration data and version number config = message.value().config version = message.value().version - # Acknowledge the message - consumer.acknowledge(message) - # Invoke message handlers print("Config change event", config, version, flush=True) for ch in self.config_handlers: @@ -234,7 +237,7 @@ class AsyncProcessor: PulsarClient.add_args(parser) parser.add_argument( - '--config-push-queue', + '--config-queue', default=default_config_queue, help=f'Config push queue {default_config_queue}', ) diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 57b940ac..8f262b83 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -156,7 +156,7 @@ class Consumer: await self.handler(msg, self, self.flow) else: - await self.handler(msg, self.consumer) + await self.handler(msg, self, self.flow) print("Handled.", flush=True) diff --git a/trustgraph-base/trustgraph/base/consumer_spec.py b/trustgraph-base/trustgraph/base/consumer_spec.py index aaeca677..21497dc5 100644 --- a/trustgraph-base/trustgraph/base/consumer_spec.py +++ b/trustgraph-base/trustgraph/base/consumer_spec.py @@ -12,13 +12,13 @@ class ConsumerSpec(Spec): def add(self, flow, processor, definition): consumer_metrics = ConsumerMetrics( - flow.id, f"{flow.name}-{self.name}" + processor = flow.id, flow = flow.name, name = self.name, ) consumer = Consumer( taskgroup = processor.taskgroup, flow = flow, - client = processor.client, + client = processor.pulsar_client, topic = definition[self.name], subscriber = processor.id + "--" + self.name, schema = self.schema, diff --git a/trustgraph-base/trustgraph/base/metrics.py b/trustgraph-base/trustgraph/base/metrics.py index 5d87849f..4ffbac9c 100644 --- a/trustgraph-base/trustgraph/base/metrics.py +++ b/trustgraph-base/trustgraph/base/metrics.py @@ -4,79 +4,133 @@ from prometheus_client import Counter class ConsumerMetrics: - def __init__(self, id, flow=None): + def __init__(self, processor, flow, name): - self.id = id + self.processor = processor self.flow = flow + self.name = name if not hasattr(__class__, "state_metric"): __class__.state_metric = Enum( 'consumer_state', 'Consumer state', - ["id", "flow"], + ["processor", "flow", "name"], states=['stopped', 'running'] ) + if not hasattr(__class__, "request_metric"): __class__.request_metric = Histogram( 'request_latency', 'Request latency (seconds)', - ["id", "flow"], + ["processor", "flow", "name"], ) + if not hasattr(__class__, "processing_metric"): __class__.processing_metric = Counter( 'processing_count', 'Processing count', - ["id", "flow", "status"] + ["processor", "flow", "name", "status"], ) + if not hasattr(__class__, "rate_limit_metric"): __class__.rate_limit_metric = Counter( 'rate_limit_count', 'Rate limit event count', - ["id", "flow"] + ["processor", "flow", "name"], ) def process(self, status): __class__.processing_metric.labels( - id=self.id, flow=self.flow, status=status + processor = self.processor, flow = self.flow, name = self.name, + status=status ).inc() def rate_limit(self): __class__.rate_limit_metric.labels( - id=self.id, flow=self.flow + processor = self.processor, flow = self.flow, name = self.name, ).inc() def state(self, state): __class__.state_metric.labels( - id=self.id, flow=self.flow + processor = self.processor, flow = self.flow, name = self.name, ).state(state) def record_time(self): return __class__.request_metric.labels( - id=self.id, flow=self.flow + processor = self.processor, flow = self.flow, name = self.name, ).time() class ProducerMetrics: - def __init__(self, id, flow=None): - self.id = id + def __init__(self, processor, flow, name): + + self.processor = processor self.flow = flow + self.name = name - if not hasattr(__class__, "output_metric"): - __class__.output_metric = Counter( - 'output_count', 'Output items created', - ["id", "flow"] + if not hasattr(__class__, "producer_metric"): + __class__.producer_metric = Counter( + 'producer_count', 'Output items produced', + ["processor", "flow", "name"], ) def inc(self): - __class__.output_metric.labels(id=self.id, flow=self.flow).inc() + __class__.producer_metric.labels( + processor = self.processor, flow = self.flow, name = self.name + ).inc() class ProcessorMetrics: - def __init__(self, id): + def __init__(self, processor): - self.id = id + self.processor = processor if not hasattr(__class__, "processor_metric"): __class__.processor_metric = Info( 'processor', 'Processor configuration', - ["id"] + ["processor"] ) def info(self, info): - __class__.processor_metric.labels(id=self.id).info(info) - + __class__.processor_metric.labels( + processor = self.processor + ).info(info) + +class SubscriberMetrics: + + def __init__(self, processor, flow, name): + + self.processor = processor + self.flow = flow + self.name = name + + if not hasattr(__class__, "state_metric"): + __class__.state_metric = Enum( + 'subscriber_state', 'Subscriber state', + ["processor", "flow", "name"], + states=['stopped', 'running'] + ) + + if not hasattr(__class__, "received_metric"): + __class__.received_metric = Counter( + 'received_count', 'Received count', + ["processor", "flow", "name"], + ) + + if not hasattr(__class__, "dropped_metric"): + __class__.dropped_metric = Counter( + 'dropped_count', 'Dropped messages count', + ["processor", "flow", "name"], + ) + + def received(self): + __class__.received_metric.labels( + processor = self.processor, flow = self.flow, name = self.name, + ).inc() + + def state(self, state): + + __class__.state_metric.labels( + processor = self.processor, flow = self.flow, name = self.name, + ).state(state) + + def dropped(self, state): + __class__.dropped_metric.labels( + processor = self.processor, flow = self.flow, name = self.name, + ).inc() + diff --git a/trustgraph-base/trustgraph/base/producer_spec.py b/trustgraph-base/trustgraph/base/producer_spec.py index 9007f48b..9c8bbc6a 100644 --- a/trustgraph-base/trustgraph/base/producer_spec.py +++ b/trustgraph-base/trustgraph/base/producer_spec.py @@ -11,11 +11,11 @@ class ProducerSpec(Spec): def add(self, flow, processor, definition): producer_metrics = ProducerMetrics( - flow.id, f"{flow.name}-{self.name}" + processor = flow.id, flow = flow.name, name = self.name ) producer = Producer( - client = processor.client, + client = processor.pulsar_client, topic = definition[self.name], schema = self.schema, metrics = producer_metrics, diff --git a/trustgraph-base/trustgraph/base/request_response_spec.py b/trustgraph-base/trustgraph/base/request_response_spec.py index dcfcbf9b..88ee4563 100644 --- a/trustgraph-base/trustgraph/base/request_response_spec.py +++ b/trustgraph-base/trustgraph/base/request_response_spec.py @@ -5,7 +5,7 @@ import asyncio from . subscriber import Subscriber from . producer import Producer from . spec import Spec -from . metrics import ConsumerMetrics, ProducerMetrics +from . metrics import ConsumerMetrics, ProducerMetrics, SubscriberMetrics class RequestResponse(Subscriber): @@ -23,6 +23,7 @@ class RequestResponse(Subscriber): consumer_name = consumer_name, topic = response_topic, schema = response_schema, + metrics = response_metrics, ) self.producer = Producer( @@ -116,20 +117,24 @@ class RequestResponseSpec(Spec): def add(self, flow, processor, definition): - producer_metrics = ProducerMetrics( - flow.id, f"{flow.name}-{self.response_name}" + request_metrics = ProducerMetrics( + processor = flow.id, flow = flow.name, name = self.request_name + ) + + response_metrics = SubscriberMetrics( + processor = flow.id, flow = flow.name, name = self.request_name ) rr = self.impl( - client = processor.client, + client = processor.pulsar_client, subscription = flow.id, consumer_name = flow.id, request_topic = definition[self.request_name], request_schema = self.request_schema, - request_metrics = producer_metrics, + request_metrics = request_metrics, response_topic = definition[self.response_name], response_schema = self.response_schema, - response_metrics = None, + response_metrics = response_metrics, ) flow.consumer[self.request_name] = rr diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index a8ff58f7..1cf263d4 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -7,7 +7,7 @@ import time class Subscriber: def __init__(self, client, topic, subscription, consumer_name, - schema=None, max_size=100): + schema=None, max_size=100, metrics=None): self.client = client self.topic = topic self.subscription = subscription @@ -18,6 +18,7 @@ class Subscriber: self.max_size = max_size self.lock = asyncio.Lock() self.running = True + self.metrics = metrics async def __del__(self): self.running = False @@ -36,6 +37,9 @@ class Subscriber: while self.running: + if self.metrics: + self.metrics.state("stopped") + try: consumer = self.client.subscribe( @@ -45,6 +49,9 @@ class Subscriber: schema = JsonSchema(self.schema), ) + if self.metrics: + self.metrics.state("running") + print("Subscriber running...", flush=True) while self.running: @@ -61,6 +68,9 @@ class Subscriber: print(type(e)) raise e + if self.metrics: + self.metrics.received() + # Acknowledge successful reception of the message consumer.acknowledge(msg) @@ -83,7 +93,9 @@ class Subscriber: self.q[id].put(value), timeout=2 ) + except Exception as e: + self.metrics.dropped() print("Q Put:", e, flush=True) for q in self.full.values(): @@ -94,6 +106,7 @@ class Subscriber: timeout=2 ) except Exception as e: + self.metrics.dropped() print("Q Put:", e, flush=True) except Exception as e: @@ -101,6 +114,9 @@ class Subscriber: consumer.close() + if self.metrics: + self.metrics.state("stopped") + # If handler drops out, sleep a retry time.sleep(2) diff --git a/trustgraph-base/trustgraph/base/subscriber_spec.py b/trustgraph-base/trustgraph/base/subscriber_spec.py index 2f89290b..7dca09db 100644 --- a/trustgraph-base/trustgraph/base/subscriber_spec.py +++ b/trustgraph-base/trustgraph/base/subscriber_spec.py @@ -1,5 +1,5 @@ -from . metrics import ConsumerMetrics +from . metrics import SubscriberMetrics from . subscriber import Subscriber from . spec import Spec @@ -11,17 +11,17 @@ class SubscriberSpec(Spec): def add(self, flow, processor, definition): - # FIXME: Metrics not used - subscriber_metrics = ConsumerMetrics( - flow.id, f"{flow.name}-{self.name}" + subscriber_metrics = SubscriberMetrics( + processor = flow.id, flow = flow.name, name = self.name ) subscriber = Subscriber( - client = processor.client, + client = processor.pulsar_client, topic = definition[self.name], subscription = flow.id, consumer_name = flow.id, schema = self.schema, + metrics = subscriber_metrics, ) # Put it in the consumer map, does that work? diff --git a/trustgraph-base/trustgraph/schema/__init__.py b/trustgraph-base/trustgraph/schema/__init__.py index 28e1a879..a9bb30a6 100644 --- a/trustgraph-base/trustgraph/schema/__init__.py +++ b/trustgraph-base/trustgraph/schema/__init__.py @@ -12,5 +12,5 @@ from . agent import * from . lookup import * from . library import * from . config import * - +from . flows import * diff --git a/trustgraph-base/trustgraph/schema/flows.py b/trustgraph-base/trustgraph/schema/flows.py index 5ac51a37..28b90f5d 100644 --- a/trustgraph-base/trustgraph/schema/flows.py +++ b/trustgraph-base/trustgraph/schema/flows.py @@ -20,14 +20,14 @@ from . types import Error # Prompt services, abstract the prompt generation class FlowRequest(Record): - operation = String() # list_classes, get_class, put_class, delete_class - # list_flows, get_flow, start_flow, stop_flow + operation = String() # list-classes, get-class, put-class, delete-class + # list-flows, get-flow, start-flow, stop-flow # get_class, put_class, delete_class, start_flow class_name = String() # put_class - class = String() + class_definition = String() # start_flow description = String() @@ -44,7 +44,7 @@ class FlowResponse(Record): flow_ids = Array(String()) # get_class - class = String() + class_definition = String() # get_flow flow = String() diff --git a/trustgraph-base/trustgraph/schema/graph.py b/trustgraph-base/trustgraph/schema/graph.py index 7c304e1d..97a99fbd 100644 --- a/trustgraph-base/trustgraph/schema/graph.py +++ b/trustgraph-base/trustgraph/schema/graph.py @@ -18,8 +18,6 @@ class EntityContexts(Record): metadata = Metadata() entities = Array(EntityContext()) -entity_contexts_ingest_queue = topic('entity-contexts-load') - ############################################################################ # Graph embeddings are embeddings associated with a graph entity @@ -33,8 +31,6 @@ class GraphEmbeddings(Record): metadata = Metadata() entities = Array(EntityEmbeddings()) -graph_embeddings_store_queue = topic('graph-embeddings-store') - ############################################################################ # Graph embeddings query @@ -49,13 +45,6 @@ class GraphEmbeddingsResponse(Record): error = Error() entities = Array(Value()) -graph_embeddings_request_queue = topic( - 'graph-embeddings', kind='non-persistent', namespace='request' -) -graph_embeddings_response_queue = topic( - 'graph-embeddings', kind='non-persistent', namespace='response' -) - ############################################################################ # Graph triples @@ -64,8 +53,6 @@ class Triples(Record): metadata = Metadata() triples = Array(Triple()) -triples_store_queue = topic('triples-store') - ############################################################################ # Triples query @@ -82,9 +69,3 @@ class TriplesQueryResponse(Record): error = Error() triples = Array(Triple()) -triples_request_queue = topic( - 'triples', kind='non-persistent', namespace='request' -) -triples_response_queue = topic( - 'triples', kind='non-persistent', namespace='response' -) diff --git a/trustgraph-base/trustgraph/schema/lookup.py b/trustgraph-base/trustgraph/schema/lookup.py index d0a0517c..a88d188e 100644 --- a/trustgraph-base/trustgraph/schema/lookup.py +++ b/trustgraph-base/trustgraph/schema/lookup.py @@ -17,26 +17,5 @@ class LookupResponse(Record): text = String() error = Error() -encyclopedia_lookup_request_queue = topic( - 'encyclopedia', kind='non-persistent', namespace='request' -) -encyclopedia_lookup_response_queue = topic( - 'encyclopedia', kind='non-persistent', namespace='response', -) - -dbpedia_lookup_request_queue = topic( - 'dbpedia', kind='non-persistent', namespace='request' -) -dbpedia_lookup_response_queue = topic( - 'dbpedia', kind='non-persistent', namespace='response', -) - -internet_search_request_queue = topic( - 'internet-search', kind='non-persistent', namespace='request' -) -internet_search_response_queue = topic( - 'internet-search', kind='non-persistent', namespace='response', -) - ############################################################################ diff --git a/trustgraph-base/trustgraph/schema/models.py b/trustgraph-base/trustgraph/schema/models.py index a634e1c4..ea3b9128 100644 --- a/trustgraph-base/trustgraph/schema/models.py +++ b/trustgraph-base/trustgraph/schema/models.py @@ -19,13 +19,6 @@ class TextCompletionResponse(Record): out_token = Integer() model = String() -text_completion_request_queue = topic( - 'text-completion', kind='non-persistent', namespace='request' -) -text_completion_response_queue = topic( - 'text-completion', kind='non-persistent', namespace='response' -) - ############################################################################ # Embeddings @@ -37,9 +30,3 @@ class EmbeddingsResponse(Record): error = Error() vectors = Array(Array(Double())) -embeddings_request_queue = topic( - 'embeddings', kind='non-persistent', namespace='request' -) -embeddings_response_queue = topic( - 'embeddings', kind='non-persistent', namespace='response' -) diff --git a/trustgraph-base/trustgraph/schema/object.py b/trustgraph-base/trustgraph/schema/object.py index 60c2bdc3..6667fdf3 100644 --- a/trustgraph-base/trustgraph/schema/object.py +++ b/trustgraph-base/trustgraph/schema/object.py @@ -18,8 +18,6 @@ class ObjectEmbeddings(Record): key_name = String() id = String() -object_embeddings_store_queue = topic('object-embeddings-store') - ############################################################################ # Stores rows of information @@ -29,5 +27,5 @@ class Rows(Record): row_schema = RowSchema() rows = Array(Map(String())) -rows_store_queue = topic('rows-store') + diff --git a/trustgraph-base/trustgraph/schema/prompt.py b/trustgraph-base/trustgraph/schema/prompt.py index 15eddea8..369ace53 100644 --- a/trustgraph-base/trustgraph/schema/prompt.py +++ b/trustgraph-base/trustgraph/schema/prompt.py @@ -55,12 +55,5 @@ class PromptResponse(Record): # JSON encoded object = String() -prompt_request_queue = topic( - 'prompt', kind='non-persistent', namespace='request' -) -prompt_response_queue = topic( - 'prompt', kind='non-persistent', namespace='response' -) - ############################################################################ diff --git a/trustgraph-base/trustgraph/schema/retrieval.py b/trustgraph-base/trustgraph/schema/retrieval.py index caeb8e67..1077e4f9 100644 --- a/trustgraph-base/trustgraph/schema/retrieval.py +++ b/trustgraph-base/trustgraph/schema/retrieval.py @@ -20,13 +20,6 @@ class GraphRagResponse(Record): error = Error() response = String() -graph_rag_request_queue = topic( - 'graph-rag', kind='non-persistent', namespace='request' -) -graph_rag_response_queue = topic( - 'graph-rag', kind='non-persistent', namespace='response' -) - ############################################################################ # Document RAG text retrieval @@ -41,9 +34,3 @@ class DocumentRagResponse(Record): error = Error() response = String() -document_rag_request_queue = topic( - 'doc-rag', kind='non-persistent', namespace='request' -) -document_rag_response_queue = topic( - 'doc-rag', kind='non-persistent', namespace='response' -) diff --git a/trustgraph-cli/scripts/tg-delete-flow-class b/trustgraph-cli/scripts/tg-delete-flow-class new file mode 100755 index 00000000..345fe00f --- /dev/null +++ b/trustgraph-cli/scripts/tg-delete-flow-class @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +""" +""" + +import argparse +import os +import tabulate +from trustgraph.api import Api +import json + +default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') + +def delete_flow_class(url, class_name): + + api = Api(url) + + class_names = api.flow_delete_class(class_name) + +def main(): + + parser = argparse.ArgumentParser( + prog='tg-delete-flow-class', + description=__doc__, + ) + + parser.add_argument( + '-u', '--api-url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + + parser.add_argument( + '-n', '--class-name', + help=f'Flow class name', + ) + + args = parser.parse_args() + + try: + + delete_flow_class( + url=args.api_url, + class_name=args.class_name, + ) + + except Exception as e: + + print("Exception:", e, flush=True) + +main() + diff --git a/trustgraph-cli/scripts/tg-get-flow-class b/trustgraph-cli/scripts/tg-get-flow-class new file mode 100755 index 00000000..450f1df7 --- /dev/null +++ b/trustgraph-cli/scripts/tg-get-flow-class @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 + +""" +Dumps out the current configuration +""" + +import argparse +import os +import tabulate +from trustgraph.api import Api +import json + +default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') + +def get_flow_class(url, class_name): + + api = Api(url) + + cls = api.flow_get_class(class_name) + + print(json.dumps(cls, indent=4)) + +def main(): + + parser = argparse.ArgumentParser( + prog='tg-get-flow-class', + description=__doc__, + ) + + parser.add_argument( + '-u', '--api-url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + + parser.add_argument( + '-n', '--class-name', + required=True, + help=f'Flow class name', + ) + + args = parser.parse_args() + + try: + + get_flow_class( + url=args.api_url, + class_name=args.class_name, + ) + + except Exception as e: + + print("Exception:", e, flush=True) + +main() + diff --git a/trustgraph-cli/scripts/tg-put-flow-class b/trustgraph-cli/scripts/tg-put-flow-class new file mode 100755 index 00000000..ca048e1f --- /dev/null +++ b/trustgraph-cli/scripts/tg-put-flow-class @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 + +""" +Dumps out the current configuration +""" + +import argparse +import os +import tabulate +from trustgraph.api import Api +import json + +default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') + +def put_flow_class(url, class_name, config): + + api = Api(url) + + class_names = api.flow_put_class(class_name, config) + +def main(): + + parser = argparse.ArgumentParser( + prog='tg-put-flow-class', + description=__doc__, + ) + + parser.add_argument( + '-u', '--api-url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + + parser.add_argument( + '-n', '--class-name', + help=f'Flow class name', + ) + + parser.add_argument( + '-c', '--config', + help=f'Initial configuration to load', + ) + + args = parser.parse_args() + + try: + + put_flow_class( + url=args.api_url, + class_name=args.class_name, + config=json.loads(args.config), + ) + + except Exception as e: + + print("Exception:", e, flush=True) + +main() + diff --git a/trustgraph-cli/scripts/tg-show-flow-classes b/trustgraph-cli/scripts/tg-show-flow-classes new file mode 100755 index 00000000..7a133dc4 --- /dev/null +++ b/trustgraph-cli/scripts/tg-show-flow-classes @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 + +""" +""" + +import argparse +import os +import tabulate +from trustgraph.api import Api +import json + +default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') + +def show_flow_classes(url): + + api = Api(url) + + class_names = api.flow_list_classes() + + classes = [] + + for class_name in class_names: + cls = api.flow_get_class(class_name) + classes.append(( + class_name, + cls.get("description", ""), + ", ".join(cls.get("tags", [])), + )) + + print(tabulate.tabulate( + classes, + tablefmt="pretty", + maxcolwidths=[None, 40, 20], + stralign="left", + headers = ["flow class", "description", "tags"], + )) + +def main(): + + parser = argparse.ArgumentParser( + prog='tg-show-flow-classes', + description=__doc__, + ) + + parser.add_argument( + '-u', '--api-url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + + args = parser.parse_args() + + try: + + show_flow_classes( + url=args.api_url, + ) + + except Exception as e: + + print("Exception:", e, flush=True) + +main() + diff --git a/trustgraph-cli/scripts/tg-show-flows b/trustgraph-cli/scripts/tg-show-flows new file mode 100755 index 00000000..e458d798 --- /dev/null +++ b/trustgraph-cli/scripts/tg-show-flows @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 + +""" +""" + +import argparse +import os +import tabulate +from trustgraph.api import Api +import json + +default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') + +def show_flows(url): + + api = Api(url) + + flow_ids = api.flow_list() + + print(flow_ids) + + flows = [] + + for id in flow_ids: + flow = api.flow_get(id) + flows.append(( + id, + flow.get("description", ""), + )) + + print(tabulate.tabulate( + flows, + tablefmt="pretty", + maxcolwidths=[None, 40], + stralign="left", + headers = ["id", "description"], + )) + +def main(): + + parser = argparse.ArgumentParser( + prog='tg-show-flows', + description=__doc__, + ) + + parser.add_argument( + '-u', '--api-url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + + args = parser.parse_args() + + try: + + show_flows( + url=args.api_url, + ) + + except Exception as e: + + print("Exception:", e, flush=True) + +main() + diff --git a/trustgraph-cli/setup.py b/trustgraph-cli/setup.py index dda401ec..a16bd732 100644 --- a/trustgraph-cli/setup.py +++ b/trustgraph-cli/setup.py @@ -63,6 +63,13 @@ setuptools.setup( "scripts/tg-save-kg-core", "scripts/tg-save-doc-embeds", "scripts/tg-show-config", + "scripts/tg-show-flows", + "scripts/tg-show-flow-classes", + "scripts/tg-get-flow-class", + "scripts/tg-start-flow", + "scripts/tg-stop-flow", + "scripts/tg-delete-flow-class", + "scripts/tg-put-flow-class", "scripts/tg-set-prompt", "scripts/tg-show-tools", "scripts/tg-show-prompts", diff --git a/trustgraph-flow/trustgraph/config/service/flow.py b/trustgraph-flow/trustgraph/config/service/flow.py new file mode 100644 index 00000000..beef679e --- /dev/null +++ b/trustgraph-flow/trustgraph/config/service/flow.py @@ -0,0 +1,210 @@ + +from trustgraph.schema import FlowResponse, Error +import json + +class FlowConfig: + def __init__(self, config): + + self.config = config + + async def handle_list_classes(self, msg): + + names = list(self.config["flow-classes"].keys()) + + return FlowResponse( + error = None, + class_names = names, + ) + + async def handle_get_class(self, msg): + + return FlowResponse( + error = None, + class_definition = self.config["flow-classes"][msg.class_name], + ) + + async def handle_put_class(self, msg): + + self.config["flow-classes"][msg.class_name] = msg.class_definition + + await self.config.push() + + return FlowResponse( + error = None, + ) + + async def handle_delete_class(self, msg): + + print(msg) + + del self.config["flow-classes"][msg.class_name] + + await self.config.push() + + return FlowResponse( + error = None, + ) + + async def handle_list_flows(self, msg): + + names = list(self.config["flows"].keys()) + + return FlowResponse( + error = None, + flow_ids = names, + ) + + async def handle_get_flow(self, msg): + + flow = self.config["flows"][msg.flow_id] + + return FlowResponse( + error = None, + flow = flow, + ) + + async def handle_start_flow(self, msg): + + if msg.class_name is None: + raise RuntimeError("No class name") + + if msg.flow_id is None: + raise RuntimeError("No flow ID") + + if msg.flow_id in self.config["flows"]: + raise RuntimeError("Flow already exists") + + if msg.description is None: + raise RuntimeError("No description") + + if msg.class_name not in self.config["flow-classes"]: + raise RuntimeError("Class does not exist") + + def repl_template(tmp): + return tmp.replace( + "{class}", msg.class_name + ).replace( + "{id}", msg.flow_id + ) + + cls = json.loads(self.config["flow-classes"][msg.class_name]) + + for kind in ("class", "flow"): + + for k, v in cls[kind].items(): + + processor, variant = k.split(":", 1) + + variant = repl_template(variant) + + v = { + repl_template(k2): repl_template(v2) + for k2, v2 in v.items() + } + + if processor in self.config["flows-active"]: + target = json.loads(self.config["flows-active"][processor]) + else: + target = {} + + if variant not in target: + target[variant] = v + + self.config["flows-active"][processor] = json.dumps(target) + + self.config["flows"][msg.flow_id] = json.dumps({ + "description": msg.description, + "class-name": msg.class_name, + }) + + await self.config.push() + + return FlowResponse( + error = None, + ) + + async def handle_stop_flow(self, msg): + + if msg.flow_id is None: + raise RuntimeError("No flow ID") + + if msg.flow_id not in self.config["flows"]: + raise RuntimeError("Flow ID invalid") + + flow = json.loads(self.config["flows"][msg.flow_id]) + + if "class-name" not in flow: + raise RuntimeError("Internal error: flow has no flow class") + + class_name = flow["class-name"] + + cls = json.loads(self.config["flow-classes"][class_name]) + + def repl_template(tmp): + return tmp.replace( + "{class}", class_name + ).replace( + "{id}", msg.flow_id + ) + + for kind in ("flow",): + + for k, v in cls[kind].items(): + + processor, variant = k.split(":", 1) + + variant = repl_template(variant) + + if processor in self.config["flows-active"]: + target = json.loads(self.config["flows-active"][processor]) + else: + target = {} + + if variant in target: + del target[variant] + + self.config["flows-active"][processor] = json.dumps(target) + + if msg.flow_id in self.config["flows"]: + del self.config["flows"][msg.flow_id] + + await self.config.push() + + return FlowResponse( + error = None, + ) + + async def handle(self, msg): + + print("Handle message ", msg.operation) + + if msg.operation == "list-classes": + resp = await self.handle_list_classes(msg) + elif msg.operation == "get-class": + resp = await self.handle_get_class(msg) + elif msg.operation == "put-class": + resp = await self.handle_put_class(msg) + elif msg.operation == "delete-class": + resp = await self.handle_delete_class(msg) + elif msg.operation == "list-flows": + resp = await self.handle_list_flows(msg) + elif msg.operation == "get-flow": + resp = await self.handle_get_flow(msg) + elif msg.operation == "start-flow": + resp = await self.handle_start_flow(msg) + elif msg.operation == "stop-flow": + resp = await self.handle_stop_flow(msg) + else: + + resp = FlowResponse( + value=None, + directory=None, + values=None, + error=Error( + type = "bad-operation", + message = "Bad operation" + ) + ) + + return resp + diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index 47bac828..c0268389 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -5,81 +5,139 @@ Config service. Manages system global configuration state from pulsar.schema import JsonSchema -from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigPush from trustgraph.schema import Error + +from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigPush from trustgraph.schema import config_request_queue, config_response_queue from trustgraph.schema import config_push_queue + +from trustgraph.schema import FlowRequest, FlowResponse +from trustgraph.schema import flow_request_queue, flow_response_queue + from trustgraph.log_level import LogLevel from trustgraph.base import AsyncProcessor, Consumer, Producer from . config import Configuration +from . flow import FlowConfig + from ... base import ProcessorMetrics, ConsumerMetrics, ProducerMetrics from ... base import Consumer, Producer default_ident = "config-svc" -default_request_queue = config_request_queue -default_response_queue = config_response_queue -default_push_queue = config_push_queue +default_config_request_queue = config_request_queue +default_config_response_queue = config_response_queue +default_config_push_queue = config_push_queue + +default_flow_request_queue = flow_request_queue +default_flow_response_queue = flow_response_queue class Processor(AsyncProcessor): def __init__(self, **params): - request_queue = params.get("request_queue", default_request_queue) - response_queue = params.get("response_queue", default_response_queue) - push_queue = params.get("push_queue", default_push_queue) + config_request_queue = params.get( + "config_request_queue", default_config_request_queue + ) + config_response_queue = params.get( + "config_response_queue", default_config_response_queue + ) + config_push_queue = params.get( + "config_push_queue", default_config_push_queue + ) + + flow_request_queue = params.get( + "flow_request_queue", default_flow_request_queue + ) + flow_response_queue = params.get( + "flow_response_queue", default_flow_response_queue + ) + id = params.get("id") - request_schema = ConfigRequest - response_schema = ConfigResponse - push_schema = ConfigResponse + flow_request_schema = FlowRequest + flow_response_schema = FlowResponse super(Processor, self).__init__( **params | { - "request_schema": request_schema.__name__, - "response_schema": response_schema.__name__, - "push_schema": push_schema.__name__, + "config_request_schema": ConfigRequest.__name__, + "config_response_schema": ConfigResponse.__name__, + "config_push_schema": ConfigPush.__name__, + "flow_request_schema": FlowRequest.__name__, + "flow_response_schema": FlowResponse.__name__, } ) - request_metrics = ConsumerMetrics(id + "-request") - response_metrics = ProducerMetrics(id + "-response") - push_metrics = ProducerMetrics(id + "-push") - - self.push_pub = Producer( - client = self.client, - topic = push_queue, - schema = ConfigPush, - metrics = push_metrics, + config_request_metrics = ConsumerMetrics( + processor = self.id, flow = None, name = "config-request" + ) + config_response_metrics = ProducerMetrics( + processor = self.id, flow = None, name = "config-response" + ) + config_push_metrics = ProducerMetrics( + processor = self.id, flow = None, name = "config-push" ) - self.response_pub = Producer( - client = self.client, - topic = response_queue, - schema = ConfigResponse, - metrics = response_metrics, + flow_request_metrics = ConsumerMetrics( + processor = self.id, flow = None, name = "flow-request" + ) + flow_response_metrics = ProducerMetrics( + processor = self.id, flow = None, name = "flow-response" ) - self.subs = Consumer( + self.config_request_consumer = Consumer( taskgroup = self.taskgroup, - client = self.client, + client = self.pulsar_client, flow = None, - topic = request_queue, + topic = config_request_queue, subscriber = id, - schema = request_schema, - handler = self.on_message, - metrics = request_metrics, + schema = ConfigRequest, + handler = self.on_config_request, + metrics = config_request_metrics, + ) + + self.config_response_producer = Producer( + client = self.pulsar_client, + topic = config_response_queue, + schema = ConfigResponse, + metrics = config_response_metrics, + ) + + self.config_push_producer = Producer( + client = self.pulsar_client, + topic = config_push_queue, + schema = ConfigPush, + metrics = config_push_metrics, + ) + + self.flow_request_consumer = Consumer( + taskgroup = self.taskgroup, + client = self.pulsar_client, + flow = None, + topic = flow_request_queue, + subscriber = id, + schema = FlowRequest, + handler = self.on_flow_request, + metrics = flow_request_metrics, + ) + + self.flow_response_producer = Producer( + client = self.pulsar_client, + topic = flow_response_queue, + schema = FlowResponse, + metrics = flow_response_metrics, ) self.config = Configuration(self.push) + self.flow = FlowConfig(self.config) print("Service initialised.") async def start(self): await self.push() - await self.subs.start() + await self.config_request_consumer.start() + await self.flow_request_consumer.start() async def push(self): @@ -92,11 +150,11 @@ class Processor(AsyncProcessor): error = None, ) - await self.push_pub.send(resp) + await self.config_push_producer.send(resp) print("Pushed version ", self.config.version) - async def on_message(self, msg, consumer, flow): + async def on_config_request(self, msg, consumer, flow): try: @@ -109,19 +167,54 @@ class Processor(AsyncProcessor): resp = await self.config.handle(v) - await self.response_pub.send(resp, properties={"id": id}) + await self.config_response_producer.send( + resp, properties={"id": id} + ) except Exception as e: resp = ConfigResponse( error=Error( - type = "unexpected-error", + type = "config-error", message = str(e), ), text=None, ) - await self.response_pub.send(resp, properties={"id": id}) + await self.config_response_producer.send( + resp, properties={"id": id} + ) + + async def on_flow_request(self, msg, consumer, flow): + + try: + + v = msg.value() + + # Sender-produced ID + id = msg.properties()["id"] + + print(f"Handling {id}...", flush=True) + + resp = await self.flow.handle(v) + + await self.flow_response_producer.send( + resp, properties={"id": id} + ) + + except Exception as e: + + resp = FlowResponse( + error=Error( + type = "flow-error", + message = str(e), + ), + text=None, + ) + + await self.flow_response_producer.send( + resp, properties={"id": id} + ) @staticmethod def add_args(parser): @@ -129,21 +222,33 @@ class Processor(AsyncProcessor): AsyncProcessor.add_args(parser) parser.add_argument( - '-q', '--request-queue', - default=default_request_queue, - help=f'Request queue (default: {default_request_queue})' + '--config-request-queue', + default=default_config_request_queue, + help=f'Config request queue (default: {default_config_request_queue})' ) parser.add_argument( - '-r', '--response-queue', - default=default_response_queue, - help=f'Response queue {default_response_queue}', + '--config-response-queue', + default=default_config_response_queue, + help=f'Config response queue {default_config_response_queue}', ) parser.add_argument( '--push-queue', - default=default_push_queue, - help=f'Config push queue (default: {default_push_queue})' + default=default_config_push_queue, + help=f'Config push queue (default: {default_config_push_queue})' + ) + + parser.add_argument( + '--flow-request-queue', + default=default_flow_request_queue, + help=f'Flow request queue (default: {default_flow_request_queue})' + ) + + parser.add_argument( + '--flow-response-queue', + default=default_flow_response_queue, + help=f'Flow response queue {default_flow_response_queue}', ) def run(): diff --git a/trustgraph-flow/trustgraph/gateway/flow.py b/trustgraph-flow/trustgraph/gateway/flow.py new file mode 100644 index 00000000..c666d99c --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/flow.py @@ -0,0 +1,51 @@ + +from .. schema import FlowRequest, FlowResponse, ConfigKey, ConfigValue +from .. schema import flow_request_queue +from .. schema import flow_response_queue + +from . endpoint import ServiceEndpoint +from . requestor import ServiceRequestor + +class FlowRequestor(ServiceRequestor): + def __init__(self, pulsar_client, timeout, auth): + + super(FlowRequestor, self).__init__( + pulsar_client=pulsar_client, + request_queue=flow_request_queue, + response_queue=flow_response_queue, + request_schema=FlowRequest, + response_schema=FlowResponse, + timeout=timeout, + ) + + def to_request(self, body): + + return FlowRequest( + operation = body.get("operation", None), + class_name = body.get("class-name", None), + class_definition = body.get("class-definition", None), + description = body.get("description", None), + flow_id = body.get("flow-id", None), + ) + + def from_response(self, message): + + response = { } + + if message.class_names is not None: + response["class-names"] = message.class_names + + if message.flow_ids is not None: + response["flow-ids"] = message.flow_ids + + if message.class_definition is not None: + response["class-definition"] = message.class_definition + + if message.flow is not None: + response["flow"] = message.flow + + if message.description is not None: + response["description"] = message.description + + return response, True + diff --git a/trustgraph-flow/trustgraph/gateway/mux.py b/trustgraph-flow/trustgraph/gateway/mux.py index 8195c542..1afc3225 100644 --- a/trustgraph-flow/trustgraph/gateway/mux.py +++ b/trustgraph-flow/trustgraph/gateway/mux.py @@ -5,7 +5,6 @@ import uuid from aiohttp import web, WSMsgType from . socket import SocketEndpoint -from . text_completion import TextCompletionRequestor MAX_OUTSTANDING_REQUESTS = 15 WORKER_CLOSE_WAIT = 0.01 diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index 29b31483..d7df3240 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -25,28 +25,30 @@ from .. log_level import LogLevel from . serialize import to_subgraph from . running import Running -from . text_completion import TextCompletionRequestor -from . prompt import PromptRequestor -from . graph_rag import GraphRagRequestor -from . document_rag import DocumentRagRequestor -from . triples_query import TriplesQueryRequestor -from . graph_embeddings_query import GraphEmbeddingsQueryRequestor -from . embeddings import EmbeddingsRequestor -from . encyclopedia import EncyclopediaRequestor -from . agent import AgentRequestor -from . dbpedia import DbpediaRequestor -from . internet_search import InternetSearchRequestor -from . librarian import LibrarianRequestor + +#from . text_completion import TextCompletionRequestor +#from . prompt import PromptRequestor +#from . graph_rag import GraphRagRequestor +#from . document_rag import DocumentRagRequestor +#from . triples_query import TriplesQueryRequestor +#from . graph_embeddings_query import GraphEmbeddingsQueryRequestor +#from . embeddings import EmbeddingsRequestor +#from . encyclopedia import EncyclopediaRequestor +#from . agent import AgentRequestor +#from . dbpedia import DbpediaRequestor +#from . internet_search import InternetSearchRequestor +#from . librarian import LibrarianRequestor from . config import ConfigRequestor -from . triples_stream import TriplesStreamEndpoint -from . graph_embeddings_stream import GraphEmbeddingsStreamEndpoint -from . document_embeddings_stream import DocumentEmbeddingsStreamEndpoint -from . triples_load import TriplesLoadEndpoint -from . graph_embeddings_load import GraphEmbeddingsLoadEndpoint -from . document_embeddings_load import DocumentEmbeddingsLoadEndpoint +from . flow import FlowRequestor +#from . triples_stream import TriplesStreamEndpoint +#from . graph_embeddings_stream import GraphEmbeddingsStreamEndpoint +#from . document_embeddings_stream import DocumentEmbeddingsStreamEndpoint +#from . triples_load import TriplesLoadEndpoint +#from . graph_embeddings_load import GraphEmbeddingsLoadEndpoint +#from . document_embeddings_load import DocumentEmbeddingsLoadEndpoint from . mux import MuxEndpoint -from . document_load import DocumentLoadSender -from . text_load import TextLoadSender +#from . document_load import DocumentLoadSender +#from . text_load import TextLoadSender from . metrics import MetricsEndpoint from . endpoint import ServiceEndpoint @@ -105,157 +107,165 @@ class Api: self.auth = Authenticator(allow_all=True) self.services = { - "text-completion": TextCompletionRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), - "prompt": PromptRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), - "graph-rag": GraphRagRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), - "document-rag": DocumentRagRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), - "triples-query": TriplesQueryRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), - "graph-embeddings-query": GraphEmbeddingsQueryRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), - "embeddings": EmbeddingsRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), - "agent": AgentRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), - "librarian": LibrarianRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), + # "text-completion": TextCompletionRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "prompt": PromptRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "graph-rag": GraphRagRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "document-rag": DocumentRagRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "triples-query": TriplesQueryRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "graph-embeddings-query": GraphEmbeddingsQueryRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "embeddings": EmbeddingsRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "agent": AgentRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "librarian": LibrarianRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), "config": ConfigRequestor( pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), - "encyclopedia": EncyclopediaRequestor( + "flow": FlowRequestor( pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), - "dbpedia": DbpediaRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), - "internet-search": InternetSearchRequestor( - pulsar_client=self.pulsar_client, timeout=self.timeout, - auth = self.auth, - ), - "document-load": DocumentLoadSender( - pulsar_client=self.pulsar_client, - ), - "text-load": TextLoadSender( - pulsar_client=self.pulsar_client, - ), + # "encyclopedia": EncyclopediaRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "dbpedia": DbpediaRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "internet-search": InternetSearchRequestor( + # pulsar_client=self.pulsar_client, timeout=self.timeout, + # auth = self.auth, + # ), + # "document-load": DocumentLoadSender( + # pulsar_client=self.pulsar_client, + # ), + # "text-load": TextLoadSender( + # pulsar_client=self.pulsar_client, + # ), } self.endpoints = [ - ServiceEndpoint( - endpoint_path = "/api/v1/text-completion", auth=self.auth, - requestor = self.services["text-completion"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/prompt", auth=self.auth, - requestor = self.services["prompt"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/graph-rag", auth=self.auth, - requestor = self.services["graph-rag"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/document-rag", auth=self.auth, - requestor = self.services["document-rag"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/triples-query", auth=self.auth, - requestor = self.services["triples-query"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/graph-embeddings-query", - auth=self.auth, - requestor = self.services["graph-embeddings-query"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/embeddings", auth=self.auth, - requestor = self.services["embeddings"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/agent", auth=self.auth, - requestor = self.services["agent"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/librarian", auth=self.auth, - requestor = self.services["librarian"], - ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/text-completion", auth=self.auth, + # requestor = self.services["text-completion"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/prompt", auth=self.auth, + # requestor = self.services["prompt"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/graph-rag", auth=self.auth, + # requestor = self.services["graph-rag"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/document-rag", auth=self.auth, + # requestor = self.services["document-rag"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/triples-query", auth=self.auth, + # requestor = self.services["triples-query"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/graph-embeddings-query", + # auth=self.auth, + # requestor = self.services["graph-embeddings-query"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/embeddings", auth=self.auth, + # requestor = self.services["embeddings"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/agent", auth=self.auth, + # requestor = self.services["agent"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/librarian", auth=self.auth, + # requestor = self.services["librarian"], + # ), ServiceEndpoint( endpoint_path = "/api/v1/config", auth=self.auth, requestor = self.services["config"], ), ServiceEndpoint( - endpoint_path = "/api/v1/encyclopedia", auth=self.auth, - requestor = self.services["encyclopedia"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/dbpedia", auth=self.auth, - requestor = self.services["dbpedia"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/internet-search", auth=self.auth, - requestor = self.services["internet-search"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/load/document", auth=self.auth, - requestor = self.services["document-load"], - ), - ServiceEndpoint( - endpoint_path = "/api/v1/load/text", auth=self.auth, - requestor = self.services["text-load"], - ), - TriplesStreamEndpoint( - pulsar_client=self.pulsar_client, - auth = self.auth, - ), - GraphEmbeddingsStreamEndpoint( - pulsar_client=self.pulsar_client, - auth = self.auth, - ), - DocumentEmbeddingsStreamEndpoint( - pulsar_client=self.pulsar_client, - auth = self.auth, - ), - TriplesLoadEndpoint( - pulsar_client=self.pulsar_client, - auth = self.auth, - ), - GraphEmbeddingsLoadEndpoint( - pulsar_client=self.pulsar_client, - auth = self.auth, - ), - DocumentEmbeddingsLoadEndpoint( - pulsar_client=self.pulsar_client, - auth = self.auth, - ), - MuxEndpoint( - pulsar_client=self.pulsar_client, - auth = self.auth, - services = self.services, + endpoint_path = "/api/v1/flow", auth=self.auth, + requestor = self.services["flow"], ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/encyclopedia", auth=self.auth, + # requestor = self.services["encyclopedia"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/dbpedia", auth=self.auth, + # requestor = self.services["dbpedia"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/internet-search", auth=self.auth, + # requestor = self.services["internet-search"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/load/document", auth=self.auth, + # requestor = self.services["document-load"], + # ), + # ServiceEndpoint( + # endpoint_path = "/api/v1/load/text", auth=self.auth, + # requestor = self.services["text-load"], + # ), + # TriplesStreamEndpoint( + # pulsar_client=self.pulsar_client, + # auth = self.auth, + # ), + # GraphEmbeddingsStreamEndpoint( + # pulsar_client=self.pulsar_client, + # auth = self.auth, + # ), + # DocumentEmbeddingsStreamEndpoint( + # pulsar_client=self.pulsar_client, + # auth = self.auth, + # ), + # TriplesLoadEndpoint( + # pulsar_client=self.pulsar_client, + # auth = self.auth, + # ), + # GraphEmbeddingsLoadEndpoint( + # pulsar_client=self.pulsar_client, + # auth = self.auth, + # ), + # DocumentEmbeddingsLoadEndpoint( + # pulsar_client=self.pulsar_client, + # auth = self.auth, + # ), + # MuxEndpoint( + # pulsar_client=self.pulsar_client, + # auth = self.auth, + # services = self.services, + # ), MetricsEndpoint( endpoint_path = "/api/v1/metrics", prometheus_url = self.prometheus_url,