diff --git a/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py b/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py index 5fa436b8..8cf0b719 100755 --- a/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py +++ b/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py @@ -10,39 +10,42 @@ import pytesseract from pdf2image import convert_from_bytes from ... schema import Document, TextDocument, Metadata -from ... schema import document_ingest_queue, text_ingest_queue -from ... log_level import LogLevel -from ... base import ConsumerProducer +from ... base import FlowProcessor, ConsumerSpec, ProducerSpec -module = "ocr" +default_ident = "pdf-decoder" -default_input_queue = document_ingest_queue -default_output_queue = text_ingest_queue -default_subscriber = module - -class Processor(ConsumerProducer): +class Processor(FlowProcessor): def __init__(self, **params): - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) + id = params.get("id", default_ident) super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": Document, - "output_schema": TextDocument, + "id": id, } ) + self.register_specification( + ConsumerSpec( + name = "input", + schema = Document, + handler = self.on_message, + ) + ) + + self.register_specification( + ProducerSpec( + name = "output", + schema = TextDocument, + ) + ) + print("PDF OCR inited") - async def handle(self, msg): + async def on_message(self, msg, consumer, flow): - print("PDF message received") + print("PDF message received", flush=True) v = msg.value() @@ -65,19 +68,15 @@ class Processor(ConsumerProducer): text=text.encode("utf-8"), ) - await self.send(r) + await flow("output").send(r) print("Done.", flush=True) @staticmethod def add_args(parser): - - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + FlowProcessor.add_args(parser) def run(): - Processor.launch(module, __doc__) + Processor.launch(default_ident, __doc__)