trustgraph/tests/integration
cybermaggedon 7a6197d8c3
GraphRAG Query-Time Explainability (#677)
Implements full explainability pipeline for GraphRAG queries, enabling
traceability from answers back to source documents.

Renamed throughout for clarity:
- provenance_callback → explain_callback
- provenance_id → explain_id
- provenance_collection → explain_collection
- message_type "provenance" → "explain"
- Queue name "provenance" → "explainability"

GraphRAG queries now emit explainability events as they execute:
1. Session - query text and timestamp
2. Retrieval - edges retrieved from subgraph
3. Selection - selected edges with LLM reasoning (JSONL with id +
   reasoning)
4. Answer - reference to synthesized response

Events stream via explain_callback during query(), enabling a
real-time user experience.

- Answers stored in librarian service (too large to store inline in the graph)
- Document ID as URN: urn:trustgraph:answer:{session_id}
- Graph stores tg:document reference (IRI) to librarian document
- Added librarian producer/consumer to graph-rag service

- get_labelgraph() now returns (labeled_edges, uri_map)
- uri_map maps edge_id(label_s, label_p, label_o) →
  (uri_s, uri_p, uri_o)
- Explainability data stores original URIs, not labels
- Enables tracing edges back to reifying statements via tg:reifies
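
The uri_map shape described above can be sketched as follows. This is a hypothetical illustration only: the edge_id() scheme and example URIs are invented, and the real implementation may build its keys differently.

```python
# Hypothetical sketch of the uri_map shape: keys are edge identifiers
# built from human-readable labels, values are the original URIs, so
# explainability data can refer back to the underlying graph terms.
def edge_id(label_s: str, label_p: str, label_o: str) -> str:
    # Assumed identifier scheme; the real edge_id() may differ.
    return f"{label_s}|{label_p}|{label_o}"

uri_map = {
    edge_id("Alice", "knows", "Bob"): (
        "http://example.org/alice",   # uri_s
        "http://example.org/knows",   # uri_p
        "http://example.org/bob",     # uri_o
    ),
}

# Look up the original URIs for a labeled edge.
uri_s, uri_p, uri_o = uri_map[edge_id("Alice", "knows", "Bob")]
```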

- Added serialize_triple() to query service (matches storage format)
- get_term_value() now handles TRIPLE type terms
- Enables querying by quoted triple in object position:
  ?stmt tg:reifies <<s p o>>

- Displays real-time explainability events during query
- Resolves rdfs:label for edge components (s, p, o)
- Traces source chain via prov:wasDerivedFrom to root document
- Output: "Source: Chunk 1 → Page 2 → Document Title"
- Label caching to avoid repeated queries

GraphRagResponse:
- explain_id: str | None
- explain_collection: str | None
- message_type: str ("chunk" or "explain")
- end_of_session: bool

trustgraph-base/trustgraph/provenance/:
- namespaces.py - Added TG_DOCUMENT predicate
- triples.py - answer_triples() supports document_id reference
- uris.py - Added edge_selection_uri()

trustgraph-base/trustgraph/schema/services/retrieval.py:
- GraphRagResponse with explain_id, explain_collection, end_of_session

trustgraph-flow/trustgraph/retrieval/graph_rag/:
- graph_rag.py - URI preservation, streaming answer accumulation
- rag.py - Librarian integration, real-time explain emission

trustgraph-flow/trustgraph/query/triples/cassandra/service.py:
- Quoted triple serialization for query matching

trustgraph-cli/trustgraph/cli/invoke_graph_rag.py:
- Full explainability display with label resolution and source tracing
2026-03-10 10:00:01 +00:00

Integration Test Pattern for TrustGraph

This directory contains integration tests that verify the coordination between multiple TrustGraph services and components, following the patterns outlined in TEST_STRATEGY.md.

Integration Test Approach

Integration tests focus on service-to-service communication patterns and end-to-end message flows while still using mocks for external infrastructure.

Key Principles

  1. Test Service Coordination: Verify that services work together correctly
  2. Mock External Dependencies: Use mocks for databases, APIs, and infrastructure
  3. Real Business Logic: Exercise actual service logic and data transformations
  4. Error Propagation: Test how errors flow through the system
  5. Configuration Testing: Verify services respond correctly to different configurations

Test Structure

Fixtures (conftest.py)

Common fixtures for integration tests:

  • mock_pulsar_client: Mock Pulsar messaging client
  • mock_flow_context: Mock flow context for service coordination
  • integration_config: Standard configuration for integration tests
  • sample_documents: Test document collections
  • sample_embeddings: Test embedding vectors
  • sample_queries: Test query sets
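
A hypothetical sketch of how fixtures like these might be defined in conftest.py. The fixture names come from the list above; the implementations, helper functions, and sample values are invented for illustration and may not match the repository's actual conftest.py.

```python
# Sketch of conftest.py-style fixtures (illustrative values only).
from unittest.mock import AsyncMock, MagicMock

import pytest

def make_integration_config() -> dict:
    # Plain helper so the data can also be built outside pytest.
    return {"user": "test-user", "collection": "test-collection", "limit": 10}

def make_sample_documents() -> list:
    return ["Document one about graphs.", "Document two about embeddings."]

@pytest.fixture
def integration_config():
    return make_integration_config()

@pytest.fixture
def sample_documents():
    return make_sample_documents()

@pytest.fixture
def mock_pulsar_client():
    # Async producers are easiest to fake with AsyncMock.
    client = MagicMock()
    client.create_producer.return_value = AsyncMock()
    return client
```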

Test Patterns

1. End-to-End Flow Testing

@pytest.mark.integration
@pytest.mark.asyncio
async def test_service_end_to_end_flow(self, service_instance, mock_clients):
    """Test complete service pipeline from input to output"""
    # Arrange - Set up realistic test data
    # Act - Execute the full service workflow
    # Assert - Verify coordination between all components

2. Error Propagation Testing

@pytest.mark.integration
@pytest.mark.asyncio
async def test_service_error_handling(self, service_instance, mock_clients):
    """Test how errors propagate through service coordination"""
    # Arrange - Set up failure scenarios
    # Act - Execute service with failing dependency
    # Assert - Verify proper error handling and cleanup

3. Configuration Testing

@pytest.mark.integration
@pytest.mark.asyncio
async def test_service_configuration_scenarios(self, service_instance):
    """Test service behavior with different configurations"""
    # Test multiple configuration scenarios
    # Verify service adapts correctly to each configuration
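
A minimal, self-contained illustration of pattern 1. The service, its method names, and the dependency interfaces are invented for the sketch; real TrustGraph services wire their dependencies differently.

```python
import asyncio
from unittest.mock import AsyncMock

# Hypothetical service under test: embeds the query, retrieves documents,
# then builds a prompt response.
class DocQueryService:
    def __init__(self, embeddings, retrieval, prompt):
        self.embeddings = embeddings
        self.retrieval = retrieval
        self.prompt = prompt

    async def query(self, text: str) -> str:
        vec = await self.embeddings.embed(text)
        docs = await self.retrieval.fetch(vec, limit=5)
        return await self.prompt.complete(question=text, documents=docs)

async def main():
    # Arrange - realistic mock responses for each dependency
    embeddings = AsyncMock()
    embeddings.embed.return_value = [0.1, 0.2, 0.3]
    retrieval = AsyncMock()
    retrieval.fetch.return_value = ["doc-1", "doc-2"]
    prompt = AsyncMock()
    prompt.complete.return_value = "answer"

    # Act - execute the full workflow
    service = DocQueryService(embeddings, retrieval, prompt)
    answer = await service.query("what is a graph?")

    # Assert - verify coordination between all components
    embeddings.embed.assert_awaited_once_with("what is a graph?")
    retrieval.fetch.assert_awaited_once_with([0.1, 0.2, 0.3], limit=5)
    assert answer == "answer"
    return answer

result = asyncio.run(main())
```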

Running Integration Tests

Run All Integration Tests

pytest tests/integration/ -m integration

Run Specific Test

pytest tests/integration/test_document_rag_integration.py::TestDocumentRagIntegration::test_document_rag_end_to_end_flow -v

Run with Coverage (Coverage Threshold Disabled)

pytest tests/integration/ -m integration --cov=trustgraph --cov-fail-under=0

Run Slow Tests

pytest tests/integration/ -m "integration and slow"

Skip Slow Tests

pytest tests/integration/ -m "integration and not slow"

Examples: Integration Test Implementations

1. Document RAG Integration Test

test_document_rag_integration.py demonstrates the integration test pattern:

What It Tests

  • Service Coordination: Embeddings → Document Retrieval → Prompt Generation
  • Error Handling: Failure scenarios for each service dependency
  • Configuration: Different document limits, users, and collections
  • Performance: Large document set handling

Key Features

  • Realistic Data Flow: Uses actual service logic with mocked dependencies
  • Multiple Scenarios: Success, failure, and edge cases
  • Verbose Logging: Tests logging functionality
  • Multi-User Support: Tests user and collection isolation

Test Coverage

  • End-to-end happy path
  • No documents found scenario
  • Service failure scenarios (embeddings, documents, prompt)
  • Configuration variations
  • Multi-user isolation
  • Performance testing
  • Verbose logging
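
The multi-user isolation checks can be sketched with the assertion style below: every call to a mocked dependency must carry the caller's own user and collection. The fetch(user=..., collection=...) signature is invented for illustration.

```python
import asyncio
from unittest.mock import AsyncMock

async def main():
    retrieval = AsyncMock()
    retrieval.fetch.return_value = []

    # Two callers with distinct identities exercise the same dependency.
    for user, collection in [("alice", "c1"), ("bob", "c2")]:
        await retrieval.fetch(query="q", user=user, collection=collection)

    # Each call carried its own identity; nothing leaked across users.
    users = [c.kwargs["user"] for c in retrieval.fetch.await_args_list]
    assert users == ["alice", "bob"]
    return users

users = asyncio.run(main())
```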

2. Text Completion Integration Test

test_text_completion_integration.py demonstrates external API integration testing:

What It Tests

  • External API Integration: OpenAI API connectivity and authentication
  • Rate Limiting: Proper handling of API rate limits and retries
  • Error Handling: API failures, connection timeouts, and error propagation
  • Token Tracking: Accurate input/output token counting and metrics
  • Configuration: Different model parameters and settings
  • Concurrency: Multiple simultaneous API requests

Key Features

  • Realistic Mock Responses: Uses actual OpenAI API response structures
  • Authentication Testing: API key validation and base URL configuration
  • Error Scenarios: Rate limits, connection failures, invalid requests
  • Performance Metrics: Timing and token usage validation
  • Model Flexibility: Tests different GPT models and parameters

Test Coverage

  • Successful text completion generation
  • Multiple model configurations (GPT-3.5, GPT-4, GPT-4-turbo)
  • Rate limit handling (RateLimitError → TooManyRequests)
  • API error handling and propagation
  • Token counting accuracy
  • Prompt construction and parameter validation
  • Authentication patterns and API key validation
  • Concurrent request processing
  • Response content extraction and validation
  • Performance timing measurements
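
The rate-limit scenario can be sketched as below: the mocked client fails once, the service retries, and the caller sees success. RateLimited and the retrying wrapper are invented for illustration; the real service maps provider errors (e.g. RateLimitError) to its own TooManyRequests exception.

```python
import asyncio
from unittest.mock import AsyncMock

class RateLimited(Exception):
    """Stand-in for a provider rate-limit error."""

async def complete_with_retry(client, prompt, retries=2):
    for attempt in range(retries + 1):
        try:
            return await client.complete(prompt)
        except RateLimited:
            if attempt == retries:
                raise
            await asyncio.sleep(0)  # real code would back off here

async def main():
    client = AsyncMock()
    # First call raises a rate-limit error, second call succeeds.
    client.complete.side_effect = [RateLimited(), "hello"]

    result = await complete_with_retry(client, "say hello")
    assert result == "hello"
    assert client.complete.await_count == 2
    return result

result = asyncio.run(main())
```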

3. Agent Manager Integration Test

test_agent_manager_integration.py demonstrates complex service coordination testing:

What It Tests

  • ReAct Pattern: Think-Act-Observe cycles with multi-step reasoning
  • Tool Coordination: Selection and execution of different tools (knowledge query, text completion, MCP tools)
  • Conversation State: Management of conversation history and context
  • Multi-Service Integration: Coordination between prompt, graph RAG, and tool services
  • Error Handling: Tool failures, unknown tools, and error propagation
  • Configuration Management: Dynamic tool loading and configuration

Key Features

  • Complex Coordination: Tests agent reasoning with multiple tool options
  • Stateful Processing: Maintains conversation history across interactions
  • Dynamic Tool Selection: Tests tool selection based on context and reasoning
  • Callback Pattern: Tests think/observe callback mechanisms
  • JSON Serialization: Handles complex data structures in prompts
  • Performance Testing: Large conversation history handling

Test Coverage

  • Basic reasoning cycle with tool selection
  • Final answer generation (ending ReAct cycle)
  • Full ReAct cycle with tool execution
  • Conversation history management
  • Multiple tool coordination and selection
  • Tool argument validation and processing
  • Error handling (unknown tools, execution failures)
  • Context integration and additional prompting
  • Empty tool configuration handling
  • Tool response processing and cleanup
  • Performance with large conversation history
  • JSON serialization in complex prompts
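
A minimal sketch of testing a Think-Act-Observe cycle: a mocked reasoner decides each step, a mocked tool executes it, and callbacks record each phase. All names here are invented for illustration and do not reflect the agent manager's actual interfaces.

```python
import asyncio
from unittest.mock import AsyncMock

async def react_loop(reason, tools, question, on_think, on_observe):
    history = []
    while True:
        step = await reason(question, history)            # Think
        on_think(step["thought"])
        if "answer" in step:
            return step["answer"]                         # end ReAct cycle
        result = await tools[step["tool"]](step["args"])  # Act
        on_observe(result)                                # Observe
        history.append((step, result))

async def main():
    # The reasoner mock yields one tool step, then a final answer.
    reason = AsyncMock()
    reason.side_effect = [
        {"thought": "look it up", "tool": "kg_query", "args": "graphs"},
        {"thought": "done", "answer": "42"},
    ]
    tools = {"kg_query": AsyncMock(return_value="fact about graphs")}

    thoughts, observations = [], []
    answer = await react_loop(reason, tools, "q",
                              thoughts.append, observations.append)

    assert answer == "42"
    assert thoughts == ["look it up", "done"]
    assert observations == ["fact about graphs"]
    return answer

answer = asyncio.run(main())
```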

4. Knowledge Graph Extract → Store Pipeline Integration Test

test_kg_extract_store_integration.py demonstrates multi-stage pipeline testing:

What It Tests

  • Text-to-Graph Transformation: Complete pipeline from text chunks to graph triples
  • Entity Extraction: Definition extraction with proper URI generation
  • Relationship Extraction: Subject-predicate-object relationship extraction
  • Graph Database Integration: Storage coordination with Cassandra knowledge store
  • Data Validation: Entity filtering, validation, and consistency checks
  • Pipeline Coordination: Multi-stage processing with proper data flow

Key Features

  • Multi-Stage Pipeline: Tests definitions → relationships → storage coordination
  • Graph Data Structures: RDF triples, entity contexts, and graph embeddings
  • URI Generation: Consistent entity URI creation across pipeline stages
  • Data Transformation: Complex text analysis to structured graph data
  • Batch Processing: Large document set processing performance
  • Error Resilience: Graceful handling of extraction failures

Test Coverage

  • Definitions extraction pipeline (text → entities + definitions)
  • Relationships extraction pipeline (text → subject-predicate-object)
  • URI generation consistency between processors
  • Triple generation from definitions and relationships
  • Knowledge store integration (triples and embeddings storage)
  • End-to-end pipeline coordination
  • Error handling in extraction services
  • Empty and invalid extraction results handling
  • Entity filtering and validation
  • Large batch processing performance
  • Metadata propagation through pipeline stages
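
The pipeline shape can be sketched as below: mocked extractors turn a chunk into definitions and relationships, a tiny URI scheme keeps the two stages consistent, and the mocked store receives RDF-style triples. make_uri and the pipeline function are invented for illustration; the real processors differ.

```python
import asyncio
from unittest.mock import AsyncMock

def make_uri(entity: str) -> str:
    # Toy URI scheme; consistency across stages is what matters here.
    return "http://example.org/entity/" + entity.lower().replace(" ", "-")

async def extract_and_store(chunk, extract_defs, extract_rels, store):
    triples = []
    for entity, definition in await extract_defs(chunk):
        triples.append((make_uri(entity), "rdfs:comment", definition))
    for s, p, o in await extract_rels(chunk):
        triples.append((make_uri(s), make_uri(p), make_uri(o)))
    await store.write(triples)
    return triples

async def main():
    extract_defs = AsyncMock(return_value=[("Graph", "a set of nodes and edges")])
    extract_rels = AsyncMock(return_value=[("Graph", "has part", "Node")])
    store = AsyncMock()

    triples = await extract_and_store("some text", extract_defs,
                                      extract_rels, store)

    # Same URI scheme in both stages keeps entity URIs consistent.
    assert triples[0][0] == triples[1][0] == "http://example.org/entity/graph"
    store.write.assert_awaited_once_with(triples)
    return triples

triples = asyncio.run(main())
```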

Best Practices

Test Organization

  • Group related tests in classes
  • Use descriptive test names that explain the scenario
  • Follow the Arrange-Act-Assert pattern
  • Use appropriate pytest markers (@pytest.mark.integration, @pytest.mark.slow)

Mock Strategy

  • Mock external services (databases, APIs, message brokers)
  • Use real service logic and data transformations
  • Create realistic mock responses that match actual service behavior
  • Reset mocks between tests to ensure isolation
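
A small illustration of why mocks are reset between tests: call state accumulates on the mock, and reset_mock() clears it so one test's calls cannot leak into the next test's assertions.

```python
import asyncio
from unittest.mock import AsyncMock

client = AsyncMock()

# A "first test" awaits the mock, so call state accumulates.
asyncio.run(client.fetch("first test"))
assert client.fetch.await_count == 1

client.reset_mock()  # what a per-test fixture or teardown would do
assert client.fetch.await_count == 0
```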

Test Data

  • Use realistic test data that reflects actual usage patterns
  • Create reusable fixtures for common test scenarios
  • Test with various data sizes and edge cases
  • Include both success and failure scenarios

Error Testing

  • Test each dependency failure scenario
  • Verify proper error propagation and cleanup
  • Test timeout and retry mechanisms
  • Validate error response formats

Performance Testing

  • Mark performance tests with @pytest.mark.slow
  • Test with realistic data volumes
  • Set reasonable performance expectations
  • Monitor resource usage during tests

Adding New Integration Tests

  1. Identify Service Dependencies: Map out which services your target service coordinates with
  2. Create Mock Fixtures: Set up mocks for each dependency in conftest.py
  3. Design Test Scenarios: Plan happy path, error cases, and edge conditions
  4. Implement Tests: Follow the established patterns in this directory
  5. Add Documentation: Update this README with your new test patterns

Test Markers

  • @pytest.mark.integration: Marks tests as integration tests
  • @pytest.mark.slow: Marks tests that take longer to run
  • @pytest.mark.asyncio: Required for async test functions
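
Custom markers such as integration and slow should be registered so pytest does not warn about unknown marks. A typical pytest.ini fragment (descriptions illustrative; the repository's actual configuration may live in pyproject.toml or setup.cfg instead):

```ini
[pytest]
markers =
    integration: integration tests coordinating multiple services
    slow: tests that take longer to run
```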

Future Enhancements

  • Add tests with real test containers for database integration
  • Implement contract testing for service interfaces
  • Add performance benchmarking for critical paths
  • Create integration test templates for common service patterns