From 1e861dc98432ba29317028410c1aefe1a67dd182 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Fri, 9 May 2025 09:25:56 +0100 Subject: [PATCH 1/3] Fix LLM launch bugs (#377) --- .../trustgraph/model/text_completion/llamafile/llm.py | 1 + trustgraph-flow/trustgraph/model/text_completion/openai/llm.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py b/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py index 76300c5a..baede64c 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py @@ -5,6 +5,7 @@ Input is prompt, output is response. """ from openai import OpenAI +import os from .... exceptions import TooManyRequests from .... base import LlmService, LlmResult diff --git a/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py b/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py index c8bfcdda..a52f400e 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py @@ -12,7 +12,6 @@ from .... base import LlmService, LlmResult default_ident = "text-completion" -default_subscriber = module default_model = 'gpt-3.5-turbo' default_temperature = 0.0 default_max_output = 4096 From 848d93922b0cd24dfb2e7d62576eedbef7fe0151 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Mon, 12 May 2025 16:27:04 +0100 Subject: [PATCH 2/3] Port Tesseract OCR code to new API --- .../trustgraph/decoding/ocr/pdf_decoder.py | 51 +++++++++---------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py b/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py index 5fa436b8..8cf0b719 100755 --- a/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py +++ b/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py @@ -10,39 +10,42 @@ import pytesseract from pdf2image import convert_from_bytes from ... schema import Document, TextDocument, Metadata -from ... schema import document_ingest_queue, text_ingest_queue -from ... log_level import LogLevel -from ... base import ConsumerProducer +from ... base import FlowProcessor, ConsumerSpec, ProducerSpec -module = "ocr" +default_ident = "pdf-decoder" -default_input_queue = document_ingest_queue -default_output_queue = text_ingest_queue -default_subscriber = module - -class Processor(ConsumerProducer): +class Processor(FlowProcessor): def __init__(self, **params): - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) + id = params.get("id", default_ident) super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": Document, - "output_schema": TextDocument, + "id": id, } ) + self.register_specification( + ConsumerSpec( + name = "input", + schema = Document, + handler = self.on_message, + ) + ) + + self.register_specification( + ProducerSpec( + name = "output", + schema = TextDocument, + ) + ) + print("PDF OCR inited") - async def handle(self, msg): + async def on_message(self, msg, consumer, flow): - print("PDF message received") + print("PDF message received", flush=True) v = msg.value() @@ -65,19 +68,15 @@ class Processor(ConsumerProducer): text=text.encode("utf-8"), ) - await self.send(r) + await flow("output").send(r) print("Done.", flush=True) @staticmethod def add_args(parser): - - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + FlowProcessor.add_args(parser) def run(): - Processor.launch(module, __doc__) + Processor.launch(default_ident, __doc__) From 410636b409e3f199d00cbf0f19647a294dd74031 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Sat, 17 May 2025 13:01:52 +0100 Subject: [PATCH 3/3] Fix missing queue initialisation --- .../trustgraph/gateway/dispatch/document_embeddings_import.py | 3 +++ .../trustgraph/gateway/dispatch/graph_embeddings_import.py | 3 +++ trustgraph-flow/trustgraph/gateway/dispatch/manager.py | 2 ++ trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py | 3 +++ 4 files changed, 11 insertions(+) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py index 1f459081..e486f613 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py @@ -22,6 +22,9 @@ class DocumentEmbeddingsImport: pulsar_client, topic = queue, schema = DocumentEmbeddings ) + async def start(self): + await self.publisher.start() + async def destroy(self): self.running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py index 70e78c87..85174460 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py @@ -22,6 +22,9 @@ class GraphEmbeddingsImport: pulsar_client, topic = queue, schema = GraphEmbeddings ) + async def start(self): + await self.publisher.start() + async def destroy(self): self.running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index 7896d588..f0cbc234 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -160,6 +160,8 @@ class DispatcherManager: queue = qconfig, ) + await dispatcher.start() + return dispatcher async def process_flow_export(self, ws, running, params): diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py index 9b59a0ed..687b424a 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py @@ -22,6 +22,9 @@ class TriplesImport: pulsar_client, topic = queue, schema = Triples ) + async def start(self): + await self.publisher.start() + async def destroy(self): self.running.stop()