diff --git a/tests/unit/test_retrieval/test_structured_diag/__init__.py b/tests/unit/test_retrieval/test_structured_diag/__init__.py new file mode 100644 index 00000000..a900cbbb --- /dev/null +++ b/tests/unit/test_retrieval/test_structured_diag/__init__.py @@ -0,0 +1,3 @@ +""" +Unit and contract tests for structured-diag service +""" \ No newline at end of file diff --git a/tests/unit/test_retrieval/test_structured_diag/test_message_translation.py b/tests/unit/test_retrieval/test_structured_diag/test_message_translation.py new file mode 100644 index 00000000..7a113250 --- /dev/null +++ b/tests/unit/test_retrieval/test_structured_diag/test_message_translation.py @@ -0,0 +1,172 @@ +""" +Unit tests for message translation in structured-diag service +""" + +import pytest +from trustgraph.messaging.translators.diagnosis import ( + StructuredDataDiagnosisRequestTranslator, + StructuredDataDiagnosisResponseTranslator +) +from trustgraph.schema.services.diagnosis import ( + StructuredDataDiagnosisRequest, + StructuredDataDiagnosisResponse +) + + +class TestRequestTranslation: + """Test request message translation""" + + def test_translate_schema_selection_request(self): + """Test translating schema-selection request from API to Pulsar""" + translator = StructuredDataDiagnosisRequestTranslator() + + # API format (with hyphens) + api_data = { + "operation": "schema-selection", + "sample": "test data sample", + "options": {"filter": "catalog"} + } + + # Translate to Pulsar + pulsar_msg = translator.to_pulsar(api_data) + + assert pulsar_msg.operation == "schema-selection" + assert pulsar_msg.sample == "test data sample" + assert pulsar_msg.options == {"filter": "catalog"} + + def test_translate_request_with_all_fields(self): + """Test translating request with all fields""" + translator = StructuredDataDiagnosisRequestTranslator() + + api_data = { + "operation": "generate-descriptor", + "sample": "csv data", + "type": "csv", + "schema-name": "products", + "options": {"delimiter": ","} + } + + pulsar_msg = translator.to_pulsar(api_data) + + assert pulsar_msg.operation == "generate-descriptor" + assert pulsar_msg.sample == "csv data" + assert pulsar_msg.type == "csv" + assert pulsar_msg.schema_name == "products" + assert pulsar_msg.options == {"delimiter": ","} + + +class TestResponseTranslation: + """Test response message translation""" + + def test_translate_schema_selection_response(self): + """Test translating schema-selection response from Pulsar to API""" + translator = StructuredDataDiagnosisResponseTranslator() + + # Create Pulsar response with schema_matches + pulsar_response = StructuredDataDiagnosisResponse( + operation="schema-selection", + schema_matches=["products", "inventory", "catalog"], + error=None + ) + + # Translate to API format + api_data = translator.from_pulsar(pulsar_response) + + assert api_data["operation"] == "schema-selection" + assert api_data["schema-matches"] == ["products", "inventory", "catalog"] + assert "error" not in api_data # None errors shouldn't be included + + def test_translate_empty_schema_matches(self): + """Test translating response with empty schema_matches""" + translator = StructuredDataDiagnosisResponseTranslator() + + pulsar_response = StructuredDataDiagnosisResponse( + operation="schema-selection", + schema_matches=[], + error=None + ) + + api_data = translator.from_pulsar(pulsar_response) + + assert api_data["operation"] == "schema-selection" + assert api_data["schema-matches"] == [] + + def test_translate_response_without_schema_matches(self): + """Test translating response without schema_matches field""" + translator = StructuredDataDiagnosisResponseTranslator() + + # Old-style response without schema_matches + pulsar_response = StructuredDataDiagnosisResponse( + operation="detect-type", + detected_type="xml", + confidence=0.9, + error=None + ) + + api_data = translator.from_pulsar(pulsar_response) + + assert api_data["operation"] == "detect-type" + assert api_data["detected-type"] == "xml" + assert api_data["confidence"] == 0.9 + assert "schema-matches" not in api_data # None values shouldn't be included + + def test_translate_response_with_error(self): + """Test translating response with error""" + translator = StructuredDataDiagnosisResponseTranslator() + from trustgraph.schema.core.primitives import Error + + pulsar_response = StructuredDataDiagnosisResponse( + operation="schema-selection", + error=Error( + type="PromptServiceError", + message="Service unavailable" + ) + ) + + api_data = translator.from_pulsar(pulsar_response) + + assert api_data["operation"] == "schema-selection" + # Error objects are typically handled separately by the gateway + # but the translator shouldn't break on them + + def test_translate_all_response_fields(self): + """Test translating response with all possible fields""" + translator = StructuredDataDiagnosisResponseTranslator() + import json + + descriptor_data = {"mapping": {"field1": "column1"}} + + pulsar_response = StructuredDataDiagnosisResponse( + operation="diagnose", + detected_type="csv", + confidence=0.95, + descriptor=json.dumps(descriptor_data), + metadata={"field_count": "5"}, + schema_matches=["schema1", "schema2"], + error=None + ) + + api_data = translator.from_pulsar(pulsar_response) + + assert api_data["operation"] == "diagnose" + assert api_data["detected-type"] == "csv" + assert api_data["confidence"] == 0.95 + assert api_data["descriptor"] == descriptor_data # Should be parsed from JSON + assert api_data["metadata"] == {"field_count": "5"} + assert api_data["schema-matches"] == ["schema1", "schema2"] + + def test_response_completion_flag(self): + """Test that response includes completion flag""" + translator = StructuredDataDiagnosisResponseTranslator() + + pulsar_response = StructuredDataDiagnosisResponse( + operation="schema-selection", + schema_matches=["products"], + error=None + ) + + api_data, is_final = translator.from_response_with_completion(pulsar_response) + + assert is_final is True # Structured-diag responses are always final + assert api_data["operation"] == "schema-selection" + assert api_data["schema-matches"] == ["products"] \ No newline at end of file diff --git a/tests/unit/test_retrieval/test_structured_diag/test_schema_contracts.py b/tests/unit/test_retrieval/test_structured_diag/test_schema_contracts.py new file mode 100644 index 00000000..99f66dc7 --- /dev/null +++ b/tests/unit/test_retrieval/test_structured_diag/test_schema_contracts.py @@ -0,0 +1,258 @@ +""" +Contract tests for structured-diag service schemas +""" + +import pytest +import json +from pulsar.schema import JsonSchema +from trustgraph.schema.services.diagnosis import ( + StructuredDataDiagnosisRequest, + StructuredDataDiagnosisResponse +) + + +class TestStructuredDiagnosisSchemaContract: + """Contract tests for structured diagnosis message schemas""" + + def test_request_schema_basic_fields(self): + """Test basic request schema fields""" + request = StructuredDataDiagnosisRequest( + operation="detect-type", + sample="test data" + ) + + assert request.operation == "detect-type" + assert request.sample == "test data" + assert request.type is None # Optional, defaults to None + assert request.schema_name is None # Optional, defaults to None + assert request.options is None # Optional, defaults to None + + def test_request_schema_all_operations(self): + """Test request schema supports all operations""" + operations = ["detect-type", "generate-descriptor", "diagnose", "schema-selection"] + + for op in operations: + request = StructuredDataDiagnosisRequest( + operation=op, + sample="test data" + ) + assert request.operation == op + + def test_request_schema_with_options(self): + """Test request schema with options""" + options = {"delimiter": ",", "has_header": "true"} + request = StructuredDataDiagnosisRequest( + operation="generate-descriptor", + sample="test data", + type="csv", + schema_name="products", + options=options + ) + + assert request.options == options + assert request.type == "csv" + assert request.schema_name == "products" + + def test_response_schema_basic_fields(self): + """Test basic response schema fields""" + response = StructuredDataDiagnosisResponse( + operation="detect-type", + detected_type="xml", + confidence=0.9, + error=None # Explicitly set to None + ) + + assert response.operation == "detect-type" + assert response.detected_type == "xml" + assert response.confidence == 0.9 + assert response.error is None + assert response.descriptor is None + assert response.metadata is None + assert response.schema_matches is None # New field, defaults to None + + def test_response_schema_with_error(self): + """Test response schema with error""" + from trustgraph.schema.core.primitives import Error + + error = Error( + type="ServiceError", + message="Service unavailable" + ) + response = StructuredDataDiagnosisResponse( + operation="schema-selection", + error=error + ) + + assert response.error == error + assert response.error.type == "ServiceError" + assert response.error.message == "Service unavailable" + + def test_response_schema_with_schema_matches(self): + """Test response schema with schema_matches array""" + matches = ["products", "inventory", "catalog"] + response = StructuredDataDiagnosisResponse( + operation="schema-selection", + schema_matches=matches + ) + + assert response.operation == "schema-selection" + assert response.schema_matches == matches + assert len(response.schema_matches) == 3 + + def test_response_schema_empty_schema_matches(self): + """Test response schema with empty schema_matches array""" + response = StructuredDataDiagnosisResponse( + operation="schema-selection", + schema_matches=[] + ) + + assert response.schema_matches == [] + assert isinstance(response.schema_matches, list) + + def test_response_schema_with_descriptor(self): + """Test response schema with descriptor""" + descriptor = { + "mapping": { + "field1": "column1", + "field2": "column2" + } + } + response = StructuredDataDiagnosisResponse( + operation="generate-descriptor", + descriptor=json.dumps(descriptor) + ) + + assert response.descriptor == json.dumps(descriptor) + parsed = json.loads(response.descriptor) + assert parsed["mapping"]["field1"] == "column1" + + def test_response_schema_with_metadata(self): + """Test response schema with metadata""" + metadata = { + "csv_options": json.dumps({"delimiter": ","}), + "field_count": "5" + } + response = StructuredDataDiagnosisResponse( + operation="diagnose", + metadata=metadata + ) + + assert response.metadata == metadata + assert response.metadata["field_count"] == "5" + + def test_schema_serialization(self): + """Test that schemas can be serialized and deserialized correctly""" + # Test request serialization + request = StructuredDataDiagnosisRequest( + operation="schema-selection", + sample="test data", + options={"key": "value"} + ) + + # Simulate Pulsar JsonSchema serialization + schema = JsonSchema(StructuredDataDiagnosisRequest) + serialized = schema.encode(request) + deserialized = schema.decode(serialized) + + assert deserialized.operation == request.operation + assert deserialized.sample == request.sample + assert deserialized.options == request.options + + def test_response_serialization_with_schema_matches(self): + """Test response serialization with schema_matches array""" + response = StructuredDataDiagnosisResponse( + operation="schema-selection", + schema_matches=["schema1", "schema2"], + confidence=0.85 + ) + + # Simulate Pulsar JsonSchema serialization + schema = JsonSchema(StructuredDataDiagnosisResponse) + serialized = schema.encode(response) + deserialized = schema.decode(serialized) + + assert deserialized.operation == response.operation + assert deserialized.schema_matches == response.schema_matches + assert deserialized.confidence == response.confidence + + def test_backwards_compatibility(self): + """Test that old clients can still use the service without schema_matches""" + # Old response without schema_matches should still work + response = StructuredDataDiagnosisResponse( + operation="detect-type", + detected_type="json", + confidence=0.95 + ) + + # Verify default value for new field + assert response.schema_matches is None # Defaults to None when not set + + # Verify old fields still work + assert response.detected_type == "json" + assert response.confidence == 0.95 + + def test_schema_selection_operation_contract(self): + """Test complete contract for schema-selection operation""" + # Request + request = StructuredDataDiagnosisRequest( + operation="schema-selection", + sample="product_id,name,price\n1,Widget,9.99" + ) + + assert request.operation == "schema-selection" + assert request.sample != "" + + # Response with matches + response = StructuredDataDiagnosisResponse( + operation="schema-selection", + schema_matches=["products", "inventory"] + ) + + assert response.operation == "schema-selection" + assert isinstance(response.schema_matches, list) + assert len(response.schema_matches) == 2 + assert all(isinstance(s, str) for s in response.schema_matches) + + # Response with error + from trustgraph.schema.core.primitives import Error + error_response = StructuredDataDiagnosisResponse( + operation="schema-selection", + error=Error(type="PromptServiceError", message="Service unavailable") + ) + + assert error_response.error is not None + assert error_response.schema_matches is None # Default None when not set + + def test_all_operations_supported(self): + """Verify all operations are properly supported in the contract""" + supported_operations = { + "detect-type": { + "required_request": ["sample"], + "expected_response": ["detected_type", "confidence"] + }, + "generate-descriptor": { + "required_request": ["sample", "type", "schema_name"], + "expected_response": ["descriptor"] + }, + "diagnose": { + "required_request": ["sample"], + "expected_response": ["detected_type", "confidence", "descriptor"] + }, + "schema-selection": { + "required_request": ["sample"], + "expected_response": ["schema_matches"] + } + } + + for operation, contract in supported_operations.items(): + # Test request creation + request_data = {"operation": operation} + for field in contract["required_request"]: + request_data[field] = "test_value" + + request = StructuredDataDiagnosisRequest(**request_data) + assert request.operation == operation + + # Test response creation + response = StructuredDataDiagnosisResponse(operation=operation) + assert response.operation == operation \ No newline at end of file diff --git a/tests/unit/test_retrieval/test_structured_diag/test_schema_selection.py b/tests/unit/test_retrieval/test_structured_diag/test_schema_selection.py new file mode 100644 index 00000000..8ce1b97e --- /dev/null +++ b/tests/unit/test_retrieval/test_structured_diag/test_schema_selection.py @@ -0,0 +1,361 @@ +""" +Unit tests for structured-diag service schema-selection operation +""" + +import pytest +import json +from unittest.mock import AsyncMock, MagicMock, patch +from trustgraph.retrieval.structured_diag.service import Processor +from trustgraph.schema.services.diagnosis import StructuredDataDiagnosisRequest, StructuredDataDiagnosisResponse +from trustgraph.schema import RowSchema, Field as SchemaField, Error + + +@pytest.fixture +def mock_schemas(): + """Create mock schemas for testing""" + schemas = { + "products": RowSchema( + name="products", + description="Product catalog schema", + fields=[ + SchemaField( + name="product_id", + type="string", + description="Product identifier", + required=True, + primary=True, + indexed=True + ), + SchemaField( + name="name", + type="string", + description="Product name", + required=True + ), + SchemaField( + name="price", + type="number", + description="Product price", + required=True + ) + ] + ), + "customers": RowSchema( + name="customers", + description="Customer database schema", + fields=[ + SchemaField( + name="customer_id", + type="string", + description="Customer identifier", + required=True, + primary=True + ), + SchemaField( + name="name", + type="string", + description="Customer name", + required=True + ), + SchemaField( + name="email", + type="string", + description="Customer email", + required=True + ) + ] + ), + "orders": RowSchema( + name="orders", + description="Order management schema", + fields=[ + SchemaField( + name="order_id", + type="string", + description="Order identifier", + required=True, + primary=True + ), + SchemaField( + name="customer_id", + type="string", + description="Customer identifier", + required=True + ), + SchemaField( + name="total", + type="number", + description="Order total", + required=True + ) + ] + ) + } + return schemas + + +@pytest.fixture +def service(mock_schemas): + """Create service instance with mock configuration""" + service = Processor( + taskgroup=MagicMock(), + id="test-processor" + ) + service.schemas = mock_schemas + return service + + +@pytest.fixture +def mock_flow(): + """Create mock flow with prompt service""" + flow = MagicMock() + prompt_request_flow = AsyncMock() + flow.return_value.request = prompt_request_flow + return flow, prompt_request_flow + + +@pytest.mark.asyncio +async def test_schema_selection_success(service, mock_flow): + """Test successful schema selection""" + flow, prompt_request_flow = mock_flow + + # Mock prompt service response with matching schemas + mock_response = MagicMock() + mock_response.error = None + mock_response.text = '["products", "orders"]' + mock_response.object = None # Explicitly set to None + prompt_request_flow.return_value = mock_response + + # Create request + request = StructuredDataDiagnosisRequest( + operation="schema-selection", + sample="product_id,name,price,quantity\nPROD001,Widget,19.99,5" + ) + + # Execute operation + response = await service.schema_selection_operation(request, flow) + + # Verify response + assert response.error is None + assert response.operation == "schema-selection" + assert response.schema_matches == ["products", "orders"] + + # Verify prompt service was called correctly + prompt_request_flow.assert_called_once() + call_args = prompt_request_flow.call_args[0][0] + assert call_args.id == "schema-selection" + + # Check that all schemas were passed to prompt + terms = call_args.terms + schemas_data = json.loads(terms["schemas"]) + assert len(schemas_data) == 3 # All 3 schemas + assert any(s["name"] == "products" for s in schemas_data) + assert any(s["name"] == "customers" for s in schemas_data) + assert any(s["name"] == "orders" for s in schemas_data) + + +@pytest.mark.asyncio +async def test_schema_selection_empty_response(service, mock_flow): + """Test handling of empty prompt service response""" + flow, prompt_request_flow = mock_flow + + # Mock empty response from prompt service + mock_response = MagicMock() + mock_response.error = None + mock_response.text = "" + mock_response.object = "" # Both fields empty + prompt_request_flow.return_value = mock_response + + # Create request + request = StructuredDataDiagnosisRequest( + operation="schema-selection", + sample="test data" + ) + + # Execute operation + response = await service.schema_selection_operation(request, flow) + + # Verify error response + assert response.error is not None + assert response.error.type == "PromptServiceError" + assert "Empty response" in response.error.message + assert response.operation == "schema-selection" + + +@pytest.mark.asyncio +async def test_schema_selection_prompt_error(service, mock_flow): + """Test handling of prompt service error""" + flow, prompt_request_flow = mock_flow + + # Mock error response from prompt service + mock_response = MagicMock() + mock_response.error = Error( + type="ServiceError", + message="Prompt service unavailable" + ) + mock_response.text = None + prompt_request_flow.return_value = mock_response + + # Create request + request = StructuredDataDiagnosisRequest( + operation="schema-selection", + sample="test data" + ) + + # Execute operation + response = await service.schema_selection_operation(request, flow) + + # Verify error response + assert response.error is not None + assert response.error.type == "PromptServiceError" + assert "Failed to select schemas" in response.error.message + assert response.operation == "schema-selection" + + +@pytest.mark.asyncio +async def test_schema_selection_invalid_json(service, mock_flow): + """Test handling of invalid JSON response from prompt service""" + flow, prompt_request_flow = mock_flow + + # Mock invalid JSON response + mock_response = MagicMock() + mock_response.error = None + mock_response.text = "not valid json" + mock_response.object = None + prompt_request_flow.return_value = mock_response + + # Create request + request = StructuredDataDiagnosisRequest( + operation="schema-selection", + sample="test data" + ) + + # Execute operation + response = await service.schema_selection_operation(request, flow) + + # Verify error response + assert response.error is not None + assert response.error.type == "ParseError" + assert "Failed to parse schema selection response" in response.error.message + assert response.operation == "schema-selection" + + +@pytest.mark.asyncio +async def test_schema_selection_non_array_response(service, mock_flow): + """Test handling of non-array JSON response from prompt service""" + flow, prompt_request_flow = mock_flow + + # Mock non-array JSON response + mock_response = MagicMock() + mock_response.error = None + mock_response.text = '{"schema": "products"}' # Object instead of array + mock_response.object = None + prompt_request_flow.return_value = mock_response + + # Create request + request = StructuredDataDiagnosisRequest( + operation="schema-selection", + sample="test data" + ) + + # Execute operation + response = await service.schema_selection_operation(request, flow) + + # Verify error response + assert response.error is not None + assert response.error.type == "ParseError" + assert "Failed to parse schema selection response" in response.error.message + assert response.operation == "schema-selection" + + +@pytest.mark.asyncio +async def test_schema_selection_with_options(service, mock_flow): + """Test schema selection with additional options""" + flow, prompt_request_flow = mock_flow + + # Mock successful response + mock_response = MagicMock() + mock_response.error = None + mock_response.text = '["products"]' + mock_response.object = None + prompt_request_flow.return_value = mock_response + + # Create request with options + request = StructuredDataDiagnosisRequest( + operation="schema-selection", + sample="test data", + options={"filter": "catalog", "confidence": "high"} + ) + + # Execute operation + response = await service.schema_selection_operation(request, flow) + + # Verify response + assert response.error is None + assert response.schema_matches == ["products"] + + # Verify options were passed to prompt + call_args = prompt_request_flow.call_args[0][0] + terms = call_args.terms + options = json.loads(terms["options"]) + assert options["filter"] == "catalog" + assert options["confidence"] == "high" + + +@pytest.mark.asyncio +async def test_schema_selection_exception_handling(service, mock_flow): + """Test handling of unexpected exceptions""" + flow, prompt_request_flow = mock_flow + + # Mock exception during prompt service call + prompt_request_flow.side_effect = Exception("Unexpected error") + + # Create request + request = StructuredDataDiagnosisRequest( + operation="schema-selection", + sample="test data" + ) + + # Execute operation + response = await service.schema_selection_operation(request, flow) + + # Verify error response + assert response.error is not None + assert response.error.type == "PromptServiceError" + assert "Failed to select schemas" in response.error.message + assert response.operation == "schema-selection" + + +@pytest.mark.asyncio +async def test_schema_selection_empty_schemas(service, mock_flow): + """Test schema selection with no schemas configured""" + flow, prompt_request_flow = mock_flow + + # Clear schemas + service.schemas = {} + + # Mock response (shouldn't be reached) + mock_response = MagicMock() + mock_response.error = None + mock_response.text = '[]' + mock_response.object = None + prompt_request_flow.return_value = mock_response + + # Create request + request = StructuredDataDiagnosisRequest( + operation="schema-selection", + sample="test data" + ) + + # Execute operation + response = await service.schema_selection_operation(request, flow) + + # Should still succeed but with empty schemas array passed to prompt + assert response.error is None + assert response.schema_matches == [] + + # Verify empty schemas array was passed + call_args = prompt_request_flow.call_args[0][0] + terms = call_args.terms + schemas_data = json.loads(terms["schemas"]) + assert len(schemas_data) == 0 \ No newline at end of file diff --git a/tests/unit/test_retrieval/test_structured_diag/test_type_detection.py b/tests/unit/test_retrieval/test_structured_diag/test_type_detection.py new file mode 100644 index 00000000..60eae2ef --- /dev/null +++ b/tests/unit/test_retrieval/test_structured_diag/test_type_detection.py @@ -0,0 +1,179 @@ +""" +Unit tests for simplified type detection in structured-diag service +""" + +import pytest +from trustgraph.retrieval.structured_diag.type_detector import detect_data_type + + +class TestSimplifiedTypeDetection: + """Test the simplified type detection logic""" + + def test_xml_detection_with_declaration(self): + """Test XML detection with XML declaration""" + sample = 'data' + data_type, confidence = detect_data_type(sample) + assert data_type == "xml" + assert confidence == 0.9 + + def test_xml_detection_without_declaration(self): + """Test XML detection without declaration but with closing tags""" + sample = 'data' + data_type, confidence = detect_data_type(sample) + assert data_type == "xml" + assert confidence == 0.9 + + def test_xml_detection_truncated(self): + """Test XML detection with truncated XML (common with 500-byte samples)""" + sample = ''' + + + + Steak & Kidney + Yorkshire + 12.5 + 4.2''' # Truncated mid-element + data_type, confidence = detect_data_type(sample) + assert data_type == "xml" + assert confidence == 0.9 + + def test_json_object_detection(self): + """Test JSON object detection""" + sample = '{"name": "John", "age": 30, "city": "New York"}' + data_type, confidence = detect_data_type(sample) + assert data_type == "json" + assert confidence == 0.9 + + def test_json_array_detection(self): + """Test JSON array detection""" + sample = '[{"id": 1}, {"id": 2}, {"id": 3}]' + data_type, confidence = detect_data_type(sample) + assert data_type == "json" + assert confidence == 0.9 + + def test_json_truncated(self): + """Test JSON detection with truncated JSON""" + sample = '{"products": [{"id": 1, "name": "Widget", "price": 19.99}, {"id": 2, "na' + data_type, confidence = detect_data_type(sample) + assert data_type == "json" + assert confidence == 0.9 + + def test_csv_detection(self): + """Test CSV detection as fallback""" + sample = '''name,age,city +John,30,New York +Jane,25,Boston +Bob,35,Chicago''' + data_type, confidence = detect_data_type(sample) + assert data_type == "csv" + assert confidence == 0.8 + + def test_csv_detection_single_line(self): + """Test CSV detection with single line defaults to CSV""" + sample = 'column1,column2,column3' + data_type, confidence = detect_data_type(sample) + assert data_type == "csv" + assert confidence == 0.8 + + def test_empty_input(self): + """Test empty input handling""" + data_type, confidence = detect_data_type("") + assert data_type is None + assert confidence == 0.0 + + def test_whitespace_only(self): + """Test whitespace-only input""" + data_type, confidence = detect_data_type(" \n \t ") + assert data_type is None + assert confidence == 0.0 + + def test_html_not_xml(self): + """Test HTML is detected as XML (has closing tags)""" + sample = '

Title

' + data_type, confidence = detect_data_type(sample) + assert data_type == "xml" # HTML is detected as XML + assert confidence == 0.9 + + def test_malformed_xml_still_detected(self): + """Test malformed XML is still detected as XML""" + sample = 'data' + data_type, confidence = detect_data_type(sample) + assert data_type == "xml" + assert confidence == 0.9 + + def test_json_with_whitespace(self): + """Test JSON detection with leading whitespace""" + sample = ' \n {"key": "value"}' + data_type, confidence = detect_data_type(sample) + assert data_type == "json" + assert confidence == 0.9 + + def test_priority_xml_over_csv(self): + """Test XML takes priority over CSV when both patterns present""" + sample = '\na,b,c' + data_type, confidence = detect_data_type(sample) + assert data_type == "xml" + assert confidence == 0.9 + + def test_priority_json_over_csv(self): + """Test JSON takes priority over CSV when both patterns present""" + sample = '{"data": "a,b,c"}' + data_type, confidence = detect_data_type(sample) + assert data_type == "json" + assert confidence == 0.9 + + def test_text_defaults_to_csv(self): + """Test plain text defaults to CSV""" + sample = 'This is just plain text without any structure' + data_type, confidence = detect_data_type(sample) + assert data_type == "csv" + assert confidence == 0.8 + + +class TestRealWorldSamples: + """Test with real-world data samples""" + + def test_uk_pies_xml_sample(self): + """Test with actual UK pies XML sample (first 500 bytes)""" + sample = ''' + + + + Steak & Kidney + Yorkshire + 12.5 + 4.2 + 285 + Shortcrust + Meat + 3.50 + GBP + Traditional + + + Chicken & Mushroom + Lancashire StructuredDataDiagnosisResponse: @@ -307,6 +311,102 @@ class Processor(FlowProcessor): metadata=metadata ) + async def schema_selection_operation(self, request: StructuredDataDiagnosisRequest, flow) -> StructuredDataDiagnosisResponse: + """Handle schema-selection operation""" + logger.info("Processing schema-selection operation") + + # Prepare all schemas for the prompt - match the original config format + all_schemas = [] + for schema_name, row_schema in self.schemas.items(): + schema_info = { + "name": row_schema.name, + "description": row_schema.description, + "fields": [ + { + "name": f.name, + "type": f.type, + "description": f.description, + "required": f.required, + "primary_key": f.primary, + "indexed": f.indexed, + "enum": f.enum_values if f.enum_values else [], + "size": f.size if hasattr(f, 'size') else 0 + } + for f in row_schema.fields + ] + } + all_schemas.append(schema_info) + + # Create prompt variables - schemas array contains ALL schemas + # Note: The prompt expects 'question' not 'sample' + variables = { + "question": request.sample, # The prompt template expects 'question' + "schemas": all_schemas, + "options": request.options or {} + } + + # Call prompt service with configurable template + terms = {k: json.dumps(v) for k, v in variables.items()} + prompt_request = PromptRequest( + id=self.schema_selection_prompt, + terms=terms + ) + + try: + logger.info(f"Calling prompt service for schema selection with template: {self.schema_selection_prompt}") + response = await flow("prompt-request").request(prompt_request) + + if response.error: + logger.error(f"Prompt service error: {response.error.message}") + error = Error( + type="PromptServiceError", + message="Failed to select schemas using prompt service" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + # Check both text and object fields for response + response_data = None + if response.object and response.object.strip(): + response_data = response.object.strip() + logger.debug(f"Using response from 'object' field: {response_data}") + elif response.text and response.text.strip(): + response_data = response.text.strip() + logger.debug(f"Using response from 'text' field: {response_data}") + else: + logger.error("Empty response from prompt service (checked both text and object fields)") + error = Error( + type="PromptServiceError", + message="Empty response from prompt service" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + # Parse the response as JSON array of schema IDs + try: + schema_matches = json.loads(response_data) + if not isinstance(schema_matches, list): + raise ValueError("Response must be an array") + except (json.JSONDecodeError, ValueError) as e: + logger.error(f"Failed to parse schema matches response: {e}") + error = Error( + type="ParseError", + message="Failed to parse schema selection response as JSON array" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + return StructuredDataDiagnosisResponse( + error=None, + operation=request.operation, + schema_matches=schema_matches + ) + + except Exception as e: + logger.error(f"Error calling prompt service: {e}", exc_info=True) + error = Error( + type="PromptServiceError", + message="Failed to select schemas using prompt service" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + async def generate_descriptor_with_prompt( self, sample: str, data_type: str, target_schema: RowSchema, options: Dict[str, str], flow diff --git a/trustgraph-flow/trustgraph/retrieval/structured_diag/type_detector.py b/trustgraph-flow/trustgraph/retrieval/structured_diag/type_detector.py index ccd6bf8b..a291d5cc 100644 --- a/trustgraph-flow/trustgraph/retrieval/structured_diag/type_detector.py +++ b/trustgraph-flow/trustgraph/retrieval/structured_diag/type_detector.py @@ -31,28 +31,13 @@ def detect_data_type(sample: str) -> Tuple[Optional[str], float]: sample = sample.strip() - # Try each format and calculate confidence scores - json_confidence = _check_json_format(sample) - xml_confidence = _check_xml_format(sample) - csv_confidence = _check_csv_format(sample) - - logger.debug(f"Format confidence scores - JSON: {json_confidence}, XML: {xml_confidence}, CSV: {csv_confidence}") - - # Find the format with highest confidence - scores = { - "json": json_confidence, - "xml": xml_confidence, - "csv": csv_confidence - } - - best_format = max(scores, key=scores.get) - best_confidence = scores[best_format] - - # Only return a result if confidence is above threshold - if best_confidence < 0.3: - return None, best_confidence - - return best_format, best_confidence + # Simple pattern matching + if sample.startswith(' float: @@ -83,33 +68,20 @@ def _check_json_format(sample: str) -> float: def _check_xml_format(sample: str) -> float: """Check if sample is valid XML format""" - try: - # Quick heuristic checks first - if not sample.startswith('<'): - return 0.0 - - if not ('>' in sample and ' 10: - return 0.95 - elif child_count > 5: - return 0.9 - elif child_count > 0: - return 0.8 + # XML declaration or starts with tag + if sample.startswith('' in sample: + try: + # Quick parse test + ET.fromstring(sample) + return 0.9 # Valid XML + except ET.ParseError: + return 0.3 # Looks like XML but malformed else: - return 0.6 + return 0.1 # Incomplete XML - except ET.ParseError: - # Check for common XML characteristics even if not well-formed - xml_indicators = [' float: