mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-28 01:46:22 +02:00
Use UUID-based URNs for page and chunk IDs (#703)
Page and chunk document IDs were deterministic ({doc_id}/p{num},
{doc_id}/p{num}/c{num}), causing "Document already exists" errors
when reprocessing documents through different flows. Content may
differ between runs due to different parameters or extractors, so
deterministic IDs are incorrect.
Pages now use urn:page:{uuid}, chunks use
urn:chunk:{uuid}. Parent-child relationships are tracked via
librarian metadata and provenance triples.
Also brings Mistral OCR and Tesseract OCR decoders up to parity
with the PDF decoder: librarian fetch/save support, per-page
output with unique IDs, and provenance triple emission. Fixes
Mistral OCR bug where only the first 5 pages were processed.
This commit is contained in:
parent
1a7b654bd3
commit
96fd1eab15
10 changed files with 694 additions and 286 deletions
|
|
@ -1,29 +1,48 @@
|
|||
|
||||
"""
|
||||
Simple decoder, accepts PDF documents on input, outputs pages from the
|
||||
PDF document as text as separate output objects.
|
||||
Mistral OCR decoder, accepts PDF documents on input, outputs pages from the
|
||||
PDF document as markdown text as separate output objects.
|
||||
|
||||
Supports both inline document data and fetching from librarian via Pulsar
|
||||
for large documents.
|
||||
"""
|
||||
|
||||
from pypdf import PdfWriter, PdfReader
|
||||
from io import BytesIO
|
||||
import asyncio
|
||||
import base64
|
||||
import uuid
|
||||
import os
|
||||
|
||||
from mistralai import Mistral
|
||||
from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk
|
||||
from mistralai.models import OCRResponse
|
||||
|
||||
from ... schema import Document, TextDocument, Metadata
|
||||
from ... schema import LibrarianRequest, LibrarianResponse, DocumentMetadata
|
||||
from ... schema import librarian_request_queue, librarian_response_queue
|
||||
from ... schema import Triples
|
||||
from ... base import FlowProcessor, ConsumerSpec, ProducerSpec
|
||||
from ... base import Consumer, Producer, ConsumerMetrics, ProducerMetrics
|
||||
|
||||
from ... provenance import (
|
||||
document_uri, page_uri as make_page_uri, derived_entity_triples,
|
||||
set_graph, GRAPH_SOURCE,
|
||||
)
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Component identification for provenance
|
||||
COMPONENT_NAME = "mistral-ocr-decoder"
|
||||
COMPONENT_VERSION = "1.0.0"
|
||||
|
||||
default_ident = "pdf-decoder"
|
||||
default_api_key = os.getenv("MISTRAL_TOKEN")
|
||||
|
||||
default_librarian_request_queue = librarian_request_queue
|
||||
default_librarian_response_queue = librarian_response_queue
|
||||
|
||||
pages_per_chunk = 5
|
||||
|
||||
def chunks(lst, n):
|
||||
|
|
@ -48,27 +67,6 @@ def replace_images_in_markdown(markdown_str: str, images_dict: dict) -> str:
|
|||
)
|
||||
return markdown_str
|
||||
|
||||
def get_combined_markdown(ocr_response: OCRResponse) -> str:
    """
    Merge every page of an OCR response into one markdown document.

    Args:
        ocr_response: OCR result holding per-page markdown and images

    Returns:
        Combined markdown string with embedded images, pages separated
        by blank lines
    """
    rendered_pages = []
    for pg in ocr_response.pages:
        # Map each image id to its base64 payload so the placeholder
        # references in the page markdown can be substituted in place.
        id_to_data = {image.id: image.image_base64 for image in pg.images}
        rendered_pages.append(
            replace_images_in_markdown(pg.markdown, id_to_data)
        )
    return "\n\n".join(rendered_pages)
|
||||
|
||||
class Processor(FlowProcessor):
|
||||
|
||||
def __init__(self, **params):
|
||||
|
|
@ -97,6 +95,50 @@ class Processor(FlowProcessor):
|
|||
)
|
||||
)
|
||||
|
||||
self.register_specification(
|
||||
ProducerSpec(
|
||||
name = "triples",
|
||||
schema = Triples,
|
||||
)
|
||||
)
|
||||
|
||||
# Librarian client for fetching document content
|
||||
librarian_request_q = params.get(
|
||||
"librarian_request_queue", default_librarian_request_queue
|
||||
)
|
||||
librarian_response_q = params.get(
|
||||
"librarian_response_queue", default_librarian_response_queue
|
||||
)
|
||||
|
||||
librarian_request_metrics = ProducerMetrics(
|
||||
processor = id, flow = None, name = "librarian-request"
|
||||
)
|
||||
|
||||
self.librarian_request_producer = Producer(
|
||||
backend = self.pubsub,
|
||||
topic = librarian_request_q,
|
||||
schema = LibrarianRequest,
|
||||
metrics = librarian_request_metrics,
|
||||
)
|
||||
|
||||
librarian_response_metrics = ConsumerMetrics(
|
||||
processor = id, flow = None, name = "librarian-response"
|
||||
)
|
||||
|
||||
self.librarian_response_consumer = Consumer(
|
||||
taskgroup = self.taskgroup,
|
||||
backend = self.pubsub,
|
||||
flow = None,
|
||||
topic = librarian_response_q,
|
||||
subscriber = f"{id}-librarian",
|
||||
schema = LibrarianResponse,
|
||||
handler = self.on_librarian_response,
|
||||
metrics = librarian_response_metrics,
|
||||
)
|
||||
|
||||
# Pending librarian requests: request_id -> asyncio.Future
|
||||
self.pending_requests = {}
|
||||
|
||||
if api_key is None:
|
||||
raise RuntimeError("Mistral API key not specified")
|
||||
|
||||
|
|
@ -107,15 +149,125 @@ class Processor(FlowProcessor):
|
|||
|
||||
logger.info("Mistral OCR processor initialized")
|
||||
|
||||
async def start(self):
    """
    Start the base processor, then the librarian request producer and
    response consumer so librarian round-trips can be serviced.
    """
    # Zero-argument super() is the idiomatic Python 3 form.
    await super().start()
    await self.librarian_request_producer.start()
    await self.librarian_response_consumer.start()
|
||||
|
||||
async def on_librarian_response(self, msg, consumer, flow):
    """
    Handle a response from the librarian service.

    Correlates the response with a pending request via the "id"
    message property and resolves the waiting future. Responses
    with no matching pending request are logged and dropped.
    """
    response = msg.value()
    request_id = msg.properties().get("id")

    if request_id and request_id in self.pending_requests:
        future = self.pending_requests.pop(request_id)
        # The waiter may have just been cancelled by asyncio.wait_for
        # (timeout) before we popped it; calling set_result on a done
        # or cancelled future raises InvalidStateError inside this
        # handler, so guard against the race.
        if not future.done():
            future.set_result(response)
    else:
        # Lazy %-formatting: only rendered if the warning is emitted.
        logger.warning(
            "Received unexpected librarian response: %s", request_id
        )
|
||||
|
||||
async def fetch_document_content(self, document_id, user, timeout=120):
    """
    Fetch document content from the librarian via Pulsar.

    Args:
        document_id: Identifier of the document to fetch
        user: User on whose behalf the fetch is performed
        timeout: Seconds to wait for the librarian response

    Returns:
        The document content returned by the librarian

    Raises:
        RuntimeError: If the librarian reports an error or the
            response does not arrive within the timeout
    """
    request_id = str(uuid.uuid4())

    request = LibrarianRequest(
        operation="get-document-content",
        document_id=document_id,
        user=user,
    )

    # Create a future keyed by request id; on_librarian_response
    # resolves it when the matching response arrives.
    # get_running_loop() is the supported API inside a coroutine;
    # get_event_loop() is deprecated here.
    future = asyncio.get_running_loop().create_future()
    self.pending_requests[request_id] = future

    try:
        # Send request, carrying the correlation id as a property
        await self.librarian_request_producer.send(
            request, properties={"id": request_id}
        )

        # Wait for the correlated response
        response = await asyncio.wait_for(future, timeout=timeout)

        if response.error:
            raise RuntimeError(
                f"Librarian error: {response.error.type}: {response.error.message}"
            )

        return response.content

    except asyncio.TimeoutError as exc:
        # Drop the stale waiter so a late response is discarded
        self.pending_requests.pop(request_id, None)
        raise RuntimeError(
            f"Timeout fetching document {document_id}"
        ) from exc
|
||||
|
||||
async def save_child_document(self, doc_id, parent_id, user, content,
        document_type="page", title=None, timeout=120):
    """
    Save a child document (e.g. an extracted page) to the librarian.

    Args:
        doc_id: Identifier to assign to the child document
        parent_id: Identifier of the parent (source) document
        user: User on whose behalf the document is saved
        content: Raw bytes of the child document content
        document_type: Child document type, e.g. "page"
        title: Human-readable title; defaults to doc_id
        timeout: Seconds to wait for the librarian response

    Returns:
        The child document id on success

    Raises:
        RuntimeError: If the librarian reports an error or the
            response does not arrive within the timeout
    """
    request_id = str(uuid.uuid4())

    doc_metadata = DocumentMetadata(
        id=doc_id,
        user=user,
        kind="text/plain",
        title=title or doc_id,
        parent_id=parent_id,
        document_type=document_type,
    )

    # Content travels base64-encoded over the request schema
    request = LibrarianRequest(
        operation="add-child-document",
        document_metadata=doc_metadata,
        content=base64.b64encode(content).decode("utf-8"),
    )

    # Create a future keyed by request id; on_librarian_response
    # resolves it. get_running_loop() is the supported API inside a
    # coroutine; get_event_loop() is deprecated here.
    future = asyncio.get_running_loop().create_future()
    self.pending_requests[request_id] = future

    try:
        # Send request, carrying the correlation id as a property
        await self.librarian_request_producer.send(
            request, properties={"id": request_id}
        )

        # Wait for the correlated response
        response = await asyncio.wait_for(future, timeout=timeout)

        if response.error:
            raise RuntimeError(
                f"Librarian error saving child document: {response.error.type}: {response.error.message}"
            )

        return doc_id

    except asyncio.TimeoutError as exc:
        # Drop the stale waiter so a late response is discarded
        self.pending_requests.pop(request_id, None)
        raise RuntimeError(
            f"Timeout saving child document {doc_id}"
        ) from exc
|
||||
|
||||
def ocr(self, blob):
|
||||
"""
|
||||
Run Mistral OCR on a PDF blob, returning per-page markdown strings.
|
||||
|
||||
Args:
|
||||
blob: Raw PDF bytes
|
||||
|
||||
Returns:
|
||||
List of (page_markdown, page_number) tuples, 1-indexed
|
||||
"""
|
||||
|
||||
logger.debug("Parse PDF...")
|
||||
|
||||
pdfbuf = BytesIO(blob)
|
||||
pdf = PdfReader(pdfbuf)
|
||||
|
||||
pages = []
|
||||
global_page_num = 0
|
||||
|
||||
for chunk in chunks(pdf.pages, pages_per_chunk):
|
||||
|
||||
|
||||
logger.debug("Get next pages...")
|
||||
|
||||
part = PdfWriter()
|
||||
|
|
@ -152,11 +304,19 @@ class Processor(FlowProcessor):
|
|||
|
||||
logger.debug("Extract markdown...")
|
||||
|
||||
markdown = get_combined_markdown(processed)
|
||||
for page in processed.pages:
|
||||
global_page_num += 1
|
||||
image_data = {}
|
||||
for img in page.images:
|
||||
image_data[img.id] = img.image_base64
|
||||
markdown = replace_images_in_markdown(
|
||||
page.markdown, image_data
|
||||
)
|
||||
pages.append((markdown, global_page_num))
|
||||
|
||||
logger.info("OCR complete.")
|
||||
logger.info(f"OCR complete, {len(pages)} pages.")
|
||||
|
||||
return markdown
|
||||
return pages
|
||||
|
||||
async def on_message(self, msg, consumer, flow):
|
||||
|
||||
|
|
@ -166,16 +326,83 @@ class Processor(FlowProcessor):
|
|||
|
||||
logger.info(f"Decoding {v.metadata.id}...")
|
||||
|
||||
markdown = self.ocr(base64.b64decode(v.data))
|
||||
# Get PDF content - fetch from librarian or use inline data
|
||||
if v.document_id:
|
||||
logger.info(f"Fetching document {v.document_id} from librarian...")
|
||||
content = await self.fetch_document_content(
|
||||
document_id=v.document_id,
|
||||
user=v.metadata.user,
|
||||
)
|
||||
if isinstance(content, str):
|
||||
content = content.encode('utf-8')
|
||||
blob = base64.b64decode(content)
|
||||
logger.info(f"Fetched {len(blob)} bytes from librarian")
|
||||
else:
|
||||
blob = base64.b64decode(v.data)
|
||||
|
||||
r = TextDocument(
|
||||
metadata=v.metadata,
|
||||
text=markdown.encode("utf-8"),
|
||||
)
|
||||
# Get the source document ID
|
||||
source_doc_id = v.document_id or v.metadata.id
|
||||
|
||||
await flow("output").send(r)
|
||||
# Run OCR, get per-page markdown
|
||||
pages = self.ocr(blob)
|
||||
|
||||
logger.info("Done.")
|
||||
for markdown, page_num in pages:
|
||||
|
||||
logger.debug(f"Processing page {page_num}")
|
||||
|
||||
# Generate unique page ID
|
||||
pg_uri = make_page_uri()
|
||||
page_doc_id = pg_uri
|
||||
page_content = markdown.encode("utf-8")
|
||||
|
||||
# Save page as child document in librarian
|
||||
await self.save_child_document(
|
||||
doc_id=page_doc_id,
|
||||
parent_id=source_doc_id,
|
||||
user=v.metadata.user,
|
||||
content=page_content,
|
||||
document_type="page",
|
||||
title=f"Page {page_num}",
|
||||
)
|
||||
|
||||
# Emit provenance triples
|
||||
doc_uri = document_uri(source_doc_id)
|
||||
|
||||
prov_triples = derived_entity_triples(
|
||||
entity_uri=pg_uri,
|
||||
parent_uri=doc_uri,
|
||||
component_name=COMPONENT_NAME,
|
||||
component_version=COMPONENT_VERSION,
|
||||
label=f"Page {page_num}",
|
||||
page_number=page_num,
|
||||
)
|
||||
|
||||
await flow("triples").send(Triples(
|
||||
metadata=Metadata(
|
||||
id=pg_uri,
|
||||
root=v.metadata.root,
|
||||
user=v.metadata.user,
|
||||
collection=v.metadata.collection,
|
||||
),
|
||||
triples=set_graph(prov_triples, GRAPH_SOURCE),
|
||||
))
|
||||
|
||||
# Forward page document ID to chunker
|
||||
# Chunker will fetch content from librarian
|
||||
r = TextDocument(
|
||||
metadata=Metadata(
|
||||
id=pg_uri,
|
||||
root=v.metadata.root,
|
||||
user=v.metadata.user,
|
||||
collection=v.metadata.collection,
|
||||
),
|
||||
document_id=page_doc_id,
|
||||
text=b"", # Empty, chunker will fetch from librarian
|
||||
)
|
||||
|
||||
await flow("output").send(r)
|
||||
|
||||
logger.debug("PDF decoding complete")
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
|
|
@ -188,7 +415,18 @@ class Processor(FlowProcessor):
|
|||
help=f'Mistral API Key'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--librarian-request-queue',
|
||||
default=default_librarian_request_queue,
|
||||
help=f'Librarian request queue (default: {default_librarian_request_queue})',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--librarian-response-queue',
|
||||
default=default_librarian_response_queue,
|
||||
help=f'Librarian response queue (default: {default_librarian_response_queue})',
|
||||
)
|
||||
|
||||
def run():
|
||||
|
||||
Processor.launch(default_ident, __doc__)
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ from ... base import FlowProcessor, ConsumerSpec, ProducerSpec
|
|||
from ... base import Consumer, Producer, ConsumerMetrics, ProducerMetrics
|
||||
|
||||
from ... provenance import (
|
||||
document_uri, page_uri, derived_entity_triples,
|
||||
document_uri, page_uri as make_page_uri, derived_entity_triples,
|
||||
set_graph, GRAPH_SOURCE,
|
||||
)
|
||||
|
||||
|
|
@ -272,8 +272,9 @@ class Processor(FlowProcessor):
|
|||
|
||||
logger.debug(f"Processing page {page_num}")
|
||||
|
||||
# Generate page document ID
|
||||
page_doc_id = f"{source_doc_id}/p{page_num}"
|
||||
# Generate unique page ID
|
||||
pg_uri = make_page_uri()
|
||||
page_doc_id = pg_uri
|
||||
page_content = page.page_content.encode("utf-8")
|
||||
|
||||
# Save page as child document in librarian
|
||||
|
|
@ -288,7 +289,6 @@ class Processor(FlowProcessor):
|
|||
|
||||
# Emit provenance triples (stored in source graph for separation from core knowledge)
|
||||
doc_uri = document_uri(source_doc_id)
|
||||
pg_uri = page_uri(source_doc_id, page_num)
|
||||
|
||||
prov_triples = derived_entity_triples(
|
||||
entity_uri=pg_uri,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue