trustgraph/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py
cybermaggedon 6b1dd16f9f
fix: large document handling and Cassandra query pagination (#969)
- Paginate heavy Cassandra reads (triples, graph/document embeddings)
  using synchronous session.execute() in run_in_executor with fetch_size
  paging, preventing materialization hang on large result sets
- Fix document stream endpoint to use workspace-scoped librarian queues
- Add decoder error handling for PDF/OCR/unstructured processors
- Add WebSocket mux guards for missing auth fields
- Add null check in librarian document streaming
- Rewrite get_document_content CLI to stream via librarian
- Add Poppler dependency to unstructured container
2026-06-01 22:39:30 +01:00

189 lines
5.4 KiB
Python
Executable file

"""
Simple decoder, accepts PDF documents on input, outputs pages from the
PDF document as text as separate output objects.
Supports both inline document data and fetching from librarian via Pulsar
for large documents.
"""
import asyncio
import base64
import logging

import pytesseract
from pdf2image import convert_from_bytes

from ... schema import Document, TextDocument, Metadata
from ... schema import Triples
from ... base import FlowProcessor, ConsumerSpec, ProducerSpec, LibrarianSpec
from ... provenance import (
    document_uri, page_uri as make_page_uri, derived_entity_triples,
    set_graph, GRAPH_SOURCE,
)
# Component identification for provenance
# (both values are passed to derived_entity_triples() for every page emitted)
COMPONENT_NAME = "tesseract-ocr-decoder"
COMPONENT_VERSION = "1.0.0"
# Module logger
logger = logging.getLogger(__name__)
# Default processor identifier: used by Processor.__init__ when no "id"
# parameter is supplied, and passed to Processor.launch() by run()
default_ident = "document-decoder"
class Processor(FlowProcessor):
    """
    Flow processor that OCRs PDF documents one page at a time.

    Consumes ``Document`` messages on ``input``.  For every page that OCRs
    successfully it saves the page text as a child document in the
    librarian, sends provenance triples on ``triples`` and sends a
    ``TextDocument`` referencing the saved page on ``output``.
    """

    def __init__(self, **params):
        """
        Register the consumer, producers and librarian used by the handler.

        ``params`` is forwarded to ``FlowProcessor``; its ``id`` entry
        falls back to ``default_ident`` when not supplied.
        """

        # Named ``ident`` rather than ``id`` so the builtin is not shadowed
        ident = params.get("id", default_ident)

        super(Processor, self).__init__(
            **params | {
                "id": ident,
            }
        )

        self.register_specification(
            ConsumerSpec(
                name = "input",
                schema = Document,
                handler = self.on_message,
            )
        )

        self.register_specification(
            ProducerSpec(
                name = "output",
                schema = TextDocument,
            )
        )

        self.register_specification(
            ProducerSpec(
                name = "triples",
                schema = Triples,
            )
        )

        self.register_specification(
            LibrarianSpec()
        )

        logger.info("PDF OCR processor initialized")

    async def on_message(self, msg, consumer, flow):
        """
        Decode one PDF document and emit one output per OCR'd page.

        Args:
            msg: incoming message; ``msg.value()`` yields the document,
                which carries ``metadata``, ``document_id`` and ``data``.
            consumer: the consumer that delivered the message (unused).
            flow: flow context; provides ``flow.librarian`` and the named
                producers via ``flow("output")`` / ``flow("triples")``.

        Returns:
            None.  The document is dropped (after logging an error) when
            the librarian reports a MIME type other than
            ``application/pdf`` or when the PDF cannot be converted to
            page images.  A page whose OCR fails is skipped with a warning.

        Raises:
            Errors from the librarian, from base64 decoding and from the
            producers are not caught here and propagate to the caller.
        """

        logger.info("PDF message received")

        v = msg.value()

        logger.info(f"Decoding {v.metadata.id}...")

        if v.document_id:

            # Check MIME type before fetching the content from librarian
            doc_meta = await flow.librarian.fetch_document_metadata(
                document_id=v.document_id,
            )

            if doc_meta and doc_meta.kind and doc_meta.kind != "application/pdf":
                logger.error(
                    f"Unsupported MIME type: {doc_meta.kind}. "
                    f"Tesseract OCR decoder only handles application/pdf. "
                    f"Ignoring document {v.metadata.id}."
                )
                return

            # Get PDF content from librarian
            logger.info(f"Fetching document {v.document_id} from librarian...")

            content = await flow.librarian.fetch_document_content(
                document_id=v.document_id,
            )

            if isinstance(content, str):
                content = content.encode('utf-8')

            blob = base64.b64decode(content)

            logger.info(f"Fetched {len(blob)} bytes from librarian")

        else:

            # Inline document data
            blob = base64.b64decode(v.data)

        # Get the source document ID
        source_doc_id = v.document_id or v.metadata.id

        try:
            # convert_from_bytes is a synchronous call; run it in a worker
            # thread so the event loop is not blocked while it works
            pages = await asyncio.to_thread(convert_from_bytes, blob)
        except Exception as e:
            logger.error(
                f"Failed to decode PDF {source_doc_id}: "
                f"{type(e).__name__}: {e}"
            )
            return

        # The parent document URI is the same for every page
        doc_uri = document_uri(source_doc_id)

        for page_num, page in enumerate(pages, start=1):  # 1-indexed

            try:
                # Synchronous OCR call, also kept off the event loop
                text = await asyncio.to_thread(
                    pytesseract.image_to_string, page, lang='eng',
                )
            except Exception as e:
                logger.warning(f"Page {page_num} did not OCR: {e}")
                continue

            logger.debug(f"Processing page {page_num}")

            # Generate unique page ID
            pg_uri = make_page_uri()
            page_doc_id = pg_uri

            page_content = text.encode("utf-8")

            # Save page as child document in librarian
            await flow.librarian.save_child_document(
                doc_id=page_doc_id,
                parent_id=source_doc_id,
                content=page_content,
                document_type="page",
                title=f"Page {page_num}",
            )

            # Emit provenance triples
            prov_triples = derived_entity_triples(
                entity_uri=pg_uri,
                parent_uri=doc_uri,
                component_name=COMPONENT_NAME,
                component_version=COMPONENT_VERSION,
                label=f"Page {page_num}",
                page_number=page_num,
            )

            await flow("triples").send(Triples(
                metadata=Metadata(
                    id=pg_uri,
                    root=v.metadata.root,
                    collection=v.metadata.collection,
                ),
                triples=set_graph(prov_triples, GRAPH_SOURCE),
            ))

            # Forward page document ID to chunker
            # Chunker will fetch content from librarian
            r = TextDocument(
                metadata=Metadata(
                    id=pg_uri,
                    root=v.metadata.root,
                    collection=v.metadata.collection,
                ),
                document_id=page_doc_id,
                text=b"",  # Empty, chunker will fetch from librarian
            )

            await flow("output").send(r)

        logger.info("PDF decoding complete")

    @staticmethod
    def add_args(parser):
        """Delegate argument registration to ``FlowProcessor.add_args``."""

        FlowProcessor.add_args(parser)
def run():
    """
    Module entry point.

    Hands the default processor identifier and this module's docstring
    to ``Processor.launch``.
    """

    Processor.launch(default_ident, __doc__)