From c3ce07d6f0701512727825b75f59fd69a2a5f6f6 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Mon, 1 Jun 2026 22:37:04 +0100 Subject: [PATCH] fix: large document handling and Cassandra query pagination - Paginate heavy Cassandra reads (triples, graph/document embeddings) using synchronous session.execute() in run_in_executor with fetch_size paging, preventing materialization hang on large result sets - Fix document stream endpoint to use workspace-scoped librarian queues - Add decoder error handling for PDF/OCR/unstructured processors - Add WebSocket mux guards for missing auth fields - Add null check in librarian document streaming - Rewrite get_document_content CLI to stream via librarian - Add Poppler dependency to unstructured container --- containers/Containerfile.unstructured | 2 +- .../trustgraph/cli/get_document_content.py | 24 +++- .../decoding/mistral_ocr/processor.py | 9 +- .../trustgraph/decoding/pdf/pdf_decoder.py | 10 +- .../gateway/dispatch/document_stream.py | 8 +- .../trustgraph/gateway/dispatch/mux.py | 4 + .../trustgraph/librarian/librarian.py | 3 + .../trustgraph/tables/cassandra_async.py | 35 +++++ .../trustgraph/tables/knowledge.py | 127 +++++++++--------- .../trustgraph/decoding/ocr/pdf_decoder.py | 9 +- .../decoding/universal/processor.py | 9 +- 11 files changed, 166 insertions(+), 74 deletions(-) diff --git a/containers/Containerfile.unstructured b/containers/Containerfile.unstructured index 6de8a800..2b9a18f7 100644 --- a/containers/Containerfile.unstructured +++ b/containers/Containerfile.unstructured @@ -7,7 +7,7 @@ FROM docker.io/fedora:42 AS base ENV PIP_BREAK_SYSTEM_PACKAGES=1 -RUN dnf install -y python3.13 libxcb mesa-libGL && \ +RUN dnf install -y python3.13 libxcb mesa-libGL poppler-utils && \ alternatives --install /usr/bin/python python /usr/bin/python3.13 1 && \ python -m ensurepip --upgrade && \ pip3 install --no-cache-dir --upgrade 'pip>=26.0' 'setuptools>=78.1.1' && \ diff --git a/trustgraph-cli/trustgraph/cli/get_document_content.py b/trustgraph-cli/trustgraph/cli/get_document_content.py index 62fa7ca2..f4d44cca 100644 --- a/trustgraph-cli/trustgraph/cli/get_document_content.py +++ b/trustgraph-cli/trustgraph/cli/get_document_content.py @@ -5,7 +5,7 @@ Gets document content from the library by document ID. import argparse import os import sys -from trustgraph.api import Api +import requests default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') default_token = os.getenv("TRUSTGRAPH_TOKEN", None) @@ -13,15 +13,29 @@ default_workspace = os.getenv("TRUSTGRAPH_WORKSPACE", "default") def get_content(url, document_id, output_file, token=None, workspace="default"): - api = Api(url, token=token, workspace=workspace).library() + stream_url = url.rstrip("/") + "/api/v1/document-stream" - content = api.get_document_content(id=document_id) + params = { + "document-id": document_id, + "workspace": workspace, + } + + headers = {} + if token: + headers["Authorization"] = f"Bearer {token}" + + resp = requests.get(stream_url, params=params, headers=headers, stream=True) + resp.raise_for_status() if output_file: + total = 0 with open(output_file, 'wb') as f: - f.write(content) - print(f"Written {len(content)} bytes to {output_file}") + for chunk in resp.iter_content(chunk_size=65536): + f.write(chunk) + total += len(chunk) + print(f"Written {total} bytes to {output_file}") else: + content = resp.content try: text = content.decode('utf-8') print(text) diff --git a/trustgraph-flow/trustgraph/decoding/mistral_ocr/processor.py b/trustgraph-flow/trustgraph/decoding/mistral_ocr/processor.py index f214111d..40ecac8a 100755 --- a/trustgraph-flow/trustgraph/decoding/mistral_ocr/processor.py +++ b/trustgraph-flow/trustgraph/decoding/mistral_ocr/processor.py @@ -219,7 +219,14 @@ class Processor(FlowProcessor): source_doc_id = v.document_id or v.metadata.id # Run OCR, get per-page markdown - pages = self.ocr(blob) + try: + pages = self.ocr(blob) + except Exception as e: + logger.error( + f"Failed to decode PDF {source_doc_id}: " + f"{type(e).__name__}: {e}" + ) + return for markdown, page_num in pages: diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py index 209153f6..ca242265 100755 --- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py +++ b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py @@ -129,7 +129,15 @@ class Processor(FlowProcessor): ) PyPDFLoader = _cls loader = PyPDFLoader(temp_path) - pages = loader.load() + try: + pages = loader.load() + except Exception as e: + source_doc_id = v.document_id or v.metadata.id + logger.error( + f"Failed to decode PDF {source_doc_id}: " + f"{type(e).__name__}: {e}" + ) + return # Get the source document ID source_doc_id = v.document_id or v.metadata.id diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/document_stream.py b/trustgraph-flow/trustgraph/gateway/dispatch/document_stream.py index 2992d99f..74b4d7df 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/document_stream.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/document_stream.py @@ -3,6 +3,7 @@ import asyncio import uuid import logging from . librarian import LibrarianRequestor +from ... schema import librarian_request_queue, librarian_response_queue # Module logger logger = logging.getLogger(__name__) @@ -23,10 +24,13 @@ class DocumentStreamExport: response = await ok() + uid = str(uuid.uuid4()) lr = LibrarianRequestor( backend=self.backend, - consumer="api-gateway-doc-stream-" + str(uuid.uuid4()), - subscriber="api-gateway-doc-stream-" + str(uuid.uuid4()), + consumer="api-gateway-doc-stream-" + uid, + subscriber="api-gateway-doc-stream-" + uid, + request_queue=f"{librarian_request_queue}:{workspace}", + response_queue=f"{librarian_response_queue}:{workspace}", ) try: diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py index bdbd18d8..73bbb1f3 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py @@ -288,6 +288,8 @@ class Mux: await self.maybe_tidy_workers(workers) async def responder(resp, fin): + if self.ws is None: + return await self.ws.send_json({ "id": id, "response": resp, @@ -321,6 +323,8 @@ class Mux: ) except Exception as e: + if self.ws is None: + return await self.ws.send_json({ "id": id, "error": {"message": str(e), "type": "error"}, diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py index 1c4d010e..cc5f0bdf 100644 --- a/trustgraph-flow/trustgraph/librarian/librarian.py +++ b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -162,6 +162,9 @@ class Librarian: request.document_id ) + if object_id is None: + raise RequestError(f"Document not found: {request.document_id}") + content = await self.blob_store.get( object_id ) diff --git a/trustgraph-flow/trustgraph/tables/cassandra_async.py b/trustgraph-flow/trustgraph/tables/cassandra_async.py index 2f497748..205ed6b9 100644 --- a/trustgraph-flow/trustgraph/tables/cassandra_async.py +++ b/trustgraph-flow/trustgraph/tables/cassandra_async.py @@ -27,6 +27,8 @@ Notes: import asyncio +from cassandra.query import SimpleStatement + async def async_execute(session, query, parameters=None): """Execute a CQL statement asynchronously. @@ -76,3 +78,36 @@ def _set_result_if_pending(fut, result): def _set_exception_if_pending(fut, exc): if not fut.done(): fut.set_exception(exc) + + +async def async_execute_paged(session, query, parameters=None, fetch_size=100): + """Execute a CQL query with page-by-page iteration. + + Uses synchronous session.execute() inside run_in_executor so that + the driver's ResultSet paging works correctly without materialising + the entire result set in memory. + + Yields one page of rows at a time (as a list). + """ + loop = asyncio.get_running_loop() + + if isinstance(query, str): + stmt = SimpleStatement(query, fetch_size=fetch_size) + else: + stmt = query + stmt.fetch_size = fetch_size + + def _fetch_all_pages(): + pages = [] + result_set = session.execute(stmt, parameters) + while True: + pages.append(list(result_set.current_rows)) + if result_set.has_more_pages: + result_set.fetch_next_page() + else: + break + return pages + + return await loop.run_in_executor( + None, _fetch_all_pages + ) diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py index cf085fdd..6a23731b 100644 --- a/trustgraph-flow/trustgraph/tables/knowledge.py +++ b/trustgraph-flow/trustgraph/tables/knowledge.py @@ -5,7 +5,7 @@ from .. schema import DocumentEmbeddings, ChunkEmbeddings from cassandra.cluster import Cluster -from . cassandra_async import async_execute +from . cassandra_async import async_execute, async_execute_paged def term_to_tuple(term): @@ -398,7 +398,7 @@ class KnowledgeTableStore: logger.debug("Get triples...") try: - rows = await async_execute( + pages = await async_execute_paged( self.cassandra, self.get_triples_stmt, (workspace, document_id), @@ -407,29 +407,30 @@ class KnowledgeTableStore: logger.error("Exception occurred", exc_info=True) raise - for row in rows: + for page in pages: + for row in page: - if row[3]: - triples = [ - Triple( - s = tuple_to_term(elt[0], elt[1]), - p = tuple_to_term(elt[2], elt[3]), - o = tuple_to_term(elt[4], elt[5]), + if row[3]: + triples = [ + Triple( + s = tuple_to_term(elt[0], elt[1]), + p = tuple_to_term(elt[2], elt[3]), + o = tuple_to_term(elt[4], elt[5]), + ) + for elt in row[3] + ] + else: + triples = [] + + await receiver( + Triples( + metadata = Metadata( + id = document_id, + collection = "default", + ), + triples = triples ) - for elt in row[3] - ] - else: - triples = [] - - await receiver( - Triples( - metadata = Metadata( - id = document_id, - collection = "default", # FIXME: What to put here? - ), - triples = triples ) - ) logger.debug("Done") @@ -438,7 +439,7 @@ class KnowledgeTableStore: logger.debug("Get GE...") try: - rows = await async_execute( + pages = await async_execute_paged( self.cassandra, self.get_graph_embeddings_stmt, (workspace, document_id), @@ -447,28 +448,29 @@ class KnowledgeTableStore: logger.error("Exception occurred", exc_info=True) raise - for row in rows: + for page in pages: + for row in page: - if row[3]: - entities = [ - EntityEmbeddings( - entity = tuple_to_term(ent[0][0], ent[0][1]), - vector = ent[1] + if row[3]: + entities = [ + EntityEmbeddings( + entity = tuple_to_term(ent[0][0], ent[0][1]), + vector = ent[1] + ) + for ent in row[3] + ] + else: + entities = [] + + await receiver( + GraphEmbeddings( + metadata = Metadata( + id = document_id, + collection = "default", + ), + entities = entities ) - for ent in row[3] - ] - else: - entities = [] - - await receiver( - GraphEmbeddings( - metadata = Metadata( - id = document_id, - collection = "default", # FIXME: What to put here? - ), - entities = entities ) - ) logger.debug("Done") @@ -477,7 +479,7 @@ class KnowledgeTableStore: logger.debug("Get DE...") try: - rows = await async_execute( + pages = await async_execute_paged( self.cassandra, self.get_document_embeddings_stmt, (workspace, document_id), @@ -486,28 +488,29 @@ class KnowledgeTableStore: logger.error("Exception occurred", exc_info=True) raise - for row in rows: + for page in pages: + for row in page: - if row[3]: - chunks = [ - ChunkEmbeddings( - chunk_id=ch[0], - vector=ch[1], + if row[3]: + chunks = [ + ChunkEmbeddings( + chunk_id=ch[0], + vector=ch[1], + ) + for ch in row[3] + ] + else: + chunks = [] + + await receiver( + DocumentEmbeddings( + metadata = Metadata( + id = document_id, + collection = "default", + ), + chunks = chunks ) - for ch in row[3] - ] - else: - chunks = [] - - await receiver( - DocumentEmbeddings( - metadata = Metadata( - id = document_id, - collection = "default", - ), - chunks = chunks ) - ) logger.debug("Done") diff --git a/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py b/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py index 1b4815c6..0d5101df 100755 --- a/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py +++ b/trustgraph-ocr/trustgraph/decoding/ocr/pdf_decoder.py @@ -107,7 +107,14 @@ class Processor(FlowProcessor): # Get the source document ID source_doc_id = v.document_id or v.metadata.id - pages = convert_from_bytes(blob) + try: + pages = convert_from_bytes(blob) + except Exception as e: + logger.error( + f"Failed to decode PDF {source_doc_id}: " + f"{type(e).__name__}: {e}" + ) + return for ix, page in enumerate(pages): diff --git a/trustgraph-unstructured/trustgraph/decoding/universal/processor.py b/trustgraph-unstructured/trustgraph/decoding/universal/processor.py index b4936786..deedb7b4 100644 --- a/trustgraph-unstructured/trustgraph/decoding/universal/processor.py +++ b/trustgraph-unstructured/trustgraph/decoding/universal/processor.py @@ -418,7 +418,14 @@ class Processor(FlowProcessor): doc_uri_str = document_uri(source_doc_id) # Extract elements using unstructured - elements = self.extract_elements(blob, mime_type) + try: + elements = self.extract_elements(blob, mime_type) + except Exception as e: + logger.error( + f"Failed to extract elements from {source_doc_id}: " + f"{type(e).__name__}: {e}" + ) + return if not elements: logger.warning("No elements extracted from document")