diff --git a/README.md b/README.md index 2fa82db5..90e7bcf6 100644 --- a/README.md +++ b/README.md @@ -211,6 +211,7 @@ python3 -m venv env . env/bin/activate pip3 install pulsar-client pip3 install cassandra-driver +export PYTHONPATH=. ``` ### Load some data @@ -236,7 +237,7 @@ ERROR tag) you should be good. Look at the PDF decoder... ``` -docker logs trustgraph_pdf-decoder_1 +docker logs trustgraph-pdf-decoder-1 ``` which should contain some text like... @@ -248,7 +249,7 @@ Done. Look at the chunker output... ``` -docker logs trustgraph_chunker_1 +docker logs trustgraph-chunker-1 ``` You will see similar output, except many entries instead of 1. @@ -256,7 +257,7 @@ You will see similar output, except many entries instead of 1. Look at the vectorizer output... ``` -docker logs trustgraph_vectorize_1 +docker logs trustgraph-vectorize-1 ``` You will see similar output, except many entries instead of 1. @@ -264,7 +265,7 @@ You will see similar output, except many entries instead of 1. Look at the LLM output... ``` -docker logs trustgraph_llm_1 +docker logs trustgraph-llm-1 ``` You will see output like this... @@ -277,8 +278,8 @@ Done. Two more log outputs to look at... ``` -docker logs trustgraph_kg-extract-definitions_1 -docker logs trustgraph_kg-extract-relationships_1 +docker logs trustgraph-kg-extract-definitions-1 +docker logs trustgraph-kg-extract-relationships-1 ``` Definitions output similar to this should be visible @@ -388,7 +389,7 @@ If it looks like something isn't working, try following the graph-rag logs: ``` -docker logs -f trustgraph_graph-rag_1 +docker logs -f trustgraph-graph-rag-1 ``` If you get an answer to your query, Graph RAG is working! 
diff --git a/docker-compose-stores.yaml b/docker-compose-stores.yaml new file mode 100644 index 00000000..7aa04559 --- /dev/null +++ b/docker-compose-stores.yaml @@ -0,0 +1,92 @@ + +volumes: + cassandra: + pulsar-conf: + pulsar-data: + etcd: + minio-data: + milvus: + +services: + + cassandra: + image: docker.io/cassandra:4.1.5 + ports: + - "9042:9042" + volumes: + - "cassandra:/var/lib/cassandra" + restart: on-failure:100 + + pulsar: + image: docker.io/apachepulsar/pulsar:3.3.0 + command: bin/pulsar standalone + ports: + - "6650:6650" + - "8080:8080" + volumes: + - "pulsar-conf:/pulsar/conf" + - "pulsar-data:/pulsar/data" + restart: on-failure:100 + + pulsar-manager: + image: docker.io/apachepulsar/pulsar-manager:v0.3.0 + ports: + - "9527:9527" + - "7750:7750" + environment: + SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties + restart: on-failure:100 + + etcd: + image: quay.io/coreos/etcd:v3.5.5 + command: + - "etcd" + - "-advertise-client-urls=http://127.0.0.1:2379" + - "-listen-client-urls" + - "http://0.0.0.0:2379" + - "--data-dir" + - "/etcd" + environment: + ETCD_AUTO_COMPACTION_MODE: revision + ETCD_AUTO_COMPACTION_RETENTION: "1000" + ETCD_QUOTA_BACKEND_BYTES: "4294967296" + ETCD_SNAPSHOT_COUNT: "50000" + ports: + - "2379:2379" + volumes: + - "etcd:/etcd" + restart: on-failure:100 + + minio: + image: docker.io/minio/minio:RELEASE.2024-07-04T14-25-45Z + command: + - "minio" + - "server" + - "/minio_data" + - "--console-address" + - ":9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + ports: + - "9001:9001" + volumes: + - "minio-data:/minio_data" + restart: on-failure:100 + + milvus: + image: docker.io/milvusdb/milvus:v2.4.5 + command: + - "milvus" + - "run" + - "standalone" + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + ports: + - "9091:9091" + - "19530:19530" + volumes: + - "milvus:/var/lib/milvus" + restart: on-failure:100 + diff --git a/processing.yaml 
b/processing.yaml new file mode 100644 index 00000000..3ad01eb2 --- /dev/null +++ b/processing.yaml @@ -0,0 +1,34 @@ + +services: + + pdf-decoder: + module: trustgraph.decoder.pdf + + chunker: + module: trustgraph.chunker.recursive + + vectorize: + module: trustgraph.embeddings.vectorize + + embeddings: + module: trustgraph.embeddings.hf + + kg-extract-definitions: + module: trustgraph.kg.extract_definitions + + kg-extract-relationships: + module: trustgraph.kg.extract_relationships + + vector-write: + module: trustgraph.vector.milvus_write + + graph-write: + module: trustgraph.graph.cassandra_write + + llm: + module: trustgraph.llm.ollama_text + parameters: + ollama: http://monster:11434 + graph-rag: + module: trustgraph.rag.graph + diff --git a/scripts/run-processing b/scripts/run-processing new file mode 100755 index 00000000..cdfbb871 --- /dev/null +++ b/scripts/run-processing @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.processing import run + +run() + diff --git a/setup.py b/setup.py index 2ad4db47..fdd4c7d3 100644 --- a/setup.py +++ b/setup.py @@ -42,6 +42,7 @@ setuptools.setup( "pypdf", "anthropic", "google-cloud-aiplatform", + "pyyaml", ], scripts=[ "scripts/chunker-recursive", diff --git a/tests/test-llm b/tests/test-llm index 35177e81..22f2229b 100755 --- a/tests/test-llm +++ b/tests/test-llm @@ -11,5 +11,5 @@ resp = llm.request(prompt) print(resp) -llm.close() + diff --git a/trustgraph/chunker/recursive/chunker.py b/trustgraph/chunker/recursive/chunker.py index ba5eb939..f1389186 100755 --- a/trustgraph/chunker/recursive/chunker.py +++ b/trustgraph/chunker/recursive/chunker.py @@ -16,17 +16,24 @@ import time from ... schema import TextDocument, Chunk, Source from ... 
log_level import LogLevel +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'text-doc-load' +default_output_queue = 'chunk-load' +default_subscriber = 'chunker-recursive' + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - subscriber, - log_level, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -49,11 +56,16 @@ class Processor: is_separator_regex=False, ) + print("Chunker inited") + def run(self): + print("Chunker running") + while True: msg = self.consumer.receive() + print("Chunker message received") try: @@ -91,7 +103,9 @@ class Processor: self.consumer.negative_acknowledge(msg) def __del__(self): - self.client.close() + + if self.client: + self.client.close() def run(): @@ -100,11 +114,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'text-doc-load' - default_output_queue = 'chunk-load' - default_subscriber = 'chunker-recursive' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, diff --git a/trustgraph/decoder/pdf/pdf_decoder.py b/trustgraph/decoder/pdf/pdf_decoder.py index f892ebac..f2ff56a2 100755 --- a/trustgraph/decoder/pdf/pdf_decoder.py +++ b/trustgraph/decoder/pdf/pdf_decoder.py @@ -16,17 +16,24 @@ import time from ... schema import Document, TextDocument, Source from ... 
log_level import LogLevel +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'document-load' +default_output_queue = 'text-doc-load' +default_subscriber = 'pdf-decoder' + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - subscriber, - log_level, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -42,12 +49,23 @@ class Processor: schema=JsonSchema(TextDocument), ) + print("PDF inited") + + print("Pulsar", pulsar_host) + print("Input", input_queue) + print("Output", output_queue) + print("Subscriber", subscriber) + def run(self): + print("PDF running") + while True: msg = self.consumer.receive() + print("PDF message received") + try: v = msg.value() @@ -89,7 +107,9 @@ class Processor: self.consumer.negative_acknowledge(msg) def __del__(self): - self.client.close() + + if self.client: + self.client.close() def run(): @@ -98,11 +118,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'document-load' - default_output_queue = 'text-doc-load' - default_subscriber = 'pdf-decoder' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, diff --git a/trustgraph/embeddings/hf/hf.py b/trustgraph/embeddings/hf/hf.py index 671ba879..96ee8e8b 100755 --- a/trustgraph/embeddings/hf/hf.py +++ b/trustgraph/embeddings/hf/hf.py @@ -16,18 +16,26 @@ import time from ... schema import EmbeddingsRequest, EmbeddingsResponse from ... 
log_level import LogLevel +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'embeddings' +default_output_queue = 'embeddings-response' +default_subscriber = 'embeddings-hf' +default_model="all-MiniLM-L6-v2" + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - subscriber, - log_level, - model, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, + model=default_model, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -81,8 +89,9 @@ class Processor: self.consumer.negative_acknowledge(msg) def __del__(self): - print("Closing", flush=True) - self.client.close() + + if self.client: + self.client.close() def run(): @@ -91,11 +100,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'embeddings' - default_output_queue = 'embeddings-response' - default_subscriber = 'embeddings-hf' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, diff --git a/trustgraph/embeddings/vectorize/vectorize.py b/trustgraph/embeddings/vectorize/vectorize.py index 99ee6013..deeb280e 100755 --- a/trustgraph/embeddings/vectorize/vectorize.py +++ b/trustgraph/embeddings/vectorize/vectorize.py @@ -16,18 +16,24 @@ from ... schema import Chunk, VectorsChunk from ... embeddings_client import EmbeddingsClient from ... 
log_level import LogLevel +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'chunk-load' +default_output_queue = 'vectors-chunk-load' +default_subscriber = 'embeddings-vectorizer' + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - subscriber, - log_level, - model, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -89,7 +95,9 @@ class Processor: self.consumer.negative_acknowledge(msg) def __del__(self): - self.client.close() + + if self.client: + self.client.close() def run(): @@ -98,11 +106,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'chunk-load' - default_output_queue = 'vectors-chunk-load' - default_subscriber = 'embeddings-vectorizer' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, @@ -135,12 +138,6 @@ def run(): help=f'Output queue (default: info)' ) - parser.add_argument( - '-m', '--model', - default="all-MiniLM-L6-v2", - help=f'LLM model (default: all-MiniLM-L6-v2)' - ) - args = parser.parse_args() while True: @@ -153,7 +150,6 @@ def run(): output_queue=args.output_queue, subscriber=args.subscriber, log_level=args.log_level, - model=args.model, ) p.run() diff --git a/trustgraph/graph/cassandra_write/write.py b/trustgraph/graph/cassandra_write/write.py index f8603a63..01ca72f1 100755 --- a/trustgraph/graph/cassandra_write/write.py +++ b/trustgraph/graph/cassandra_write/write.py @@ -15,17 +15,24 @@ from ... trustgraph import TrustGraph from ... schema import Triple from ... 
log_level import LogLevel +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'graph-load' +default_subscriber = 'graph-write-cassandra' +default_graph_host='localhost' + class Processor: def __init__( self, - pulsar_host, - input_queue, - subscriber, - log_level, - graph_host, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + subscriber=default_subscriber, + graph_host=default_graph_host, + log_level=LogLevel.INFO, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -72,7 +79,9 @@ class Processor: self.consumer.negative_acknowledge(msg) def __del__(self): - self.client.close() + + if self.client: + self.client.close() def run(): @@ -81,10 +90,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'graph-load' - default_subscriber = 'graph-write-cassandra' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, diff --git a/trustgraph/kg/extract_definitions/extract.py b/trustgraph/kg/extract_definitions/extract.py index 513fd726..7ff569dc 100755 --- a/trustgraph/kg/extract_definitions/extract.py +++ b/trustgraph/kg/extract_definitions/extract.py @@ -24,17 +24,24 @@ from ... 
rdf import TRUSTGRAPH_ENTITIES, DEFINITION DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True) +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'vectors-chunk-load' +default_output_queue = 'graph-load' +default_subscriber = 'kg-extract-definitions' + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - subscriber, - log_level, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -122,7 +129,9 @@ class Processor: self.consumer.negative_acknowledge(msg) def __del__(self): - self.client.close() + + if self.client: + self.client.close() def run(): @@ -131,11 +140,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'vectors-chunk-load' - default_output_queue = 'graph-load' - default_subscriber = 'kg-extract-definitions' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, diff --git a/trustgraph/kg/extract_relationships/extract.py b/trustgraph/kg/extract_relationships/extract.py index cd7d1aa1..14f837df 100755 --- a/trustgraph/kg/extract_relationships/extract.py +++ b/trustgraph/kg/extract_relationships/extract.py @@ -25,16 +25,22 @@ from ... 
rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True) +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'vectors-chunk-load' +default_output_queue = 'graph-load' +default_subscriber = 'kg-extract-relationships' +default_vector_queue='vectors-load' + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - vec_queue, - subscriber, - log_level, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + vector_queue=default_vector_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, ): self.client = pulsar.Client( @@ -53,7 +59,7 @@ class Processor: ) self.vec_prod = self.client.create_producer( - topic=vec_queue, + topic=vector_queue, schema=JsonSchema(VectorsAssociation), ) @@ -182,12 +188,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'vectors-chunk-load' - default_output_queue = 'graph-load' - default_subscriber = 'kg-extract-relationships' - default_vector_queue='vectors-load' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, @@ -236,7 +236,7 @@ def run(): pulsar_host=args.pulsar_host, input_queue=args.input_queue, output_queue=args.output_queue, - vec_queue=args.vector_queue, + vector_queue=args.vector_queue, subscriber=args.subscriber, log_level=args.log_level, ) diff --git a/trustgraph/llm/azure_text/llm.py b/trustgraph/llm/azure_text/llm.py index 26030920..252be0b0 100755 --- a/trustgraph/llm/azure_text/llm.py +++ b/trustgraph/llm/azure_text/llm.py @@ -18,15 +18,20 @@ import json from ... schema import TextCompletionRequest, TextCompletionResponse from ... 
log_level import LogLevel +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'llm-complete-text' +default_output_queue = 'llm-complete-text-response' +default_subscriber = 'llm-azure-text' + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - subscriber, - log_level, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, endpoint, token, ): @@ -138,11 +143,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'llm-complete-text' - default_output_queue = 'llm-complete-text-response' - default_subscriber = 'llm-ollama-text' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, diff --git a/trustgraph/llm/claude_text/llm.py b/trustgraph/llm/claude_text/llm.py index 85ffc9c7..00e6998e 100755 --- a/trustgraph/llm/claude_text/llm.py +++ b/trustgraph/llm/claude_text/llm.py @@ -16,19 +16,27 @@ import time from ... schema import TextCompletionRequest, TextCompletionResponse from ... 
log_level import LogLevel +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'llm-complete-text' +default_output_queue = 'llm-complete-text-response' +default_subscriber = 'llm-claude-text' +default_model = 'claude-3-5-sonnet-20240620' + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - subscriber, - log_level, - model, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, + model=default_model, api_key, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -114,11 +122,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'llm-complete-text' - default_output_queue = 'llm-complete-text-response' - default_subscriber = 'llm-claude-text' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, @@ -187,4 +190,3 @@ def run(): time.sleep(10) - diff --git a/trustgraph/llm/ollama_text/llm.py b/trustgraph/llm/ollama_text/llm.py index 9d9c7dad..048af534 100755 --- a/trustgraph/llm/ollama_text/llm.py +++ b/trustgraph/llm/ollama_text/llm.py @@ -16,19 +16,28 @@ import time from ... schema import TextCompletionRequest, TextCompletionResponse from ... 
log_level import LogLevel +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'llm-complete-text' +default_output_queue = 'llm-complete-text-response' +default_subscriber = 'llm-ollama-text' +default_model = 'gemma2' +default_ollama = 'http://localhost:11434' + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - subscriber, - log_level, - model, - ollama, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, + model=default_model, + ollama=default_ollama, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -82,8 +91,9 @@ class Processor: self.consumer.negative_acknowledge(msg) def __del__(self): - print("Closing") - self.client.close() + + if self.client: + self.client.close() def run(): @@ -92,11 +102,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'llm-complete-text' - default_output_queue = 'llm-complete-text-response' - default_subscriber = 'llm-ollama-text' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, diff --git a/trustgraph/llm/vertexai_text/llm.py b/trustgraph/llm/vertexai_text/llm.py index d108b6da..f4d55a18 100755 --- a/trustgraph/llm/vertexai_text/llm.py +++ b/trustgraph/llm/vertexai_text/llm.py @@ -30,20 +30,27 @@ from vertexai.preview.generative_models import ( from ... schema import TextCompletionRequest, TextCompletionResponse from ... 
log_level import LogLevel +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'llm-complete-text' +default_output_queue = 'llm-complete-text-response' +default_subscriber = 'llm-vertexai-text' + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - subscriber, - log_level, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, + region="us-west1", + model="gemini-1.0-pro-001", credentials, - region, - model, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -155,7 +162,9 @@ class Processor: self.consumer.negative_acknowledge(msg) def __del__(self): - self.client.close() + + if self.client: + self.client.close() def run(): @@ -164,11 +173,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'llm-complete-text' - default_output_queue = 'llm-complete-text-response' - default_subscriber = 'llm-vertexai-text' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, diff --git a/trustgraph/processing/__init__.py b/trustgraph/processing/__init__.py new file mode 100644 index 00000000..262fe3b0 --- /dev/null +++ b/trustgraph/processing/__init__.py @@ -0,0 +1,3 @@ + +from . processing import * + diff --git a/trustgraph/processing/__main__.py b/trustgraph/processing/__main__.py new file mode 100755 index 00000000..80904384 --- /dev/null +++ b/trustgraph/processing/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . 
"""
Run a set of processors, each in its own child process, as described by a
YAML processing definition file.
"""

import argparse
import time
import os
from yaml import load, Loader
import json
import multiprocessing
from multiprocessing.connection import wait

import importlib

from .. log_level import LogLevel


def fn(module_name, class_name, params, w):
    """Child-process entry point: run one processor, restarting it forever.

    Imports `module_name`, instantiates `class_name` from it with `params`
    as keyword arguments and calls its `run()` method.  Whenever `run()`
    returns or raises an `Exception`, the processor is re-created after a
    10 second pause.

    Args:
        module_name: Importable module path holding the processor class.
        class_name: Name of the class to instantiate inside that module.
        params: Keyword arguments for the class constructor.  A
            "log_level" entry, if present, is converted with `LogLevel(...)`.
        w: Write end of the pipe shared with the parent.  Nothing is sent
            on it; the parent only watches for it being closed.
    """

    print(f"Starting {module_name}...")

    # The parent passes the level as a string; turn it back into a LogLevel.
    # NOTE(review): assumes LogLevel(str(level)) round-trips -- confirm
    # against the LogLevel definition.
    if "log_level" in params:
        params["log_level"] = LogLevel(params["log_level"])

    try:

        while True:

            try:

                print(f"Starting {class_name} using {module_name}...")

                module = importlib.import_module(module_name)
                class_object = getattr(module, class_name)

                processor = class_object(**params)

                processor.run()
                print(f"{module_name} stopped.")

            except Exception as e:
                print("Exception:", e)

            print("Restarting in 10...")

            time.sleep(10)

    finally:

        # The loop above only ends through a non-Exception exit such as
        # KeyboardInterrupt or SystemExit.  Closing the pipe here (rather
        # than after the loop, where it could never be reached) lets the
        # parent see EOF for this child.
        print("Closing")
        w.close()


class Processing:
    """Supervisor that launches every service listed in a definition file.

    The definition file is YAML with a top-level `services` mapping.  Each
    entry needs a `module` key and may carry `class` (default "Processor")
    and `parameters` (extra constructor keyword arguments).
    """

    def __init__(
            self,
            pulsar_host,
            log_level,
            file,
    ):
        """Load the processing definitions.

        Args:
            pulsar_host: Passed to every processor as `pulsar_host`.
            log_level: Passed to every processor (as a string) as
                `log_level`.
            file: Path of the YAML processing definition file.
        """
        self.pulsar_host = pulsar_host
        self.log_level = log_level
        self.file = file

        # Use a context manager so the file handle is closed promptly.
        with open(file, "r") as f:
            # NOTE(review): `Loader` is PyYAML's full loader and can build
            # arbitrary Python objects from tagged YAML.  Acceptable for an
            # operator-supplied config file; consider yaml.safe_load if the
            # file can ever come from an untrusted source.
            self.defs = load(f, Loader=Loader)

    def run(self):
        """Start one child process per service and wait for all to exit."""

        procs = []
        readers = []

        for sdef in self.defs["services"].values():

            params = {
                "pulsar_host": self.pulsar_host,
                "log_level": str(self.log_level),
            }

            # Service-specific parameters override the common ones.  An
            # empty `parameters:` key parses as None, hence the `or {}`.
            params.update(sdef.get("parameters") or {})

            module_name = sdef["module"]
            class_name = sdef.get("class", "Processor")

            r, w = multiprocessing.Pipe(duplex=False)

            process = multiprocessing.Process(
                target=fn,
                args=(module_name, class_name, params, w)
            )
            process.start()

            # Drop the parent's copy of the write end so that the read end
            # reports EOF once the child's copy is closed.
            w.close()

            procs.append(process)
            readers.append(r)

        # Children never send data, so a readable pipe is expected to mean
        # EOF, i.e. the child closed its end or exited.
        while readers:

            for r in wait(readers):

                try:
                    r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()

        print("All processes exited")

        for p in procs:
            p.join()


def run():
    """Command-line entry point: parse arguments and run the supervisor.

    Retries every 10 seconds if constructing or running `Processing`
    raises an `Exception`.
    """

    # NOTE: multiprocessing.set_start_method('spawn') was tried here; the
    # original author's note says it "seems not to work", so the platform
    # default start method is used.

    parser = argparse.ArgumentParser(
        prog='run-processing',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-f', '--file',
        default="processing.yaml",
        help='Processing definition file (default: processing.yaml)'
    )

    args = parser.parse_args()

    while True:

        try:
            p = Processing(
                pulsar_host=args.pulsar_host,
                file=args.file,
                log_level=args.log_level,
            )

            p.run()

            print("Finished.")
            break

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

            time.sleep(10)
graph_rag import GraphRag +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'graph-rag-query' +default_output_queue = 'graph-rag-response' +default_subscriber = 'graph-rag' +default_graph_hosts = [ 'localhost' ] +default_vector_store = 'http://localhost:19530' + class Processor: def __init__( self, - pulsar_host, - input_queue, - output_queue, - subscriber, - log_level, - graph_hosts, - vector_store, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + output_queue=default_output_queue, + subscriber=default_subscriber, + log_level=LogLevel.INFO, + graph_hosts=default_graph_hosts, + vector_store=default_vector_store, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -86,8 +95,9 @@ class Processor: self.consumer.negative_acknowledge(msg) def __del__(self): - print("Closing", flush=True) - self.client.close() + + if self.client: + self.client.close() def run(): @@ -96,11 +106,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'graph-rag-query' - default_output_queue = 'graph-rag-response' - default_subscriber = 'graph-rag' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, diff --git a/trustgraph/vector/milvus_write/write.py b/trustgraph/vector/milvus_write/write.py index 67bdcd62..b25fa329 100755 --- a/trustgraph/vector/milvus_write/write.py +++ b/trustgraph/vector/milvus_write/write.py @@ -16,17 +16,24 @@ from ... schema import VectorsAssociation from ... log_level import LogLevel from ... 
edge_map import VectorStore +default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') +default_input_queue = 'vectors-load' +default_subscriber = 'vector-write-milvus' +default_store_uri = 'http://localhost:19530' + class Processor: def __init__( self, - pulsar_host, - input_queue, - subscriber, - store_uri, - log_level, + pulsar_host=default_pulsar_host, + input_queue=default_input_queue, + subscriber=default_subscriber, + store_uri=default_store_uri, + log_level=LogLevel.INFO, ): + self.client = None + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) @@ -64,7 +71,9 @@ class Processor: self.consumer.negative_acknowledge(msg) def __del__(self): - self.client.close() + + if self.client: + self.client.close() def run(): @@ -73,10 +82,6 @@ def run(): description=__doc__, ) - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - default_input_queue = 'vectors-load' - default_subscriber = 'vector-write-milvus' - parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, @@ -105,8 +110,8 @@ def run(): parser.add_argument( '-t', '--store-uri', - default="http://localhost:19530", - help=f'Milvus store URI (default: http://localhost:19530)' + default="http://milvus:19530", + help=f'Milvus store URI (default: http://milvus:19530)' ) args = parser.parse_args()