diff --git a/trustgraph-base/trustgraph/base/base_processor.py b/trustgraph-base/trustgraph/base/base_processor.py index b0fdd1bb..a5a819c2 100644 --- a/trustgraph-base/trustgraph/base/base_processor.py +++ b/trustgraph-base/trustgraph/base/base_processor.py @@ -1,4 +1,5 @@ +import asyncio import os import argparse import pulsar @@ -83,11 +84,20 @@ class BaseProcessor: help=f'Pulsar host (default: 8000)', ) - def run(self): + async def start(self): + pass + + async def run(self): raise RuntimeError("Something should have implemented the run method") @classmethod - def start(cls, prog, doc): + async def launch_async(cls, args): + p = cls(**args) + await p.start() + await p.run() + + @classmethod + def launch(cls, prog, doc): parser = argparse.ArgumentParser( prog=prog, @@ -108,8 +118,7 @@ class BaseProcessor: try: - p = cls(**args) - p.run() + asyncio.run(cls.launch_async(args)) except KeyboardInterrupt: print("Keyboard interrupt.") @@ -127,3 +136,4 @@ class BaseProcessor: print("Will retry...", flush=True) time.sleep(4) + diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 521dd3c1..175f1fd7 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -1,4 +1,5 @@ +import asyncio from pulsar.schema import JsonSchema import pulsar from prometheus_client import Histogram, Info, Counter, Enum @@ -75,7 +76,7 @@ class Consumer(BaseProcessor): print("Initialised consumer.", flush=True) - def run(self): + async def run(self): __class__.state_metric.state('running') @@ -104,7 +105,7 @@ class Consumer(BaseProcessor): try: with __class__.request_metric.time(): - self.handle(msg) + await self.handle(msg) # Acknowledge successful processing of the message self.consumer.acknowledge(msg) diff --git a/trustgraph-base/trustgraph/base/consumer_producer.py b/trustgraph-base/trustgraph/base/consumer_producer.py index be9915ce..1006f9b5 100644 --- 
a/trustgraph-base/trustgraph/base/consumer_producer.py +++ b/trustgraph-base/trustgraph/base/consumer_producer.py @@ -42,7 +42,7 @@ class ConsumerProducer(Consumer): print("Initialised consumer/producer.") - def send(self, msg, properties={}): + async def send(self, msg, properties={}): self.producer.send(msg, properties) __class__.output_metric.inc() diff --git a/trustgraph-base/trustgraph/base/producer.py b/trustgraph-base/trustgraph/base/producer.py index 84d7fc99..bc2d7791 100644 --- a/trustgraph-base/trustgraph/base/producer.py +++ b/trustgraph-base/trustgraph/base/producer.py @@ -37,7 +37,7 @@ class Producer(BaseProcessor): chunking_enabled=True, ) - def send(self, msg, properties={}): + async def send(self, msg, properties={}): self.producer.send(msg, properties) __class__.output_metric.inc() diff --git a/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py b/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py index f46e35fb..3bca324d 100755 --- a/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py +++ b/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py @@ -109,7 +109,7 @@ class Processor(ConsumerProducer): print("Initialised", flush=True) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -341,5 +341,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-embeddings-hf/trustgraph/embeddings/hf/hf.py b/trustgraph-embeddings-hf/trustgraph/embeddings/hf/hf.py index 4b3b39c1..5470333b 100755 --- a/trustgraph-embeddings-hf/trustgraph/embeddings/hf/hf.py +++ b/trustgraph-embeddings-hf/trustgraph/embeddings/hf/hf.py @@ -40,7 +40,7 @@ class Processor(ConsumerProducer): self.embeddings = HuggingFaceEmbeddings(model_name=model) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -96,5 +96,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) 
+ Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/agent/react/service.py b/trustgraph-flow/trustgraph/agent/react/service.py index 2414ea13..684ebe5a 100755 --- a/trustgraph-flow/trustgraph/agent/react/service.py +++ b/trustgraph-flow/trustgraph/agent/react/service.py @@ -189,7 +189,7 @@ class Processor(ConsumerProducer): return json.loads(json_str) - def handle(self, msg): + async def handle(self, msg): try: @@ -229,7 +229,7 @@ class Processor(ConsumerProducer): observation=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) def observe(x): @@ -242,7 +242,7 @@ class Processor(ConsumerProducer): observation=x, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) act = self.agent.react(v.question, history, think, observe) @@ -258,7 +258,7 @@ class Processor(ConsumerProducer): thought=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) print("Done.", flush=True) @@ -281,7 +281,7 @@ class Processor(ConsumerProducer): ] ) - self.recursive_input.send(r, properties={"id": id}) + await self.recursive_input.send(r, properties={"id": id}) print("Done.", flush=True) @@ -301,7 +301,7 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) @staticmethod def add_args(parser): @@ -377,5 +377,5 @@ description.''' def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py index 694ced70..82f333b5 100755 --- a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py @@ -52,7 +52,7 @@ class Processor(ConsumerProducer): is_separator_regex=False, ) - def handle(self, msg): + async def 
handle(self, msg): v = msg.value() print(f"Chunking {v.metadata.id}...", flush=True) @@ -70,7 +70,7 @@ class Processor(ConsumerProducer): __class__.chunk_metric.observe(len(chunk.page_content)) - self.send(r) + await self.send(r) print("Done.", flush=True) @@ -98,5 +98,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/chunking/token/chunker.py b/trustgraph-flow/trustgraph/chunking/token/chunker.py index dccd9c89..c625b48c 100755 --- a/trustgraph-flow/trustgraph/chunking/token/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/token/chunker.py @@ -51,7 +51,7 @@ class Processor(ConsumerProducer): chunk_overlap=chunk_overlap, ) - def handle(self, msg): + async def handle(self, msg): v = msg.value() print(f"Chunking {v.metadata.id}...", flush=True) @@ -69,7 +69,7 @@ class Processor(ConsumerProducer): __class__.chunk_metric.observe(len(chunk.page_content)) - self.send(r) + await self.send(r) print("Done.", flush=True) @@ -97,5 +97,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py index 38ac9257..5e5e3612 100755 --- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py +++ b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py @@ -39,7 +39,7 @@ class Processor(ConsumerProducer): print("PDF inited") - def handle(self, msg): + async def handle(self, msg): print("PDF message received") @@ -64,7 +64,7 @@ class Processor(ConsumerProducer): text=page.page_content.encode("utf-8"), ) - self.send(r) + await self.send(r) print("Done.", flush=True) @@ -78,5 +78,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/embeddings/document_embeddings/embeddings.py 
b/trustgraph-flow/trustgraph/embeddings/document_embeddings/embeddings.py index 745ab4db..6a4a4a67 100755 --- a/trustgraph-flow/trustgraph/embeddings/document_embeddings/embeddings.py +++ b/trustgraph-flow/trustgraph/embeddings/document_embeddings/embeddings.py @@ -52,7 +52,7 @@ class Processor(ConsumerProducer): subscriber=module + "-emb", ) - def handle(self, msg): + async def handle(self, msg): v = msg.value() print(f"Indexing {v.metadata.id}...", flush=True) @@ -73,7 +73,7 @@ class Processor(ConsumerProducer): chunks=embeds, ) - self.producer.send(r) + await self.send(r) except Exception as e: print("Exception:", e, flush=True) @@ -105,5 +105,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/embeddings/fastembed/processor.py b/trustgraph-flow/trustgraph/embeddings/fastembed/processor.py index 635387b8..253d77ac 100755 --- a/trustgraph-flow/trustgraph/embeddings/fastembed/processor.py +++ b/trustgraph-flow/trustgraph/embeddings/fastembed/processor.py @@ -41,7 +41,7 @@ class Processor(ConsumerProducer): self.embeddings = TextEmbedding(model_name = model) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -65,7 +65,7 @@ class Processor(ConsumerProducer): error=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) print("Done.", flush=True) @@ -85,5 +85,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py b/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py index e4d1646e..2cbe9907 100755 --- a/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py +++ b/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py @@ -52,7 +52,7 @@ class Processor(ConsumerProducer): subscriber=module + 
"-emb", ) - def handle(self, msg): + async def handle(self, msg): v = msg.value() print(f"Indexing {v.metadata.id}...", flush=True) @@ -77,7 +77,7 @@ class Processor(ConsumerProducer): entities=entities, ) - self.producer.send(r) + await self.send(r) except Exception as e: print("Exception:", e, flush=True) @@ -109,5 +109,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/embeddings/ollama/processor.py b/trustgraph-flow/trustgraph/embeddings/ollama/processor.py index 5baf64aa..f9dd87a9 100755 --- a/trustgraph-flow/trustgraph/embeddings/ollama/processor.py +++ b/trustgraph-flow/trustgraph/embeddings/ollama/processor.py @@ -45,7 +45,7 @@ class Processor(ConsumerProducer): self.client = Client(host=ollama) self.model = model - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -67,7 +67,7 @@ class Processor(ConsumerProducer): error=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) print("Done.", flush=True) @@ -93,5 +93,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/external/wikipedia/service.py b/trustgraph-flow/trustgraph/external/wikipedia/service.py index 932e1213..a269c8ce 100644 --- a/trustgraph-flow/trustgraph/external/wikipedia/service.py +++ b/trustgraph-flow/trustgraph/external/wikipedia/service.py @@ -39,7 +39,7 @@ class Processor(ConsumerProducer): self.url = url - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -60,7 +60,7 @@ class Processor(ConsumerProducer): text=resp ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -75,7 +75,7 @@ class Processor(ConsumerProducer): ), text=None, ) - self.producer.send(r, properties={"id": id}) + await 
self.producer.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -98,5 +98,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py index dcb1123e..c88005b7 100755 --- a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py @@ -96,15 +96,15 @@ class Processor(ConsumerProducer): return self.prompt.request_definitions(chunk) - def emit_edges(self, metadata, triples): + async def emit_edges(self, metadata, triples): t = Triples( metadata=metadata, triples=triples, ) - self.producer.send(t) + await self.send(t) - def emit_ecs(self, metadata, entities): + async def emit_ecs(self, metadata, entities): t = EntityContexts( metadata=metadata, @@ -112,7 +112,7 @@ class Processor(ConsumerProducer): ) self.ec_prod.send(t) - def handle(self, msg): + async def handle(self, msg): v = msg.value() print(f"Indexing {v.metadata.id}...", flush=True) @@ -171,7 +171,7 @@ class Processor(ConsumerProducer): entities.append(ec) - self.emit_edges( + await self.emit_edges( Metadata( id=v.metadata.id, metadata=[], @@ -181,7 +181,7 @@ class Processor(ConsumerProducer): triples ) - self.emit_ecs( + await self.emit_ecs( Metadata( id=v.metadata.id, metadata=[], @@ -224,5 +224,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index 0fd7b9a8..82973662 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -71,15 +71,15 @@ class Processor(ConsumerProducer): return self.prompt.request_relationships(chunk) - def 
emit_edges(self, metadata, triples): + async def emit_edges(self, metadata, triples): t = Triples( metadata=metadata, triples=triples, ) - self.producer.send(t) + await self.send(t) - def handle(self, msg): + async def handle(self, msg): v = msg.value() print(f"Indexing {v.metadata.id}...", flush=True) @@ -166,7 +166,7 @@ class Processor(ConsumerProducer): o=Value(value=v.metadata.id, is_uri=True) )) - self.emit_edges( + await self.emit_edges( Metadata( id=v.metadata.id, metadata=[], @@ -203,5 +203,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/extract/kg/topics/extract.py b/trustgraph-flow/trustgraph/extract/kg/topics/extract.py index 9181ae2c..efac2c05 100755 --- a/trustgraph-flow/trustgraph/extract/kg/topics/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/topics/extract.py @@ -69,15 +69,15 @@ class Processor(ConsumerProducer): return self.prompt.request_topics(chunk) - def emit_edge(self, metadata, s, p, o): + async def emit_edge(self, metadata, s, p, o): t = Triples( metadata=metadata, triples=[Triple(s=s, p=p, o=o)], ) - self.producer.send(t) + await self.send(t) - def handle(self, msg): + async def handle(self, msg): v = msg.value() print(f"Indexing {v.metadata.id}...", flush=True) @@ -104,7 +104,9 @@ class Processor(ConsumerProducer): s_value = Value(value=str(s_uri), is_uri=True) o_value = Value(value=str(o), is_uri=False) - self.emit_edge(v. 
metadata, s_value, DEFINITION_VALUE, o_value) + await self.emit_edge( + v.metadata, s_value, DEFINITION_VALUE, o_value + ) except Exception as e: print("Exception: ", e, flush=True) @@ -133,5 +135,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/extract/object/row/extract.py b/trustgraph-flow/trustgraph/extract/object/row/extract.py index 185a59c3..73aa5edc 100755 --- a/trustgraph-flow/trustgraph/extract/object/row/extract.py +++ b/trustgraph-flow/trustgraph/extract/object/row/extract.py @@ -129,16 +129,16 @@ class Processor(ConsumerProducer): t = Rows( metadata=metadata, row_schema=self.row_schema, rows=rows ) - self.producer.send(t) + await self.producer.send(t) - def emit_vec(self, metadata, name, vec, key_name, key): + async def emit_vec(self, metadata, name, vec, key_name, key): r = ObjectEmbeddings( metadata=metadata, vectors=vec, name=name, key_name=key_name, id=key ) - self.vec_prod.send(r) + await self.vec_prod.send(r) - def handle(self, msg): + async def handle(self, msg): v = msg.value() print(f"Indexing {v.metadata.id}...", flush=True) @@ -216,5 +216,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py b/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py index 39228221..17568eda 100644 --- a/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py +++ b/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py @@ -59,6 +59,6 @@ class DocumentEmbeddingsLoadEndpoint(SocketEndpoint): ], ) - self.publisher.send(None, elt) + await self.publisher.send(None, elt) running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py index e3fa1302..2b1f8291 100644 --- 
b/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py @@ -59,7 +59,7 @@ class GraphEmbeddingsLoadEndpoint(SocketEndpoint): ] ) - self.publisher.send(None, elt) + await self.publisher.send(None, elt) running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/triples_load.py b/trustgraph-flow/trustgraph/gateway/triples_load.py index f2ebd040..88fecd88 100644 --- a/trustgraph-flow/trustgraph/gateway/triples_load.py +++ b/trustgraph-flow/trustgraph/gateway/triples_load.py @@ -51,7 +51,7 @@ class TriplesLoadEndpoint(SocketEndpoint): triples=to_subgraph(data["triples"]), ) - self.publisher.send(None, elt) + await self.publisher.send(None, elt) running.stop() diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py index df06c707..08010f3f 100644 --- a/trustgraph-flow/trustgraph/librarian/librarian.py +++ b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -26,7 +26,7 @@ class Librarian: self.load_document = load_document self.load_text = load_text - def add(self, id, document): + async def add(self, id, document): if document.kind not in ( "text/plain", "application/pdf" @@ -41,9 +41,9 @@ class Librarian: self.table_store.add(object_id, document) if document.kind == "application/pdf": - self.load_document(id, document) + await self.load_document(id, document) elif document.kind == "text/plain": - self.load_text(id, document) + await self.load_text(id, document) print("Add complete", flush=True) @@ -53,3 +53,4 @@ class Librarian: info = None, ) + diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 42c3b585..ade4ca38 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -110,14 +110,7 @@ class Processor(ConsumerProducer): listener=self.pulsar_listener, ) - self.document_load.start() - self.text_load.start() - self.triples_load.start() - - self.triples_sub = 
self.triples_load.subscribe_all("x") - self.triples_reader = threading.Thread(target=self.receive_triples) - self.triples_reader.start() self.librarian = Librarian( cassandra_host = cassandra_host.split(","), @@ -134,6 +127,17 @@ class Processor(ConsumerProducer): print("Initialised.", flush=True) + async def start(self): + + self.document_load.start() + self.text_load.start() + self.triples_load.start() + + self.triples_sub = self.triples_load.subscribe_all("x") + + self.triples_reader = threading.Thread(target=self.receive_triples) + self.triples_reader.start() + def receive_triples(self): print("Receive triples!") @@ -168,7 +171,7 @@ class Processor(ConsumerProducer): self.triples_load.stop() self.triples_load.join() - def load_document(self, id, document): + async def load_document(self, id, document): doc = Document( metadata = Metadata( @@ -182,7 +185,7 @@ class Processor(ConsumerProducer): self.document_load.send(None, doc) - def load_text(self, id, document): + async def load_text(self, id, document): doc = TextDocument( metadata = Metadata( @@ -202,7 +205,6 @@ class Processor(ConsumerProducer): raise RequestError("Null operation") if v.operation == "add": - print(v) if ( v.id and v.document and v.document.metadata and v.document.document and v.document.kind @@ -217,7 +219,7 @@ class Processor(ConsumerProducer): raise RequestError("Invalid operation: " + v.operation) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -236,11 +238,11 @@ class Processor(ConsumerProducer): message = str(e), ) ) - self.producer.send(resp, properties={"id": id}) + await self.send(resp, properties={"id": id}) return try: - resp = func() + resp = await func() except RequestError as e: resp = LibrarianResponse( error = Error( @@ -248,7 +250,7 @@ class Processor(ConsumerProducer): message = str(e), ) ) - self.producer.send(resp, properties={"id": id}) + await self.send(resp, properties={"id": id}) return except Exception as e: print("Exception:", e, flush=True) @@ -258,12 +260,12 @@ class Processor(ConsumerProducer): message = 
"Unhandled error: " + str(e), ) ) - self.producer.send(resp, properties={"id": id}) + await self.send(resp, properties={"id": id}) return print("Send response...", flush=True) - self.producer.send(resp, properties={"id": id}) + await self.send(resp, properties={"id": id}) print("Done.", flush=True) @@ -348,5 +350,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/metering/counter.py b/trustgraph-flow/trustgraph/metering/counter.py index 6e6b829b..68ddf441 100644 --- a/trustgraph-flow/trustgraph/metering/counter.py +++ b/trustgraph-flow/trustgraph/metering/counter.py @@ -57,7 +57,7 @@ class Processor(Consumer): return model["input_price"], model["output_price"] return None, None # Return None if model is not found - def handle(self, msg): + async def handle(self, msg): v = msg.value() modelname = v.model @@ -98,4 +98,4 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/model/prompt/generic/service.py b/trustgraph-flow/trustgraph/model/prompt/generic/service.py index 96c9be57..d4c1846d 100755 --- a/trustgraph-flow/trustgraph/model/prompt/generic/service.py +++ b/trustgraph-flow/trustgraph/model/prompt/generic/service.py @@ -77,7 +77,7 @@ class Processor(ConsumerProducer): return json.loads(json_str) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -91,32 +91,32 @@ class Processor(ConsumerProducer): if kind == "extract-definitions": - self.handle_extract_definitions(id, v) + await self.handle_extract_definitions(id, v) return elif kind == "extract-topics": - self.handle_extract_topics(id, v) + await self.handle_extract_topics(id, v) return elif kind == "extract-relationships": - self.handle_extract_relationships(id, v) + await self.handle_extract_relationships(id, v) return elif kind == "extract-rows": - self.handle_extract_rows(id, 
v) + await self.handle_extract_rows(id, v) return elif kind == "kg-prompt": - self.handle_kg_prompt(id, v) + await self.handle_kg_prompt(id, v) return elif kind == "document-prompt": - self.handle_document_prompt(id, v) + await self.handle_document_prompt(id, v) return else: @@ -124,7 +124,7 @@ class Processor(ConsumerProducer): print("Invalid kind.", flush=True) return - def handle_extract_definitions(self, id, v): + async def handle_extract_definitions(self, id, v): try: @@ -163,7 +163,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = PromptResponse(definitions=output, error=None) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) print("Done.", flush=True) @@ -181,9 +181,9 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) - def handle_extract_topics(self, id, v): + async def handle_extract_topics(self, id, v): try: @@ -222,7 +222,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = PromptResponse(topics=output, error=None) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) print("Done.", flush=True) @@ -240,9 +240,9 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) - def handle_extract_relationships(self, id, v): + async def handle_extract_relationships(self, id, v): try: @@ -294,7 +294,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = PromptResponse(relationships=output, error=None) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) print("Done.", flush=True) @@ -312,9 +312,9 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, 
properties={"id": id}) - def handle_extract_rows(self, id, v): + async def handle_extract_rows(self, id, v): try: @@ -365,7 +365,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = PromptResponse(rows=output, error=None) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) print("Done.", flush=True) @@ -383,9 +383,9 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) - def handle_kg_prompt(self, id, v): + async def handle_kg_prompt(self, id, v): try: @@ -399,7 +399,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = PromptResponse(answer=ans, error=None) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) print("Done.", flush=True) @@ -417,9 +417,9 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) - def handle_document_prompt(self, id, v): + async def handle_document_prompt(self, id, v): try: @@ -436,7 +436,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = PromptResponse(answer=ans, error=None) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) print("Done.", flush=True) @@ -454,7 +454,7 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) @staticmethod def add_args(parser): @@ -480,5 +480,5 @@ def run(): raise RuntimeError("NOT IMPLEMENTED") - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/model/prompt/template/service.py b/trustgraph-flow/trustgraph/model/prompt/template/service.py index 2e5416f4..f1f2c0ed 100755 --- 
a/trustgraph-flow/trustgraph/model/prompt/template/service.py +++ b/trustgraph-flow/trustgraph/model/prompt/template/service.py @@ -155,7 +155,7 @@ class Processor(ConsumerProducer): config = prompt_configuration, ) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -190,7 +190,7 @@ class Processor(ConsumerProducer): error=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) return @@ -205,7 +205,7 @@ class Processor(ConsumerProducer): error=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) return @@ -223,7 +223,7 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) except Exception as e: @@ -239,7 +239,7 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) @staticmethod def add_args(parser): @@ -293,5 +293,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py b/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py index 90be6962..8951b71e 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py @@ -123,7 +123,7 @@ class Processor(ConsumerProducer): return result - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -154,7 +154,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = TextCompletionResponse(response=resp, error=None, in_token=inputtokens, out_token=outputtokens, model=self.model) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) except TooManyRequests: @@ -182,7 +182,7 @@ class Processor(ConsumerProducer): model=None, ) - 
self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -224,4 +224,4 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py b/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py index f5ecb8d6..53833898 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py @@ -84,7 +84,7 @@ class Processor(ConsumerProducer): azure_endpoint = endpoint, ) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -133,7 +133,7 @@ class Processor(ConsumerProducer): model=self.model ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) except RateLimitError: @@ -161,7 +161,7 @@ class Processor(ConsumerProducer): model=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -212,4 +212,4 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py b/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py index 5cfd8907..195a39e4 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py @@ -73,7 +73,7 @@ class Processor(ConsumerProducer): print("Initialised", flush=True) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -150,7 +150,7 @@ class Processor(ConsumerProducer): model=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -190,6 +190,6 @@ class 
Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/cohere/llm.py b/trustgraph-flow/trustgraph/model/text_completion/cohere/llm.py index 5b8e3ba9..4b18f7f1 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/cohere/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/cohere/llm.py @@ -69,7 +69,7 @@ class Processor(ConsumerProducer): print("Initialised", flush=True) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -106,7 +106,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = TextCompletionResponse(response=resp, error=None, in_token=inputtokens, out_token=outputtokens, model=self.model) - self.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -136,7 +136,7 @@ class Processor(ConsumerProducer): model=None, ) - self.producer.send(r, properties={"id": id}) + await self.producer.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -169,6 +169,6 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py b/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py index 5d5b23a0..98ecaf0e 100644 --- a/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py @@ -102,7 +102,7 @@ class Processor(ConsumerProducer): print("Initialised", flush=True) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -146,7 +146,7 @@ class Processor(ConsumerProducer): out_token=outputtokens, model=self.model ) - self.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -177,7 +177,7 @@ class 
Processor(ConsumerProducer): model=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -217,6 +217,6 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py b/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py index 65a2b171..483412a2 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py @@ -74,7 +74,7 @@ class Processor(ConsumerProducer): print("Initialised", flush=True) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -122,7 +122,7 @@ class Processor(ConsumerProducer): out_token=outputtokens, model="llama.cpp" ) - self.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -145,7 +145,7 @@ class Processor(ConsumerProducer): model=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -185,6 +185,6 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py b/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py index 8c5bd3dc..6d825bac 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py @@ -71,7 +71,7 @@ class Processor(ConsumerProducer): self.model = model self.llm = Client(host=ollama) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -96,7 +96,7 @@ class Processor(ConsumerProducer): r = TextCompletionResponse(response=response_text, error=None, in_token=inputtokens, out_token=outputtokens, model="ollama") - self.send(r, 
properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -119,7 +119,7 @@ class Processor(ConsumerProducer): model=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -145,6 +145,6 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py b/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py index c2b948d5..ebfae9ed 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py @@ -73,7 +73,7 @@ class Processor(ConsumerProducer): print("Initialised", flush=True) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -126,7 +126,7 @@ class Processor(ConsumerProducer): out_token=outputtokens, model=self.model ) - self.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -156,7 +156,7 @@ class Processor(ConsumerProducer): model=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -196,6 +196,6 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/query/doc_embeddings/milvus/service.py b/trustgraph-flow/trustgraph/query/doc_embeddings/milvus/service.py index 8e106e6f..b16399e9 100755 --- a/trustgraph-flow/trustgraph/query/doc_embeddings/milvus/service.py +++ b/trustgraph-flow/trustgraph/query/doc_embeddings/milvus/service.py @@ -40,7 +40,7 @@ class Processor(ConsumerProducer): self.vecstore = DocVectors(store_uri) - def handle(self, msg): + async def handle(self, msg): try: @@ -64,7 +64,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = 
DocumentEmbeddingsResponse(documents=chunks, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -82,7 +82,7 @@ class Processor(ConsumerProducer): documents=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -102,5 +102,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/service.py b/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/service.py index b8502143..6a88671c 100755 --- a/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/service.py +++ b/trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/service.py @@ -56,7 +56,7 @@ class Processor(ConsumerProducer): } ) - def handle(self, msg): + async def handle(self, msg): try: @@ -100,7 +100,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = DocumentEmbeddingsResponse(documents=chunks, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -118,7 +118,7 @@ class Processor(ConsumerProducer): documents=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -143,5 +143,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/query/doc_embeddings/qdrant/service.py b/trustgraph-flow/trustgraph/query/doc_embeddings/qdrant/service.py index dd53862f..128203ad 100755 --- a/trustgraph-flow/trustgraph/query/doc_embeddings/qdrant/service.py +++ b/trustgraph-flow/trustgraph/query/doc_embeddings/qdrant/service.py @@ -47,7 +47,7 @@ class Processor(ConsumerProducer): self.client = QdrantClient(url=store_uri, api_key=api_key) - def 
handle(self, msg): + async def handle(self, msg): try: @@ -81,7 +81,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = DocumentEmbeddingsResponse(documents=chunks, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -99,7 +99,7 @@ class Processor(ConsumerProducer): documents=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -125,5 +125,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/query/graph_embeddings/milvus/service.py b/trustgraph-flow/trustgraph/query/graph_embeddings/milvus/service.py index b5f9ae5b..8dd8d04d 100755 --- a/trustgraph-flow/trustgraph/query/graph_embeddings/milvus/service.py +++ b/trustgraph-flow/trustgraph/query/graph_embeddings/milvus/service.py @@ -46,7 +46,7 @@ class Processor(ConsumerProducer): else: return Value(value=ent, is_uri=False) - def handle(self, msg): + async def handle(self, msg): try: @@ -79,7 +79,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = GraphEmbeddingsResponse(entities=entities, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -97,7 +97,7 @@ class Processor(ConsumerProducer): entities=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -117,5 +117,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py b/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py index 2534d278..90cfc6de 100755 --- 
a/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py +++ b/trustgraph-flow/trustgraph/query/graph_embeddings/pinecone/service.py @@ -62,7 +62,7 @@ class Processor(ConsumerProducer): else: return Value(value=ent, is_uri=False) - def handle(self, msg): + async def handle(self, msg): try: @@ -120,7 +120,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = GraphEmbeddingsResponse(entities=entities, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -138,7 +138,7 @@ class Processor(ConsumerProducer): entities=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -163,5 +163,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py b/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py index 4e9492c6..dc3e28f3 100755 --- a/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py +++ b/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py @@ -52,7 +52,7 @@ class Processor(ConsumerProducer): else: return Value(value=ent, is_uri=False) - def handle(self, msg): + async def handle(self, msg): try: @@ -106,7 +106,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = GraphEmbeddingsResponse(entities=entities, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -124,7 +124,7 @@ class Processor(ConsumerProducer): entities=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -150,5 +150,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) 
diff --git a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py index 22fbf84d..e3687756 100755 --- a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py @@ -53,7 +53,7 @@ class Processor(ConsumerProducer): else: return Value(value=ent, is_uri=False) - def handle(self, msg): + async def handle(self, msg): try: @@ -154,7 +154,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = TriplesQueryResponse(triples=triples, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -172,7 +172,7 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -205,5 +205,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/query/triples/falkordb/service.py b/trustgraph-flow/trustgraph/query/triples/falkordb/service.py index 1d77bb15..56fed6d3 100755 --- a/trustgraph-flow/trustgraph/query/triples/falkordb/service.py +++ b/trustgraph-flow/trustgraph/query/triples/falkordb/service.py @@ -54,7 +54,7 @@ class Processor(ConsumerProducer): else: return Value(value=ent, is_uri=False) - def handle(self, msg): + async def handle(self, msg): try: @@ -301,7 +301,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = TriplesQueryResponse(triples=triples, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -319,7 +319,7 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -345,5 +345,5 
@@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/query/triples/memgraph/service.py b/trustgraph-flow/trustgraph/query/triples/memgraph/service.py index 46dd19e3..f442c4ef 100755 --- a/trustgraph-flow/trustgraph/query/triples/memgraph/service.py +++ b/trustgraph-flow/trustgraph/query/triples/memgraph/service.py @@ -58,7 +58,7 @@ class Processor(ConsumerProducer): else: return Value(value=ent, is_uri=False) - def handle(self, msg): + async def handle(self, msg): try: @@ -313,7 +313,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = TriplesQueryResponse(triples=triples, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -331,7 +331,7 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -369,5 +369,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/query/triples/neo4j/service.py b/trustgraph-flow/trustgraph/query/triples/neo4j/service.py index d60bc4f4..49ba0345 100755 --- a/trustgraph-flow/trustgraph/query/triples/neo4j/service.py +++ b/trustgraph-flow/trustgraph/query/triples/neo4j/service.py @@ -58,7 +58,7 @@ class Processor(ConsumerProducer): else: return Value(value=ent, is_uri=False) - def handle(self, msg): + async def handle(self, msg): try: @@ -297,7 +297,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = TriplesQueryResponse(triples=triples, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -315,7 +315,7 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, 
properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -353,5 +353,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py b/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py index 4310cdbd..29203b4c 100755 --- a/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py +++ b/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py @@ -78,7 +78,7 @@ class Processor(ConsumerProducer): module=module, ) - def handle(self, msg): + async def handle(self, msg): try: @@ -93,7 +93,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = DocumentRagResponse(response = response, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -111,7 +111,7 @@ class Processor(ConsumerProducer): response=None, ) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -161,5 +161,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py b/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py index 1219050e..4f7b373c 100755 --- a/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py +++ b/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py @@ -97,7 +97,7 @@ class Processor(ConsumerProducer): module=module, ) - def handle(self, msg): + async def handle(self, msg): try: @@ -114,7 +114,7 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) r = GraphRagResponse(response = response, error=None) - self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) print("Done.", flush=True) @@ -132,7 +132,7 @@ class Processor(ConsumerProducer): response=None, ) - 
self.producer.send(r, properties={"id": id}) + await self.send(r, properties={"id": id}) self.consumer.acknowledge(msg) @@ -215,5 +215,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py index bfa6c123..b4dbc486 100755 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py @@ -35,7 +35,7 @@ class Processor(Consumer): self.vecstore = DocVectors(store_uri) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -65,5 +65,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py index c59ecd7b..9e91db9a 100644 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py @@ -61,7 +61,7 @@ class Processor(Consumer): self.last_index_name = None - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -166,5 +166,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py index a01fd9e4..810c1931 100644 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py @@ -42,7 +42,7 @@ class Processor(Consumer): self.client = QdrantClient(url=store_uri) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -110,5 +110,5 @@ class Processor(Consumer): def 
run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py index e1379577..b2d40306 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py @@ -34,7 +34,7 @@ class Processor(Consumer): self.vecstore = EntityVectors(store_uri) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -59,5 +59,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py index a32ff627..83861b54 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py @@ -88,7 +88,7 @@ class Processor(Consumer): "Gave up waiting for index creation" ) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -170,5 +170,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py index ffe33565..6b0d7371 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py @@ -67,7 +67,7 @@ class Processor(Consumer): return cname - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -117,5 +117,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/object_embeddings/milvus/write.py 
b/trustgraph-flow/trustgraph/storage/object_embeddings/milvus/write.py index 468b357a..5490af97 100755 --- a/trustgraph-flow/trustgraph/storage/object_embeddings/milvus/write.py +++ b/trustgraph-flow/trustgraph/storage/object_embeddings/milvus/write.py @@ -34,7 +34,7 @@ class Processor(Consumer): self.vecstore = ObjectVectors(store_uri) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -57,5 +57,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py index fc8f6686..c8f3b9e1 100755 --- a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py @@ -62,7 +62,7 @@ class Processor(Consumer): self.session.execute("use trustgraph"); - def handle(self, msg): + async def handle(self, msg): try: @@ -143,5 +143,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py index d940d0ec..17b5ae9a 100755 --- a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py @@ -47,7 +47,7 @@ class Processor(Consumer): self.password = graph_password self.table = None - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -110,5 +110,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py index 3c7d1660..2d0ae38a 100755 --- a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py +++ 
b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py @@ -118,7 +118,7 @@ class Processor(Consumer): time=res.run_time_ms )) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -154,5 +154,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py index 7295e691..620e669e 100755 --- a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py @@ -205,7 +205,7 @@ class Processor(Consumer): src=t.s.value, dest=t.o.value, uri=t.p.value, ) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -256,5 +256,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py index 18b40129..3323f912 100755 --- a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py @@ -158,7 +158,7 @@ class Processor(Consumer): time=summary.result_available_after )) - def handle(self, msg): + async def handle(self, msg): v = msg.value() @@ -206,5 +206,5 @@ class Processor(Consumer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__) diff --git a/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py b/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py index d6a2efec..efe63ea6 100755 --- a/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py +++ b/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py @@ -131,7 +131,7 @@ class Processor(ConsumerProducer): print("Initialisation complete", flush=True) - def handle(self, msg): + async def handle(self, 
msg): try: @@ -248,5 +248,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start(module, __doc__) + Processor.launch(module, __doc__)