# trustgraph/tests/unit/test_decoding/test_pdf_decoder.py
"""
Unit tests for trustgraph.decoding.pdf.pdf_decoder
"""
import pytest
import base64
import tempfile
from unittest.mock import AsyncMock, MagicMock, patch, call
from unittest import IsolatedAsyncioTestCase
from trustgraph.decoding.pdf.pdf_decoder import Processor
from trustgraph.schema import Document, TextDocument, Metadata


class MockAsyncProcessor:
    def __init__(self, **params):
        self.config_handlers = []
        self.id = params.get('id', 'test-service')
        self.specifications = []
        self.pubsub = MagicMock()
        self.taskgroup = params.get('taskgroup', MagicMock())


class TestPdfDecoderProcessor(IsolatedAsyncioTestCase):
    """Test PDF decoder processor functionality"""
    @patch('trustgraph.base.librarian_client.Consumer')
    @patch('trustgraph.base.librarian_client.Producer')
    @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor)
    async def test_processor_initialization(self, mock_producer, mock_consumer):
        """Test PDF decoder processor initialization"""
        config = {
            'id': 'test-pdf-decoder',
            'taskgroup': AsyncMock()
        }
        processor = Processor(**config)

        # Check consumer spec
        consumer_specs = [s for s in processor.specifications if hasattr(s, 'handler')]
        assert len(consumer_specs) >= 1
        assert consumer_specs[0].name == "input"
        assert consumer_specs[0].schema == Document
    @patch('trustgraph.base.librarian_client.Consumer')
    @patch('trustgraph.base.librarian_client.Producer')
    @patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader')
    @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor)
    async def test_on_message_success(self, mock_pdf_loader_class, mock_producer, mock_consumer):
        """Test successful PDF processing"""
        # Mock PDF content
        pdf_content = b"fake pdf content"
        pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
        # Mock PyPDFLoader
        mock_loader = MagicMock()
        mock_page1 = MagicMock(page_content="Page 1 content")
        mock_page2 = MagicMock(page_content="Page 2 content")
        mock_loader.load.return_value = [mock_page1, mock_page2]
        mock_pdf_loader_class.return_value = mock_loader
        # Mock message
        mock_metadata = Metadata(id="test-doc")
        mock_document = Document(metadata=mock_metadata, data=pdf_base64)
        mock_msg = MagicMock()
        mock_msg.value.return_value = mock_document
        # Mock flow - separate mocks for output and triples
        mock_output_flow = AsyncMock()
        mock_triples_flow = AsyncMock()
        mock_flow = MagicMock(side_effect=lambda name: {
            "output": mock_output_flow,
            "triples": mock_triples_flow,
        }.get(name))
        config = {
            'id': 'test-pdf-decoder',
            'taskgroup': AsyncMock()
        }
processor = Processor(**config)
# Mock save_child_document to avoid waiting for librarian response
processor.librarian.save_child_document = AsyncMock(return_value="mock-doc-id")
await processor.on_message(mock_msg, None, mock_flow)
# Verify output was sent for each page
assert mock_output_flow.send.call_count == 2
# Verify triples were sent for each page (provenance)
assert mock_triples_flow.send.call_count == 2
@patch('trustgraph.base.librarian_client.Consumer')
@patch('trustgraph.base.librarian_client.Producer')
@patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader')
@patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor)
async def test_on_message_empty_pdf(self, mock_pdf_loader_class, mock_producer, mock_consumer):
"""Test handling of empty PDF"""
pdf_content = b"fake pdf content"
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
mock_loader = MagicMock()
mock_loader.load.return_value = []
mock_pdf_loader_class.return_value = mock_loader
mock_metadata = Metadata(id="test-doc")
mock_document = Document(metadata=mock_metadata, data=pdf_base64)
mock_msg = MagicMock()
mock_msg.value.return_value = mock_document
mock_output_flow = AsyncMock()
mock_flow = MagicMock(return_value=mock_output_flow)
config = {
'id': 'test-pdf-decoder',
'taskgroup': AsyncMock()
}
processor = Processor(**config)
await processor.on_message(mock_msg, None, mock_flow)
mock_output_flow.send.assert_not_called()
@patch('trustgraph.base.librarian_client.Consumer')
@patch('trustgraph.base.librarian_client.Producer')
@patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader')
@patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor)
async def test_on_message_unicode_content(self, mock_pdf_loader_class, mock_producer, mock_consumer):
"""Test handling of unicode content in PDF"""
pdf_content = b"fake pdf content"
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
mock_loader = MagicMock()
mock_page = MagicMock(page_content="Page with unicode: 你好世界 🌍")
mock_loader.load.return_value = [mock_page]
mock_pdf_loader_class.return_value = mock_loader
mock_metadata = Metadata(id="test-doc")
mock_document = Document(metadata=mock_metadata, data=pdf_base64)
mock_msg = MagicMock()
mock_msg.value.return_value = mock_document
# Mock flow - separate mocks for output and triples
mock_output_flow = AsyncMock()
mock_triples_flow = AsyncMock()
mock_flow = MagicMock(side_effect=lambda name: {
"output": mock_output_flow,
"triples": mock_triples_flow,
}.get(name))
config = {
'id': 'test-pdf-decoder',
'taskgroup': AsyncMock()
}
processor = Processor(**config)
# Mock save_child_document to avoid waiting for librarian response
processor.librarian.save_child_document = AsyncMock(return_value="mock-doc-id")
await processor.on_message(mock_msg, None, mock_flow)
mock_output_flow.send.assert_called_once()
call_args = mock_output_flow.send.call_args[0][0]
# PDF decoder now forwards document_id with UUID-based URN
assert call_args.document_id.startswith("urn:page:")
assert call_args.text == b"" # Content stored in librarian, not inline
@patch('trustgraph.base.flow_processor.FlowProcessor.add_args')
def test_add_args(self, mock_parent_add_args):
"""Test add_args calls parent method"""
mock_parser = MagicMock()
Processor.add_args(mock_parser)
mock_parent_add_args.assert_called_once_with(mock_parser)
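
# Hedged sketch of an additional test, not derived from the decoder source:
# it assumes page documents use "urn:page:<uuid4>" identifiers, which is
# consistent with the startswith("urn:page:") assertion in the large-document
# test above but is otherwise an assumption about the URN layout.
def test_page_urn_prefix_sketch(self):
    """Sanity-check the assumed UUID-based page URN convention."""
    import uuid
    urn = f"urn:page:{uuid.uuid4()}"
    assert urn.startswith("urn:page:")
    # The suffix should parse back as a valid UUID
    uuid.UUID(urn.split(":", 2)[2])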
@patch('trustgraph.decoding.pdf.pdf_decoder.Processor.launch')
def test_run(self, mock_launch):
"""Test run function"""
from trustgraph.decoding.pdf.pdf_decoder import run
run()
mock_launch.assert_called_once_with("document-decoder",
"\nSimple decoder, accepts PDF documents on input, outputs pages from the\nPDF document as text as separate output objects.\n\nSupports both inline document data and fetching from librarian via Pulsar\nfor large documents.\n")
if __name__ == '__main__':
    pytest.main([__file__])