diff --git a/tests/unit/test_decoding/test_pdf_decoder.py b/tests/unit/test_decoding/test_pdf_decoder.py index 04807b20..641a9d78 100644 --- a/tests/unit/test_decoding/test_pdf_decoder.py +++ b/tests/unit/test_decoding/test_pdf_decoder.py @@ -49,7 +49,7 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): async def test_on_message_success(self, mock_pdf_loader_class, mock_producer, mock_consumer): """Test successful PDF processing""" # Mock PDF content - pdf_content = b"fake pdf content" + pdf_content = b"%PDF-1.7\nfake pdf content" pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') # Mock PyPDFLoader @@ -88,13 +88,55 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): # Verify triples were sent for each page (provenance) assert mock_triples_flow.send.call_count == 2 + @patch('trustgraph.base.librarian_client.Consumer') + @patch('trustgraph.base.librarian_client.Producer') + @patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader') + @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) + async def test_on_message_rejects_librarian_content_that_is_not_pdf(self, mock_pdf_loader_class, mock_producer, mock_consumer): + """Test rejecting non-PDF content before invoking the PDF loader""" + html_content = b"Not found" + html_base64 = base64.b64encode(html_content) + + mock_metadata = Metadata(id="test-doc") + mock_document = Document(metadata=mock_metadata, document_id="doc-123") + mock_msg = MagicMock() + mock_msg.value.return_value = mock_document + + mock_output_flow = AsyncMock() + mock_triples_flow = AsyncMock() + mock_flow = MagicMock(side_effect=lambda name: { + "output": mock_output_flow, + "triples": mock_triples_flow, + }.get(name)) + mock_flow.librarian.fetch_document_metadata = AsyncMock( + return_value=MagicMock(kind="application/pdf") + ) + mock_flow.librarian.fetch_document_content = AsyncMock( + return_value=html_base64 + ) + mock_flow.librarian.save_child_document = AsyncMock() + + config = { + 'id': 'test-pdf-decoder', + 'taskgroup': AsyncMock() + } + + processor = Processor(**config) + + await processor.on_message(mock_msg, None, mock_flow) + + mock_pdf_loader_class.assert_not_called() + mock_output_flow.send.assert_not_called() + mock_triples_flow.send.assert_not_called() + mock_flow.librarian.save_child_document.assert_not_called() + @patch('trustgraph.base.librarian_client.Consumer') @patch('trustgraph.base.librarian_client.Producer') @patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) async def test_on_message_empty_pdf(self, mock_pdf_loader_class, mock_producer, mock_consumer): """Test handling of empty PDF""" - pdf_content = b"fake pdf content" + pdf_content = b"%PDF-1.7\nfake pdf content" pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') mock_loader = MagicMock() @@ -126,7 +168,7 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) async def test_on_message_unicode_content(self, mock_pdf_loader_class, mock_producer, mock_consumer): """Test handling of unicode content in PDF""" - pdf_content = b"fake pdf content" + pdf_content = b"%PDF-1.7\nfake pdf content" pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') mock_loader = MagicMock() diff --git a/tests/unit/test_query/test_rows_cassandra_query.py b/tests/unit/test_query/test_rows_cassandra_query.py index b61500a4..fb385f43 100644 --- a/tests/unit/test_query/test_rows_cassandra_query.py +++ b/tests/unit/test_query/test_rows_cassandra_query.py @@ -333,8 +333,8 @@ class TestUnifiedTableQueries: """Test queries against the unified rows table""" @pytest.mark.asyncio - @patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock) - async def test_query_with_index_match(self, mock_async_execute): + @patch('trustgraph.query.rows.cassandra.service.async_execute_paged', new_callable=AsyncMock) + async def test_query_with_index_match(self, mock_async_execute_paged): """Test query execution with matching index""" processor = MagicMock() processor.session = MagicMock() @@ -344,10 +344,10 @@ class TestUnifiedTableQueries: processor.find_matching_index = Processor.find_matching_index.__get__(processor, Processor) processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor) - # Mock async_execute to return test data + # Mock async_execute_paged to return test data (list of pages) mock_row = MagicMock() mock_row.data = {"id": "123", "name": "Test Product", "category": "electronics"} - mock_async_execute.return_value = [mock_row] + mock_async_execute_paged.return_value = [[mock_row]] schema = RowSchema( name="products", @@ -370,10 +370,10 @@ class TestUnifiedTableQueries: # Verify Cassandra was connected and queried processor.connect_cassandra.assert_called_once() - mock_async_execute.assert_called_once() + mock_async_execute_paged.assert_called_once() # Verify query structure - should query unified rows table - call_args = mock_async_execute.call_args + call_args = mock_async_execute_paged.call_args query = call_args[0][1] params = call_args[0][2] @@ -394,8 +394,8 @@ class TestUnifiedTableQueries: assert results[0]["category"] == "electronics" @pytest.mark.asyncio - @patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock) - async def test_query_without_index_match(self, mock_async_execute): + @patch('trustgraph.query.rows.cassandra.service.async_scan', new_callable=AsyncMock) + async def test_query_without_index_match(self, mock_async_scan): """Test query execution without matching index (scan mode)""" processor = MagicMock() processor.session = MagicMock() @@ -406,12 +406,10 @@ class TestUnifiedTableQueries: processor._matches_filters = Processor._matches_filters.__get__(processor, Processor) processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor) - # Mock async_execute to return test data + # Mock async_scan to return filtered test data mock_row1 = MagicMock() mock_row1.data = {"id": "1", "name": "Product A", "price": "100"} - mock_row2 = MagicMock() - mock_row2.data = {"id": "2", "name": "Product B", "price": "200"} - mock_async_execute.return_value = [mock_row1, mock_row2] + mock_async_scan.return_value = [mock_row1] schema = RowSchema( name="products", @@ -432,13 +430,16 @@ class TestUnifiedTableQueries: limit=10 ) - # Query should use ALLOW FILTERING for scan - call_args = mock_async_execute.call_args + # Verify async_scan was called + mock_async_scan.assert_called_once() + + # Verify query structure + call_args = mock_async_scan.call_args query = call_args[0][1] assert "ALLOW FILTERING" in query - # Should post-filter results + # Should return filtered results assert len(results) == 1 assert results[0]["name"] == "Product A" diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py index ca242265..ae393028 100755 --- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py +++ b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py @@ -32,6 +32,10 @@ logger = logging.getLogger(__name__) default_ident = "document-decoder" +def _looks_like_pdf(content): + return content.lstrip().startswith(b"%PDF-") + + class Processor(FlowProcessor): def __init__(self, **params): @@ -94,33 +98,37 @@ class Processor(FlowProcessor): ) return - with tempfile.NamedTemporaryFile(delete_on_close=False, suffix='.pdf') as fp: + # Check if we should fetch from librarian or use inline data + if v.document_id: + # Fetch from librarian via Pulsar + logger.info(f"Fetching document {v.document_id} from librarian...") + + content = await flow.librarian.fetch_document_content( + document_id=v.document_id, + + ) + + # Content is base64 encoded + if isinstance(content, str): + content = content.encode('utf-8') + decoded_content = base64.b64decode(content) + + logger.info(f"Fetched {len(decoded_content)} bytes from librarian") + else: + # Use inline data (backward compatibility) + decoded_content = base64.b64decode(v.data) + + if not _looks_like_pdf(decoded_content): + logger.error( + f"Document {v.metadata.id} is not valid PDF content. " + f"Ignoring document." + ) + return + + with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as fp: temp_path = fp.name - - # Check if we should fetch from librarian or use inline data - if v.document_id: - # Fetch from librarian via Pulsar - logger.info(f"Fetching document {v.document_id} from librarian...") - fp.close() - - content = await flow.librarian.fetch_document_content( - document_id=v.document_id, - - ) - - # Content is base64 encoded - if isinstance(content, str): - content = content.encode('utf-8') - decoded_content = base64.b64decode(content) - - with open(temp_path, 'wb') as f: - f.write(decoded_content) - - logger.info(f"Fetched {len(decoded_content)} bytes from librarian") - else: - # Use inline data (backward compatibility) - fp.write(base64.b64decode(v.data)) - fp.close() + fp.write(decoded_content) + fp.close() global PyPDFLoader if PyPDFLoader is None: