Structured query support (#492)

* Tweak the structured query schema

* Structured query service

* Gateway support for nlp-query and structured-query

* API support

* Added CLI

* Update tests

* More tests
This commit is contained in:
cybermaggedon 2025-09-04 16:06:18 +01:00 committed by GitHub
parent 8d4aa0069c
commit a6d9f5e849
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 2813 additions and 31 deletions

View file

@ -20,7 +20,7 @@ from trustgraph.schema import (
GraphEmbeddings, EntityEmbeddings,
Metadata, Field, RowSchema,
StructuredDataSubmission, ExtractedObject,
NLPToStructuredQueryRequest, NLPToStructuredQueryResponse,
QuestionToStructuredQueryRequest, QuestionToStructuredQueryResponse,
StructuredQueryRequest, StructuredQueryResponse,
StructuredObjectEmbedding
)

View file

@ -12,7 +12,7 @@ from typing import Dict, Any
from trustgraph.schema import (
StructuredDataSubmission, ExtractedObject,
NLPToStructuredQueryRequest, NLPToStructuredQueryResponse,
QuestionToStructuredQueryRequest, QuestionToStructuredQueryResponse,
StructuredQueryRequest, StructuredQueryResponse,
StructuredObjectEmbedding, Field, RowSchema,
Metadata, Error, Value
@ -146,23 +146,21 @@ class TestStructuredQueryServiceContracts:
"""Contract tests for structured query services"""
def test_nlp_to_structured_query_request_contract(self):
"""Test NLPToStructuredQueryRequest schema contract"""
"""Test QuestionToStructuredQueryRequest schema contract"""
# Act
request = NLPToStructuredQueryRequest(
natural_language_query="Show me all customers who registered last month",
max_results=100,
context_hints={"time_range": "last_month", "entity_type": "customer"}
request = QuestionToStructuredQueryRequest(
question="Show me all customers who registered last month",
max_results=100
)
# Assert
assert "customers" in request.natural_language_query
assert "customers" in request.question
assert request.max_results == 100
assert request.context_hints["time_range"] == "last_month"
def test_nlp_to_structured_query_response_contract(self):
"""Test NLPToStructuredQueryResponse schema contract"""
"""Test QuestionToStructuredQueryResponse schema contract"""
# Act
response = NLPToStructuredQueryResponse(
response = QuestionToStructuredQueryResponse(
error=None,
graphql_query="query { customers(filter: {registered: {gte: \"2024-01-01\"}}) { id name email } }",
variables={"start_date": "2024-01-01"},
@ -180,15 +178,11 @@ class TestStructuredQueryServiceContracts:
"""Test StructuredQueryRequest schema contract"""
# Act
request = StructuredQueryRequest(
query="query GetCustomers($limit: Int) { customers(limit: $limit) { id name email } }",
variables={"limit": "10"},
operation_name="GetCustomers"
question="Show me customers with limit 10"
)
# Assert
assert "customers" in request.query
assert request.variables["limit"] == "10"
assert request.operation_name == "GetCustomers"
assert "customers" in request.question
def test_structured_query_response_contract(self):
"""Test StructuredQueryResponse schema contract"""
@ -291,11 +285,10 @@ class TestStructuredDataSerializationContracts:
"""Test NLP query request/response serialization contract"""
# Test request
request_data = {
"natural_language_query": "test query",
"max_results": 10,
"context_hints": {}
"question": "test query",
"max_results": 10
}
assert serialize_deserialize_test(NLPToStructuredQueryRequest, request_data)
assert serialize_deserialize_test(QuestionToStructuredQueryRequest, request_data)
# Test response
response_data = {
@ -305,4 +298,20 @@ class TestStructuredDataSerializationContracts:
"detected_schemas": ["test"],
"confidence": 0.9
}
assert serialize_deserialize_test(NLPToStructuredQueryResponse, response_data)
assert serialize_deserialize_test(QuestionToStructuredQueryResponse, response_data)
def test_structured_query_serialization(self):
"""Test structured query request/response serialization contract"""
# Test request
request_data = {
"question": "Show me all customers"
}
assert serialize_deserialize_test(StructuredQueryRequest, request_data)
# Test response
response_data = {
"error": None,
"data": '{"customers": [{"id": "1", "name": "John"}]}',
"errors": []
}
assert serialize_deserialize_test(StructuredQueryResponse, response_data)

View file

@ -0,0 +1,539 @@
"""
Integration tests for NLP Query Service
These tests verify the end-to-end functionality of the NLP query service,
testing service coordination, prompt service integration, and schema processing.
Following the TEST_STRATEGY.md approach for integration testing.
"""
import pytest
import json
from unittest.mock import AsyncMock, MagicMock, patch
from trustgraph.schema import (
QuestionToStructuredQueryRequest, QuestionToStructuredQueryResponse,
PromptRequest, PromptResponse, Error, RowSchema, Field as SchemaField
)
from trustgraph.retrieval.nlp_query.service import Processor
@pytest.mark.integration
class TestNLPQueryServiceIntegration:
"""Integration tests for NLP query service coordination"""
@pytest.fixture
def sample_schemas(self):
    """Provide three realistic table schemas (customers, orders, products).

    Built from a declarative spec so new test tables can be added as data
    rather than repeated constructor boilerplate.
    """
    # table name -> (description, [(field name, field type, is primary)])
    specs = {
        "customers": (
            "Customer data with contact information",
            [
                ("id", "string", True),
                ("name", "string", False),
                ("email", "string", False),
                ("state", "string", False),
                ("phone", "string", False),
            ],
        ),
        "orders": (
            "Customer order transactions",
            [
                ("order_id", "string", True),
                ("customer_id", "string", False),
                ("total", "float", False),
                ("status", "string", False),
                ("order_date", "datetime", False),
            ],
        ),
        "products": (
            "Product catalog information",
            [
                ("product_id", "string", True),
                ("name", "string", False),
                ("category", "string", False),
                ("price", "float", False),
                ("in_stock", "boolean", False),
            ],
        ),
    }
    schemas = {}
    for table, (description, field_specs) in specs.items():
        # Only pass primary=True for key fields; non-key fields rely on the
        # SchemaField default, exactly as the hand-written literals did.
        fields = [
            SchemaField(name=fname, type=ftype, primary=True) if is_primary
            else SchemaField(name=fname, type=ftype)
            for fname, ftype, is_primary in field_specs
        ]
        schemas[table] = RowSchema(
            name=table,
            description=description,
            fields=fields,
        )
    return schemas
@pytest.fixture
def integration_processor(self, sample_schemas):
    """Create processor with realistic configuration.

    taskgroup and pulsar_client are mocks, so no broker connection is
    made; the template names mirror production-style config values.
    """
    proc = Processor(
        taskgroup=MagicMock(),
        pulsar_client=AsyncMock(),
        config_type="schema",
        schema_selection_template="schema-selection-v1",
        graphql_generation_template="graphql-generation-v1"
    )
    # Set up schemas directly, bypassing the config-push path.
    proc.schemas = sample_schemas
    # Mock the client method so tests can queue prompt-service responses.
    proc.client = MagicMock()
    return proc
@pytest.mark.asyncio
async def test_end_to_end_nlp_query_processing(self, integration_processor):
    """Test complete NLP query processing pipeline.

    Drives both prompt-service phases (schema selection, then GraphQL
    generation) through mocked responses and verifies the final
    QuestionToStructuredQueryResponse carries the generated query,
    detected schemas, variables and confidence.
    """
    # Arrange - Create realistic query request
    request = QuestionToStructuredQueryRequest(
        question="Show me customers from California who have placed orders over $500",
        max_results=50
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "integration-test-001"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock Phase 1 - Schema Selection Response
    phase1_response = PromptResponse(
        text=json.dumps(["customers", "orders"]),
        error=None
    )
    # Mock Phase 2 - GraphQL Generation Response
    expected_graphql = """
    query GetCaliforniaCustomersWithLargeOrders($min_total: Float!) {
        customers(where: {state: {eq: "California"}}) {
            id
            name
            email
            state
            orders(where: {total: {gt: $min_total}}) {
                order_id
                total
                status
                order_date
            }
        }
    }
    """
    phase2_response = PromptResponse(
        text=json.dumps({
            "query": expected_graphql.strip(),
            "variables": {"min_total": "500.0"},
            "confidence": 0.92
        }),
        error=None
    )
    # Set up mock to return different responses for each call
    # (call order is phase 1 then phase 2 - side_effect consumes in order).
    integration_processor.client.return_value.request = AsyncMock(
        side_effect=[phase1_response, phase2_response]
    )
    # Act - Process the message
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - Verify the complete pipeline
    assert integration_processor.client.return_value.request.call_count == 2
    flow_response.send.assert_called_once()
    # Verify response structure and content
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert isinstance(response, QuestionToStructuredQueryResponse)
    assert response.error is None
    assert "customers" in response.graphql_query
    assert "orders" in response.graphql_query
    assert "California" in response.graphql_query
    assert response.detected_schemas == ["customers", "orders"]
    assert response.confidence == 0.92
    assert response.variables["min_total"] == "500.0"
@pytest.mark.asyncio
async def test_complex_multi_table_query_integration(self, integration_processor):
    """Test integration with complex multi-table queries.

    Verifies that a question spanning products and orders produces a
    response whose detected schemas and GraphQL filters survive the
    two-phase prompt pipeline intact.
    """
    # Arrange
    request = QuestionToStructuredQueryRequest(
        question="Find all electronic products under $100 that are in stock, along with any recent orders",
        max_results=25
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "multi-table-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock responses: phase 1 picks schemas, phase 2 emits GraphQL.
    phase1_response = PromptResponse(
        text=json.dumps(["products", "orders"]),
        error=None
    )
    phase2_response = PromptResponse(
        text=json.dumps({
            "query": "query { products(where: {category: {eq: \"Electronics\"}, price: {lt: 100}, in_stock: {eq: true}}) { product_id name price orders { order_id total } } }",
            "variables": {},
            "confidence": 0.88
        }),
        error=None
    )
    integration_processor.client.return_value.request = AsyncMock(
        side_effect=[phase1_response, phase2_response]
    )
    # Act
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - the generated filters are passed through verbatim.
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.detected_schemas == ["products", "orders"]
    assert "Electronics" in response.graphql_query
    assert "price: {lt: 100}" in response.graphql_query
    assert "in_stock: {eq: true}" in response.graphql_query
@pytest.mark.asyncio
async def test_schema_configuration_integration(self, integration_processor):
    """Test integration with dynamic schema configuration.

    Pushes a new "inventory" schema via on_schema_config, then runs a
    query against it to confirm freshly-configured schemas are usable.
    """
    # Arrange - New schema configuration
    # NOTE(review): the JSON payload uses "primary_key" while the
    # RowSchema fixtures use "primary" - presumably on_schema_config
    # maps between the two; confirm against the config parser.
    new_schema_config = {
        "schema": {
            "inventory": json.dumps({
                "name": "inventory",
                "description": "Product inventory tracking",
                "fields": [
                    {"name": "sku", "type": "string", "primary_key": True},
                    {"name": "quantity", "type": "integer"},
                    {"name": "warehouse_location", "type": "string"}
                ]
            })
        }
    }
    # Act - Update configuration
    await integration_processor.on_schema_config(new_schema_config, "v2")
    # Arrange - Test query using new schema
    request = QuestionToStructuredQueryRequest(
        question="Show inventory levels for all products in warehouse A",
        max_results=100
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "schema-config-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock responses that use the new schema
    phase1_response = PromptResponse(
        text=json.dumps(["inventory"]),
        error=None
    )
    phase2_response = PromptResponse(
        text=json.dumps({
            "query": "query { inventory(where: {warehouse_location: {eq: \"A\"}}) { sku quantity warehouse_location } }",
            "variables": {},
            "confidence": 0.85
        }),
        error=None
    )
    integration_processor.client.return_value.request = AsyncMock(
        side_effect=[phase1_response, phase2_response]
    )
    # Act
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - the new schema is registered and used end-to-end.
    assert "inventory" in integration_processor.schemas
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.detected_schemas == ["inventory"]
    assert "inventory" in response.graphql_query
@pytest.mark.asyncio
async def test_prompt_service_error_recovery_integration(self, integration_processor):
    """Test integration with prompt service error scenarios.

    A phase-1 (schema selection) failure must be wrapped into an
    nlp-query-error response rather than raising out of on_message.
    """
    # Arrange
    request = QuestionToStructuredQueryRequest(
        question="Show me customer data",
        max_results=10
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "error-recovery-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock Phase 1 error - every prompt call returns the same failure.
    phase1_error_response = PromptResponse(
        text="",
        error=Error(type="template-not-found", message="Schema selection template not available")
    )
    integration_processor.client.return_value.request = AsyncMock(
        return_value=phase1_error_response
    )
    # Act
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - Error is properly handled and propagated
    flow_response.send.assert_called_once()
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert isinstance(response, QuestionToStructuredQueryResponse)
    assert response.error is not None
    assert response.error.type == "nlp-query-error"
    assert "Prompt service error" in response.error.message
@pytest.mark.asyncio
async def test_template_parameter_integration(self, sample_schemas):
    """Test integration with different template configurations.

    Builds its own Processor (rather than using the shared fixture) so
    that custom template names can be injected and verified.
    """
    # Test with custom templates
    custom_processor = Processor(
        taskgroup=MagicMock(),
        pulsar_client=AsyncMock(),
        config_type="schema",
        schema_selection_template="custom-schema-selector",
        graphql_generation_template="custom-graphql-generator"
    )
    custom_processor.schemas = sample_schemas
    custom_processor.client = MagicMock()
    request = QuestionToStructuredQueryRequest(
        question="Test query",
        max_results=5
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "template-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock responses for both prompt phases.
    phase1_response = PromptResponse(text=json.dumps(["customers"]), error=None)
    phase2_response = PromptResponse(
        text=json.dumps({
            "query": "query { customers { id name } }",
            "variables": {},
            "confidence": 0.9
        }),
        error=None
    )
    custom_processor.client.return_value.request = AsyncMock(
        side_effect=[phase1_response, phase2_response]
    )
    # Act
    await custom_processor.on_message(msg, consumer, flow)
    # Assert - Verify custom templates are used
    assert custom_processor.schema_selection_template == "custom-schema-selector"
    assert custom_processor.graphql_generation_template == "custom-graphql-generator"
    # Verify the calls were made
    assert custom_processor.client.return_value.request.call_count == 2
@pytest.mark.asyncio
async def test_large_schema_set_integration(self, integration_processor):
    """Test integration with large numbers of schemas.

    Registers 20 synthetic tables and checks that schema selection still
    narrows down to exactly the two tables mentioned in the question.
    """
    # Arrange - Add many schemas
    large_schema_set = {}
    for i in range(20):
        schema_name = f"table_{i:02d}"
        large_schema_set[schema_name] = RowSchema(
            name=schema_name,
            description=f"Test table {i} with sample data",
            fields=[
                SchemaField(name="id", type="string", primary=True)
            ] + [SchemaField(name=f"field_{j}", type="string") for j in range(5)]
        )
    integration_processor.schemas.update(large_schema_set)
    request = QuestionToStructuredQueryRequest(
        question="Show me data from table_05 and table_12",
        max_results=20
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "large-schema-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock responses
    phase1_response = PromptResponse(
        text=json.dumps(["table_05", "table_12"]),
        error=None
    )
    phase2_response = PromptResponse(
        text=json.dumps({
            "query": "query { table_05 { id field_0 } table_12 { id field_1 } }",
            "variables": {},
            "confidence": 0.87
        }),
        error=None
    )
    integration_processor.client.return_value.request = AsyncMock(
        side_effect=[phase1_response, phase2_response]
    )
    # Act
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - Should handle large schema sets efficiently
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.detected_schemas == ["table_05", "table_12"]
    assert "table_05" in response.graphql_query
    assert "table_12" in response.graphql_query
@pytest.mark.asyncio
async def test_concurrent_request_handling_integration(self, integration_processor):
    """Test integration with concurrent request processing.

    Five requests are dispatched concurrently via asyncio.gather; each
    request drives two prompt-service calls (schema selection, then
    GraphQL generation), so ten mocked responses are queued in order.
    """
    # Arrange - Multiple concurrent requests
    messages = []
    flows = []
    for i in range(5):
        request = QuestionToStructuredQueryRequest(
            question=f"Query {i}: Show me data",
            max_results=10
        )
        msg = MagicMock()
        msg.value.return_value = request
        msg.properties.return_value = {"id": f"concurrent-test-{i}"}
        flow = MagicMock()
        flow_response = AsyncMock()
        flow.return_value = flow_response
        messages.append(msg)
        flows.append(flow)
    # Mock responses for all requests
    mock_responses = []
    for i in range(10):  # 2 calls per request (phase1 + phase2)
        if i % 2 == 0:  # Phase 1 responses
            mock_responses.append(PromptResponse(
                text=json.dumps(["customers"]),
                error=None
            ))
        else:  # Phase 2 responses
            mock_responses.append(PromptResponse(
                text=json.dumps({
                    # Plain literal - the original f-string had no
                    # placeholders (lint F541).
                    "query": "query { customers { id name } }",
                    "variables": {},
                    "confidence": 0.9
                }),
                error=None
            ))
    integration_processor.client.return_value.request = AsyncMock(
        side_effect=mock_responses
    )
    # Act - Process all messages concurrently
    import asyncio
    consumer = MagicMock()
    tasks = [
        integration_processor.on_message(msg, consumer, flow)
        for msg, flow in zip(messages, flows)
    ]
    await asyncio.gather(*tasks)
    # Assert - All requests should be processed
    assert integration_processor.client.return_value.request.call_count == 10
    for flow in flows:
        flow.return_value.send.assert_called_once()
@pytest.mark.asyncio
async def test_performance_timing_integration(self, integration_processor):
    """Test performance characteristics of the integration.

    With all prompt-service calls mocked, a single on_message pass must
    finish well under a second; uses a monotonic clock so the measured
    duration cannot be skewed by wall-clock adjustments.
    """
    # Arrange
    request = QuestionToStructuredQueryRequest(
        question="Performance test query",
        max_results=100
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "performance-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock fast responses
    phase1_response = PromptResponse(text=json.dumps(["customers"]), error=None)
    phase2_response = PromptResponse(
        text=json.dumps({
            "query": "query { customers { id } }",
            "variables": {},
            "confidence": 0.9
        }),
        error=None
    )
    integration_processor.client.return_value.request = AsyncMock(
        side_effect=[phase1_response, phase2_response]
    )
    # Act
    import time
    # perf_counter is monotonic; time.time() can jump under NTP/clock
    # changes and yield negative or inflated intervals.
    start_time = time.perf_counter()
    await integration_processor.on_message(msg, consumer, flow)
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    # Assert
    assert execution_time < 1.0  # Should complete quickly with mocked services
    flow_response.send.assert_called_once()
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.error is None

View file

@ -0,0 +1,665 @@
"""
Integration tests for Structured Query Service
These tests verify the end-to-end functionality of the structured query service,
testing orchestration between nlp-query and objects-query services.
Following the TEST_STRATEGY.md approach for integration testing.
"""
import pytest
import json
from unittest.mock import AsyncMock, MagicMock
from trustgraph.schema import (
StructuredQueryRequest, StructuredQueryResponse,
QuestionToStructuredQueryRequest, QuestionToStructuredQueryResponse,
ObjectsQueryRequest, ObjectsQueryResponse,
Error, GraphQLError
)
from trustgraph.retrieval.structured_query.service import Processor
@pytest.mark.integration
class TestStructuredQueryServiceIntegration:
"""Integration tests for structured query service orchestration"""
@pytest.fixture
def integration_processor(self):
    """Create processor with realistic configuration.

    taskgroup and pulsar_client are mocks - no broker connection is made.
    """
    proc = Processor(
        taskgroup=MagicMock(),
        pulsar_client=AsyncMock()
    )
    # Mock the client method so tests can route nlp-query / objects-query
    # calls to per-test mock clients.
    proc.client = MagicMock()
    return proc
@pytest.mark.asyncio
async def test_end_to_end_structured_query_processing(self, integration_processor):
    """Test complete structured query processing pipeline.

    Orchestration under test: question -> nlp-query (GraphQL generation)
    -> objects-query (execution) -> StructuredQueryResponse.
    """
    # Arrange - Create realistic query request
    request = StructuredQueryRequest(
        question="Show me all customers from California who have made purchases over $500"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "integration-test-001"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock NLP Query Service Response
    nlp_response = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query='''
        query GetCaliforniaCustomersWithLargePurchases($minAmount: String!, $state: String!) {
            customers(where: {state: {eq: $state}}) {
                id
                name
                email
                orders(where: {total: {gt: $minAmount}}) {
                    id
                    total
                    date
                }
            }
        }
        ''',
        variables={
            "minAmount": "500.0",
            "state": "California"
        },
        detected_schemas=["customers", "orders"],
        confidence=0.91
    )
    # Mock Objects Query Service Response
    objects_response = ObjectsQueryResponse(
        error=None,
        data='{"customers": [{"id": "123", "name": "Alice Johnson", "email": "alice@example.com", "orders": [{"id": "456", "total": 750.0, "date": "2024-01-15"}]}]}',
        errors=None,
        extensions={"execution_time": "150ms", "query_complexity": "8"}
    )
    # Set up mock clients to return different responses.
    # The processor looks clients up by name; route "nlp-query-request"
    # to the NLP mock and everything else to the objects mock.
    mock_nlp_client = AsyncMock()
    mock_nlp_client.request.return_value = nlp_response
    mock_objects_client = AsyncMock()
    mock_objects_client.request.return_value = objects_response
    integration_processor.client.side_effect = lambda name: (
        mock_nlp_client if name == "nlp-query-request" else mock_objects_client
    )
    # Act - Process the message
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - Verify the complete orchestration
    # Verify NLP service call
    mock_nlp_client.request.assert_called_once()
    nlp_call_args = mock_nlp_client.request.call_args[0][0]
    assert isinstance(nlp_call_args, QuestionToStructuredQueryRequest)
    assert nlp_call_args.question == "Show me all customers from California who have made purchases over $500"
    assert nlp_call_args.max_results == 100  # Default max_results
    # Verify Objects service call
    mock_objects_client.request.assert_called_once()
    objects_call_args = mock_objects_client.request.call_args[0][0]
    assert isinstance(objects_call_args, ObjectsQueryRequest)
    assert "customers" in objects_call_args.query
    assert "orders" in objects_call_args.query
    assert objects_call_args.variables["minAmount"] == "500.0"  # Converted to string
    assert objects_call_args.variables["state"] == "California"
    assert objects_call_args.user == "default"
    assert objects_call_args.collection == "default"
    # Verify response
    flow_response.send.assert_called_once()
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert isinstance(response, StructuredQueryResponse)
    assert response.error is None
    assert "Alice Johnson" in response.data
    assert "750.0" in response.data
    assert len(response.errors) == 0
@pytest.mark.asyncio
async def test_nlp_service_integration_failure(self, integration_processor):
    """Test integration when NLP service fails.

    A failed NLP translation must surface as a structured-query-error
    wrapping the original message, with no objects-query call made.
    """
    # Arrange
    request = StructuredQueryRequest(
        question="This is an unparseable query ][{}"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "nlp-failure-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock NLP service failure
    nlp_error_response = QuestionToStructuredQueryResponse(
        error=Error(type="nlp-parsing-error", message="Unable to parse natural language query"),
        graphql_query="",
        variables={},
        detected_schemas=[],
        confidence=0.0
    )
    mock_nlp_client = AsyncMock()
    mock_nlp_client.request.return_value = nlp_error_response
    integration_processor.client.return_value = mock_nlp_client
    # Act
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - Error should be propagated properly
    flow_response.send.assert_called_once()
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert isinstance(response, StructuredQueryResponse)
    assert response.error is not None
    assert response.error.type == "structured-query-error"
    assert "NLP query service error" in response.error.message
    assert "Unable to parse natural language query" in response.error.message
@pytest.mark.asyncio
async def test_objects_service_integration_failure(self, integration_processor):
    """Test integration when Objects service fails.

    NLP translation succeeds but execution fails; the objects-query
    error must be wrapped into a structured-query-error response.
    """
    # Arrange
    request = StructuredQueryRequest(
        question="Show me data from a table that doesn't exist"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "objects-failure-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock successful NLP response
    nlp_response = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query='query { nonexistent_table { id name } }',
        variables={},
        detected_schemas=["nonexistent_table"],
        confidence=0.7
    )
    # Mock Objects service failure
    objects_error_response = ObjectsQueryResponse(
        error=Error(type="graphql-schema-error", message="Table 'nonexistent_table' does not exist in schema"),
        data=None,
        errors=None,
        extensions={}
    )
    mock_nlp_client = AsyncMock()
    mock_nlp_client.request.return_value = nlp_response
    mock_objects_client = AsyncMock()
    mock_objects_client.request.return_value = objects_error_response
    # Route lookups by service name to the appropriate mock.
    integration_processor.client.side_effect = lambda name: (
        mock_nlp_client if name == "nlp-query-request" else mock_objects_client
    )
    # Act
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - Error should be propagated
    flow_response.send.assert_called_once()
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.error is not None
    assert response.error.type == "structured-query-error"
    assert "Objects query service error" in response.error.message
    assert "nonexistent_table" in response.error.message
@pytest.mark.asyncio
async def test_graphql_validation_errors_integration(self, integration_processor):
    """Test integration with GraphQL validation errors.

    GraphQL-level errors are not system failures: response.error stays
    None and the per-field errors are surfaced in response.errors.
    """
    # Arrange
    request = StructuredQueryRequest(
        question="Show me customer invalid_field values"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "validation-error-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock NLP response with invalid field
    nlp_response = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query='query { customers { id invalid_field } }',
        variables={},
        detected_schemas=["customers"],
        confidence=0.8
    )
    # Mock Objects response with GraphQL validation errors
    validation_errors = [
        GraphQLError(
            message="Cannot query field 'invalid_field' on type 'Customer'",
            path=["customers", "0", "invalid_field"],
            extensions={"code": "VALIDATION_ERROR"}
        ),
        GraphQLError(
            message="Field 'invalid_field' is not defined in the schema",
            path=["customers", "invalid_field"],
            extensions={"code": "FIELD_NOT_FOUND"}
        )
    ]
    objects_response = ObjectsQueryResponse(
        error=None,
        data=None,  # No data when validation fails
        errors=validation_errors,
        extensions={"validation_errors": "2"}
    )
    mock_nlp_client = AsyncMock()
    mock_nlp_client.request.return_value = nlp_response
    mock_objects_client = AsyncMock()
    mock_objects_client.request.return_value = objects_response
    integration_processor.client.side_effect = lambda name: (
        mock_nlp_client if name == "nlp-query-request" else mock_objects_client
    )
    # Act
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - GraphQL errors should be included in response
    flow_response.send.assert_called_once()
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.error is None  # No system error
    assert len(response.errors) == 2  # Two GraphQL errors
    # NOTE: response.errors entries are treated as strings here -
    # the processor evidently flattens GraphQLError into message text.
    assert "Cannot query field 'invalid_field'" in response.errors[0]
    assert "Field 'invalid_field' is not defined" in response.errors[1]
    assert "customers" in response.errors[0]
@pytest.mark.asyncio
async def test_complex_multi_service_integration(self, integration_processor):
    """Test complex integration scenario with multiple entities and relationships.

    Three schemas (products, orders, customers) with nested result data;
    verifies variable pass-through and that nested JSON survives intact.
    """
    # Arrange
    request = StructuredQueryRequest(
        question="Find all products under $100 that are in stock, along with their recent orders from customers in New York"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "complex-integration-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock complex NLP response
    nlp_response = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query='''
        query GetProductsWithCustomerOrders($maxPrice: String!, $inStock: String!, $state: String!) {
            products(where: {price: {lt: $maxPrice}, in_stock: {eq: $inStock}}) {
                id
                name
                price
                orders {
                    id
                    total
                    customer {
                        id
                        name
                        state
                    }
                }
            }
        }
        ''',
        variables={
            "maxPrice": "100.0",
            "inStock": "true",
            "state": "New York"
        },
        detected_schemas=["products", "orders", "customers"],
        confidence=0.85
    )
    # Mock complex Objects response - nested product/order/customer rows.
    complex_data = {
        "products": [
            {
                "id": "prod_123",
                "name": "Widget A",
                "price": 89.99,
                "orders": [
                    {
                        "id": "order_456",
                        "total": 179.98,
                        "customer": {
                            "id": "cust_789",
                            "name": "Bob Smith",
                            "state": "New York"
                        }
                    }
                ]
            },
            {
                "id": "prod_124",
                "name": "Widget B",
                "price": 65.50,
                "orders": [
                    {
                        "id": "order_457",
                        "total": 131.00,
                        "customer": {
                            "id": "cust_790",
                            "name": "Carol Jones",
                            "state": "New York"
                        }
                    }
                ]
            }
        ]
    }
    objects_response = ObjectsQueryResponse(
        error=None,
        data=json.dumps(complex_data),
        errors=None,
        extensions={
            "execution_time": "250ms",
            "query_complexity": "15",
            "data_sources": "products,orders,customers"  # Convert array to comma-separated string
        }
    )
    mock_nlp_client = AsyncMock()
    mock_nlp_client.request.return_value = nlp_response
    mock_objects_client = AsyncMock()
    mock_objects_client.request.return_value = objects_response
    integration_processor.client.side_effect = lambda name: (
        mock_nlp_client if name == "nlp-query-request" else mock_objects_client
    )
    # Act
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - Verify complex data integration
    # Check NLP service call
    nlp_call_args = mock_nlp_client.request.call_args[0][0]
    assert len(nlp_call_args.question) > 50  # Complex question
    # Check Objects service call with variable conversion
    objects_call_args = mock_objects_client.request.call_args[0][0]
    assert objects_call_args.variables["maxPrice"] == "100.0"
    assert objects_call_args.variables["inStock"] == "true"
    assert objects_call_args.variables["state"] == "New York"
    # Check response contains complex data
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.error is None
    assert "Widget A" in response.data
    assert "Widget B" in response.data
    assert "Bob Smith" in response.data
    assert "Carol Jones" in response.data
    assert "New York" in response.data
@pytest.mark.asyncio
async def test_empty_result_integration(self, integration_processor):
    """An empty result set from the objects service is forwarded untouched.

    Both downstream services succeed; the data payload simply contains
    zero rows, which must not be treated as an error.
    """
    # Arrange: a question that should match no rows.
    query_request = StructuredQueryRequest(
        question="Show me customers from Mars"
    )
    message = MagicMock()
    message.value.return_value = query_request
    message.properties.return_value = {"id": "empty-result-test"}
    sink = AsyncMock()
    flow = MagicMock()
    flow.return_value = sink
    # NLP stage succeeds and emits a well-formed GraphQL query.
    translated = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query='query { customers(where: {planet: {eq: "Mars"}}) { id name planet } }',
        variables={},
        detected_schemas=["customers"],
        confidence=0.9
    )
    # Objects stage returns zero matching customers.
    empty_payload = ObjectsQueryResponse(
        error=None,
        data='{"customers": []}',  # Empty result set
        errors=None,
        extensions={"result_count": "0"}
    )
    nlp_client = AsyncMock()
    nlp_client.request.return_value = translated
    objects_client = AsyncMock()
    objects_client.request.return_value = empty_payload
    integration_processor.client.side_effect = (
        lambda name: nlp_client if name == "nlp-query-request" else objects_client
    )
    # Act
    await integration_processor.on_message(message, MagicMock(), flow)
    # Assert: empty results handled gracefully, no errors fabricated.
    result = sink.send.call_args[0][0]
    assert result.error is None
    assert result.data == '{"customers": []}'
    assert len(result.errors) == 0
@pytest.mark.asyncio
async def test_concurrent_requests_integration(self, integration_processor):
    """Test integration with concurrent request processing.

    Three requests are processed via asyncio.gather; each consumes two
    mocked client responses (NLP then Objects), so six stubs are queued.
    """
    # Arrange - Multiple concurrent requests
    requests = []
    messages = []
    flows = []
    for i in range(3):
        request = StructuredQueryRequest(
            question=f"Query {i}: Show me data"
        )
        msg = MagicMock()
        msg.value.return_value = request
        msg.properties.return_value = {"id": f"concurrent-test-{i}"}
        flow = MagicMock()
        flow_response = AsyncMock()
        flow.return_value = flow_response
        requests.append(request)
        messages.append(msg)
        flows.append(flow)
    # Mock responses for all requests (6 total: 3 NLP + 3 Objects)
    # Even indices are NLP-stage responses, odd indices are Objects-stage
    # responses; i//2 ties each pair back to request number.
    mock_responses = []
    for i in range(6):
        if i % 2 == 0:  # NLP responses
            mock_responses.append(QuestionToStructuredQueryResponse(
                error=None,
                graphql_query=f'query {{ test_{i//2} {{ id }} }}',
                variables={},
                detected_schemas=[f"test_{i//2}"],
                confidence=0.9
            ))
        else:  # Objects responses
            mock_responses.append(ObjectsQueryResponse(
                error=None,
                data=f'{{"test_{i//2}": [{{"id": "{i//2}"}}]}}',
                errors=None,
                extensions={}
            ))
    # Each client() call hands out the next queued response in order.
    # NOTE(review): this assumes the processor fetches clients in a fixed
    # NLP-then-Objects order per message — confirm against the service.
    call_count = 0
    def mock_client_side_effect(name):
        nonlocal call_count
        client = AsyncMock()
        client.request.return_value = mock_responses[call_count]
        call_count += 1
        return client
    integration_processor.client.side_effect = mock_client_side_effect
    # Act - Process all messages concurrently
    import asyncio
    consumer = MagicMock()
    tasks = []
    for msg, flow in zip(messages, flows):
        task = integration_processor.on_message(msg, consumer, flow)
        tasks.append(task)
    await asyncio.gather(*tasks)
    # Assert - All requests should be processed
    assert call_count == 6  # 2 calls per request (NLP + Objects)
    for flow in flows:
        flow.return_value.send.assert_called_once()
@pytest.mark.asyncio
async def test_service_timeout_integration(self, integration_processor):
    """Integration: a downstream timeout surfaces as a structured-query error."""
    incoming = StructuredQueryRequest(
        question="This query will timeout"
    )
    message = MagicMock()
    message.value.return_value = incoming
    message.properties.return_value = {"id": "timeout-test"}

    flow = MagicMock()
    producer = AsyncMock()
    flow.return_value = producer

    # The first downstream call (NLP) raises instead of responding.
    failing_client = AsyncMock()
    failing_client.request.side_effect = Exception(
        "Service timeout: Request took longer than 30s"
    )
    integration_processor.client.return_value = failing_client

    await integration_processor.on_message(message, MagicMock(), flow)

    # Exactly one (error) response must still be produced.
    producer.send.assert_called_once()
    reply = producer.send.call_args[0][0]
    assert reply.error is not None
    assert reply.error.type == "structured-query-error"
    assert "timeout" in reply.error.message.lower()
@pytest.mark.asyncio
async def test_variable_type_conversion_integration(self, integration_processor):
    """Test integration with complex variable type conversions.

    Verifies every GraphQL variable is forwarded to the Objects service as
    a string (required for Pulsar Map(String()) schema compatibility).
    """
    # Arrange
    request = StructuredQueryRequest(
        question="Show me orders with totals between 50.5 and 200.75 from the last 30 days"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "variable-conversion-test"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock NLP response with various data types that need string conversion
    nlp_response = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query='query($minTotal: Float!, $maxTotal: Float!, $daysPast: Int!) { orders(filter: {total: {between: [$minTotal, $maxTotal]}, date: {gte: $daysPast}}) { id total date } }',
        variables={
            "minTotal": "50.5",  # Already string
            "maxTotal": "200.75",  # Already string
            "daysPast": "30"  # Already string
        },
        detected_schemas=["orders"],
        confidence=0.88
    )
    # Mock Objects response
    objects_response = ObjectsQueryResponse(
        error=None,
        data='{"orders": [{"id": "123", "total": 125.50, "date": "2024-01-15"}]}',
        errors=None,
        extensions={}
    )
    mock_nlp_client = AsyncMock()
    mock_nlp_client.request.return_value = nlp_response
    mock_objects_client = AsyncMock()
    mock_objects_client.request.return_value = objects_response
    # Route by service name: NLP stub first, Objects stub otherwise.
    integration_processor.client.side_effect = lambda name: (
        mock_nlp_client if name == "nlp-query-request" else mock_objects_client
    )
    # Act
    await integration_processor.on_message(msg, consumer, flow)
    # Assert - Variables should be properly converted to strings
    objects_call_args = mock_objects_client.request.call_args[0][0]
    # All variables should be strings for Pulsar schema compatibility
    assert isinstance(objects_call_args.variables["minTotal"], str)
    assert isinstance(objects_call_args.variables["maxTotal"], str)
    assert isinstance(objects_call_args.variables["daysPast"], str)
    # Values should be preserved
    assert objects_call_args.variables["minTotal"] == "50.5"
    assert objects_call_args.variables["maxTotal"] == "200.75"
    assert objects_call_args.variables["daysPast"] == "30"
    # Response should contain expected data
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.error is None
    assert "125.50" in response.data

View file

@ -0,0 +1,356 @@
"""
Unit tests for NLP Query service
Following TEST_STRATEGY.md approach for service testing
"""
import pytest
import json
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, Any
from trustgraph.schema import (
QuestionToStructuredQueryRequest, QuestionToStructuredQueryResponse,
PromptRequest, PromptResponse, Error, RowSchema, Field as SchemaField
)
from trustgraph.retrieval.nlp_query.service import Processor
@pytest.fixture
def mock_prompt_client():
    """Async stand-in for the prompt service client."""
    client = AsyncMock()
    return client
@pytest.fixture
def mock_pulsar_client():
    """Async stand-in for the Pulsar client."""
    client = AsyncMock()
    return client
@pytest.fixture
def sample_schemas():
    """Sample schemas for testing.

    Two related row schemas: customers (keyed by id) and orders
    (keyed by order_id, carrying a customer_id reference field).
    """
    return {
        "customers": RowSchema(
            name="customers",
            description="Customer data",
            fields=[
                SchemaField(name="id", type="string", primary=True),
                SchemaField(name="name", type="string"),
                SchemaField(name="email", type="string"),
                SchemaField(name="state", type="string")
            ]
        ),
        "orders": RowSchema(
            name="orders",
            description="Order data",
            fields=[
                SchemaField(name="order_id", type="string", primary=True),
                SchemaField(name="customer_id", type="string"),
                SchemaField(name="total", type="float"),
                SchemaField(name="status", type="string")
            ]
        )
    }
@pytest.fixture
def processor(mock_pulsar_client, sample_schemas):
    """Create an NLP-query Processor with mocked dependencies."""
    proc = Processor(
        taskgroup=MagicMock(),
        pulsar_client=mock_pulsar_client,
        config_type="schema"
    )
    # Pre-load schemas so tests need not exercise the config handler first.
    proc.schemas = sample_schemas
    # Replace the service-client factory so no real Pulsar connection is made.
    proc.client = MagicMock()
    return proc
@pytest.mark.asyncio
class TestNLPQueryProcessor:
    """Test NLP Query service processor"""

    async def test_phase1_select_schemas_success(self, processor, mock_prompt_client):
        """Test successful schema selection (Phase 1)."""
        # Arrange: the prompt service answers with a JSON list of schema names.
        question = "Show me customers from California"
        expected_schemas = ["customers"]
        mock_response = PromptResponse(
            text=json.dumps(expected_schemas),
            error=None
        )
        processor.client.return_value.request = AsyncMock(return_value=mock_response)
        # Act
        result = await processor.phase1_select_schemas(question)
        # Assert: payload decoded from JSON, and the prompt service was used.
        assert result == expected_schemas
        processor.client.assert_called_once_with("prompt-request")
async def test_phase1_select_schemas_prompt_error(self, processor):
    """Phase 1 must raise when the prompt service reports an error."""
    failure = PromptResponse(
        text="",
        error=Error(type="prompt-error", message="Template not found")
    )
    processor.client.return_value.request = AsyncMock(return_value=failure)

    with pytest.raises(Exception, match="Prompt service error"):
        await processor.phase1_select_schemas("Show me customers")
async def test_phase2_generate_graphql_success(self, processor):
    """Test successful GraphQL generation (Phase 2)."""
    # Arrange: the prompt service replies with query/variables/confidence JSON.
    question = "Show me customers from California"
    selected_schemas = ["customers"]
    expected_result = {
        "query": "query { customers(where: {state: {eq: \"California\"}}) { id name email state } }",
        "variables": {},
        "confidence": 0.95
    }
    mock_response = PromptResponse(
        text=json.dumps(expected_result),
        error=None
    )
    processor.client.return_value.request = AsyncMock(return_value=mock_response)
    # Act
    result = await processor.phase2_generate_graphql(question, selected_schemas)
    # Assert: the JSON payload is decoded verbatim via the prompt service.
    assert result == expected_result
    processor.client.assert_called_once_with("prompt-request")
async def test_phase2_generate_graphql_prompt_error(self, processor):
    """Phase 2 must raise when the prompt service reports an error."""
    failure = PromptResponse(
        text="",
        error=Error(type="prompt-error", message="Generation failed")
    )
    processor.client.return_value.request = AsyncMock(return_value=failure)

    with pytest.raises(Exception, match="Prompt service error"):
        await processor.phase2_generate_graphql("Show me customers", ["customers"])
async def test_on_message_full_flow_success(self, processor):
    """Test complete message processing flow (Phase 1 + Phase 2)."""
    # Arrange
    request = QuestionToStructuredQueryRequest(
        question="Show me customers from California",
        max_results=100
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "test-123"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock Phase 1 response (schema selection)
    phase1_response = PromptResponse(
        text=json.dumps(["customers"]),
        error=None
    )
    # Mock Phase 2 response (GraphQL generation)
    phase2_response = PromptResponse(
        text=json.dumps({
            "query": "query { customers(where: {state: {eq: \"California\"}}) { id name email } }",
            "variables": {},
            "confidence": 0.9
        }),
        error=None
    )
    # Set up mock to return different responses for each call;
    # side_effect order assumes Phase 1 runs before Phase 2.
    processor.client.return_value.request = AsyncMock(
        side_effect=[phase1_response, phase2_response]
    )
    # Act
    await processor.on_message(msg, consumer, flow)
    # Assert: both phases ran and exactly one response was produced.
    assert processor.client.return_value.request.call_count == 2
    flow_response.send.assert_called_once()
    # Verify response structure
    response_call = flow_response.send.call_args
    response = response_call[0][0]  # First argument is the response object
    assert isinstance(response, QuestionToStructuredQueryResponse)
    assert response.error is None
    assert "customers" in response.graphql_query
    assert response.detected_schemas == ["customers"]
    assert response.confidence == 0.9
async def test_on_message_phase1_error(self, processor):
    """Test message processing with Phase 1 failure."""
    # Arrange
    request = QuestionToStructuredQueryRequest(
        question="Show me customers",
        max_results=100
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "test-123"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock Phase 1 error from the prompt service.
    phase1_response = PromptResponse(
        text="",
        error=Error(type="template-error", message="Template not found")
    )
    processor.client.return_value.request = AsyncMock(return_value=phase1_response)
    # Act
    await processor.on_message(msg, consumer, flow)
    # Assert: an error response is still sent — no exception escapes.
    flow_response.send.assert_called_once()
    # Verify error response
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert isinstance(response, QuestionToStructuredQueryResponse)
    assert response.error is not None
    assert response.error.type == "nlp-query-error"
    assert "Prompt service error" in response.error.message
async def test_schema_config_loading(self, processor):
    """Test schema configuration loading.

    Config values under the "schema" key are JSON-encoded schema
    definitions; the handler parses them into RowSchema objects.
    """
    # Arrange
    config = {
        "schema": {
            "test_schema": json.dumps({
                "name": "test_schema",
                "description": "Test schema",
                "fields": [
                    {
                        "name": "id",
                        "type": "string",
                        "primary_key": True,
                        "required": True
                    },
                    {
                        "name": "name",
                        "type": "string",
                        "description": "User name"
                    }
                ]
            })
        }
    }
    # Act
    await processor.on_schema_config(config, "v1")
    # Assert: the JSON "primary_key" flag maps onto Field.primary.
    assert "test_schema" in processor.schemas
    schema = processor.schemas["test_schema"]
    assert schema.name == "test_schema"
    assert schema.description == "Test schema"
    assert len(schema.fields) == 2
    assert schema.fields[0].name == "id"
    assert schema.fields[0].primary == True
    assert schema.fields[1].name == "name"
async def test_schema_config_loading_invalid_json(self, processor):
    """Unparseable schema JSON is skipped rather than raising."""
    broken = {
        "schema": {
            "bad_schema": "invalid json{"
        }
    }

    await processor.on_schema_config(broken, "v1")

    # The malformed entry must simply be absent from the loaded schemas.
    assert "bad_schema" not in processor.schemas
def test_processor_initialization(self, mock_pulsar_client):
    """Test processor initialization with correct specifications."""
    # Act: construct with custom prompt-template names.
    processor = Processor(
        taskgroup=MagicMock(),
        pulsar_client=mock_pulsar_client,
        schema_selection_template="custom-schema-select",
        graphql_generation_template="custom-graphql-gen"
    )
    # Assert: template overrides are stored; config key and schema map
    # start from their defaults.
    assert processor.schema_selection_template == "custom-schema-select"
    assert processor.graphql_generation_template == "custom-graphql-gen"
    assert processor.config_key == "schema"
    assert processor.schemas == {}
def test_add_args(self):
    """Test command-line argument parsing (defaults and overrides)."""
    import argparse
    parser = argparse.ArgumentParser()
    Processor.add_args(parser)
    # Test default values
    args = parser.parse_args([])
    assert args.config_type == "schema"
    assert args.schema_selection_template == "schema-selection"
    assert args.graphql_generation_template == "graphql-generation"
    # Test custom values
    args = parser.parse_args([
        "--config-type", "custom",
        "--schema-selection-template", "my-selector",
        "--graphql-generation-template", "my-generator"
    ])
    assert args.config_type == "custom"
    assert args.schema_selection_template == "my-selector"
    assert args.graphql_generation_template == "my-generator"
@pytest.mark.unit
class TestNLPQueryHelperFunctions:
    """Test helper functions and data transformations"""

    def test_schema_info_formatting(self, sample_schemas):
        """Schema fixtures expose fields, and primary keys, in a stable order."""
        schema = sample_schemas["customers"]
        # Field order matters for any downstream prompt formatting.
        assert [field.name for field in schema.fields] == [
            "id", "name", "email", "state"
        ]
        # Exactly one field is flagged as the primary key.
        assert [field.name for field in schema.fields if field.primary] == ["id"]

View file

@ -0,0 +1,522 @@
"""
Unit tests for Structured Query Service
Following TEST_STRATEGY.md approach for service testing
"""
import pytest
import json
from unittest.mock import AsyncMock, MagicMock, patch
from trustgraph.schema import (
StructuredQueryRequest, StructuredQueryResponse,
QuestionToStructuredQueryRequest, QuestionToStructuredQueryResponse,
ObjectsQueryRequest, ObjectsQueryResponse,
Error, GraphQLError
)
from trustgraph.retrieval.structured_query.service import Processor
@pytest.fixture
def mock_pulsar_client():
    """Async stand-in for the Pulsar client."""
    client = AsyncMock()
    return client
@pytest.fixture
def processor(mock_pulsar_client):
    """Create a structured-query Processor with mocked dependencies."""
    proc = Processor(
        taskgroup=MagicMock(),
        pulsar_client=mock_pulsar_client
    )
    # Replace the service-client factory so no real Pulsar connection is made.
    proc.client = MagicMock()
    return proc
@pytest.mark.asyncio
class TestStructuredQueryProcessor:
    """Test Structured Query service processor"""

    async def test_successful_end_to_end_query(self, processor):
        """Test successful end-to-end query processing (NLP → Objects)."""
        # Arrange
        request = StructuredQueryRequest(
            question="Show me all customers from New York"
        )
        msg = MagicMock()
        msg.value.return_value = request
        msg.properties.return_value = {"id": "test-123"}
        consumer = MagicMock()
        flow = MagicMock()
        flow_response = AsyncMock()
        flow.return_value = flow_response
        # Mock NLP query service response
        nlp_response = QuestionToStructuredQueryResponse(
            error=None,
            graphql_query='query { customers(where: {state: {eq: "NY"}}) { id name email } }',
            variables={"state": "NY"},
            detected_schemas=["customers"],
            confidence=0.95
        )
        # Mock objects query service response
        objects_response = ObjectsQueryResponse(
            error=None,
            data='{"customers": [{"id": "1", "name": "John", "email": "john@example.com"}]}',
            errors=None,
            extensions={}
        )
        # Set up mock clients, routed by requested service name.
        mock_nlp_client = AsyncMock()
        mock_nlp_client.request.return_value = nlp_response
        mock_objects_client = AsyncMock()
        mock_objects_client.request.return_value = objects_response
        processor.client.side_effect = lambda name: (
            mock_nlp_client if name == "nlp-query-request" else mock_objects_client
        )
        # Act
        await processor.on_message(msg, consumer, flow)
        # Assert
        # Verify NLP query service was called correctly (default max_results)
        mock_nlp_client.request.assert_called_once()
        nlp_call_args = mock_nlp_client.request.call_args[0][0]
        assert isinstance(nlp_call_args, QuestionToStructuredQueryRequest)
        assert nlp_call_args.question == "Show me all customers from New York"
        assert nlp_call_args.max_results == 100
        # Verify objects query service was called correctly
        mock_objects_client.request.assert_called_once()
        objects_call_args = mock_objects_client.request.call_args[0][0]
        assert isinstance(objects_call_args, ObjectsQueryRequest)
        assert objects_call_args.query == 'query { customers(where: {state: {eq: "NY"}}) { id name email } }'
        assert objects_call_args.variables == {"state": "NY"}
        assert objects_call_args.user == "default"
        assert objects_call_args.collection == "default"
        # Verify response
        flow_response.send.assert_called_once()
        response_call = flow_response.send.call_args
        response = response_call[0][0]
        assert isinstance(response, StructuredQueryResponse)
        assert response.error is None
        assert response.data == '{"customers": [{"id": "1", "name": "John", "email": "john@example.com"}]}'
        assert len(response.errors) == 0
async def test_nlp_query_service_error(self, processor):
    """Test handling of NLP query service errors."""
    # Arrange
    request = StructuredQueryRequest(
        question="Invalid query"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "test-error"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock NLP query service error response
    nlp_response = QuestionToStructuredQueryResponse(
        error=Error(type="nlp-query-error", message="Failed to parse question"),
        graphql_query="",
        variables={},
        detected_schemas=[],
        confidence=0.0
    )
    mock_nlp_client = AsyncMock()
    mock_nlp_client.request.return_value = nlp_response
    processor.client.return_value = mock_nlp_client
    # Act
    await processor.on_message(msg, consumer, flow)
    # Assert: the downstream error is re-wrapped as a structured-query-error.
    flow_response.send.assert_called_once()
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert isinstance(response, StructuredQueryResponse)
    assert response.error is not None
    assert response.error.type == "structured-query-error"
    assert "NLP query service error" in response.error.message
async def test_empty_graphql_query_error(self, processor):
    """An empty GraphQL query from the NLP stage is rejected with an error."""
    incoming = StructuredQueryRequest(
        question="Ambiguous question"
    )
    message = MagicMock()
    message.value.return_value = incoming
    message.properties.return_value = {"id": "test-empty"}

    flow = MagicMock()
    producer = AsyncMock()
    flow.return_value = producer

    # The NLP stage "succeeds" but produces no query at all.
    nlp_stub = AsyncMock()
    nlp_stub.request.return_value = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query="",  # Empty query
        variables={},
        detected_schemas=[],
        confidence=0.1
    )
    processor.client.return_value = nlp_stub

    await processor.on_message(message, MagicMock(), flow)

    producer.send.assert_called_once()
    reply = producer.send.call_args[0][0]
    assert reply.error is not None
    assert "empty GraphQL query" in reply.error.message
async def test_objects_query_service_error(self, processor):
    """Test handling of objects query service errors."""
    # Arrange
    request = StructuredQueryRequest(
        question="Show me customers"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "test-objects-error"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock successful NLP response
    nlp_response = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query='query { customers { id name } }',
        variables={},
        detected_schemas=["customers"],
        confidence=0.9
    )
    # Mock objects query service error
    objects_response = ObjectsQueryResponse(
        error=Error(type="graphql-execution-error", message="Table 'customers' not found"),
        data=None,
        errors=None,
        extensions={}
    )
    mock_nlp_client = AsyncMock()
    mock_nlp_client.request.return_value = nlp_response
    mock_objects_client = AsyncMock()
    mock_objects_client.request.return_value = objects_response
    processor.client.side_effect = lambda name: (
        mock_nlp_client if name == "nlp-query-request" else mock_objects_client
    )
    # Act
    await processor.on_message(msg, consumer, flow)
    # Assert: the downstream message text is propagated inside the wrapper.
    flow_response.send.assert_called_once()
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.error is not None
    assert "Objects query service error" in response.error.message
    assert "Table 'customers' not found" in response.error.message
async def test_graphql_errors_handling(self, processor):
    """Test handling of GraphQL validation/execution errors.

    Field-level GraphQL errors are reported via the response's errors
    list, not as a system-level error.
    """
    # Arrange
    request = StructuredQueryRequest(
        question="Show invalid field"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "test-graphql-errors"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock successful NLP response
    nlp_response = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query='query { customers { invalid_field } }',
        variables={},
        detected_schemas=["customers"],
        confidence=0.8
    )
    # Mock objects response with GraphQL errors
    graphql_errors = [
        GraphQLError(
            message="Cannot query field 'invalid_field' on type 'Customer'",
            path=["customers", "0", "invalid_field"],  # All path elements must be strings
            extensions={}
        )
    ]
    objects_response = ObjectsQueryResponse(
        error=None,
        data=None,
        errors=graphql_errors,
        extensions={}
    )
    mock_nlp_client = AsyncMock()
    mock_nlp_client.request.return_value = nlp_response
    mock_objects_client = AsyncMock()
    mock_objects_client.request.return_value = objects_response
    processor.client.side_effect = lambda name: (
        mock_nlp_client if name == "nlp-query-request" else mock_objects_client
    )
    # Act
    await processor.on_message(msg, consumer, flow)
    # Assert
    flow_response.send.assert_called_once()
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.error is None
    assert len(response.errors) == 1
    assert "Cannot query field 'invalid_field'" in response.errors[0]
    assert "customers" in response.errors[0]
async def test_complex_query_with_variables(self, processor):
    """Test processing complex queries with variables."""
    # Arrange
    request = StructuredQueryRequest(
        question="Show customers with orders over $100 from last month"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "test-complex"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock NLP response with complex query and variables
    nlp_response = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query='''
        query GetCustomersWithLargeOrders($minTotal: Float!, $startDate: String!) {
            customers {
                id
                name
                orders(where: {total: {gt: $minTotal}, date: {gte: $startDate}}) {
                    id
                    total
                    date
                }
            }
        }
        ''',
        variables={
            "minTotal": "100.0",  # Convert to string for Pulsar schema
            "startDate": "2024-01-01"
        },
        detected_schemas=["customers", "orders"],
        confidence=0.88
    )
    # Mock objects response
    objects_response = ObjectsQueryResponse(
        error=None,
        data='{"customers": [{"id": "1", "name": "Alice", "orders": [{"id": "100", "total": 150.0}]}]}',
        errors=None
    )
    mock_nlp_client = AsyncMock()
    mock_nlp_client.request.return_value = nlp_response
    mock_objects_client = AsyncMock()
    mock_objects_client.request.return_value = objects_response
    processor.client.side_effect = lambda name: (
        mock_nlp_client if name == "nlp-query-request" else mock_objects_client
    )
    # Act
    await processor.on_message(msg, consumer, flow)
    # Assert
    # Verify variables were passed correctly (converted to strings)
    objects_call_args = mock_objects_client.request.call_args[0][0]
    assert objects_call_args.variables["minTotal"] == "100.0"  # Should be converted to string
    assert objects_call_args.variables["startDate"] == "2024-01-01"
    # Verify response
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.error is None
    assert "Alice" in response.data
async def test_null_data_handling(self, processor):
    """A null data payload from the objects stage becomes the string "null"."""
    incoming = StructuredQueryRequest(
        question="Show nonexistent data"
    )
    message = MagicMock()
    message.value.return_value = incoming
    message.properties.return_value = {"id": "test-null"}

    flow = MagicMock()
    producer = AsyncMock()
    flow.return_value = producer

    # NLP stage succeeds as usual.
    nlp_stub = AsyncMock()
    nlp_stub.request.return_value = QuestionToStructuredQueryResponse(
        error=None,
        graphql_query='query { customers { id } }',
        variables={},
        detected_schemas=["customers"],
        confidence=0.9
    )

    # Objects stage returns no data at all.
    objects_stub = AsyncMock()
    objects_stub.request.return_value = ObjectsQueryResponse(
        error=None,
        data=None,  # Null data
        errors=None,
        extensions={}
    )

    processor.client.side_effect = lambda name: (
        nlp_stub if name == "nlp-query-request" else objects_stub
    )

    await processor.on_message(message, MagicMock(), flow)

    reply = producer.send.call_args[0][0]
    assert reply.error is None
    assert reply.data == "null"  # None must serialize as the JSON literal
async def test_exception_handling(self, processor):
    """Test general exception handling."""
    # Arrange
    request = StructuredQueryRequest(
        question="Test exception"
    )
    msg = MagicMock()
    msg.value.return_value = request
    msg.properties.return_value = {"id": "test-exception"}
    consumer = MagicMock()
    flow = MagicMock()
    flow_response = AsyncMock()
    flow.return_value = flow_response
    # Mock client to raise exception
    mock_client = AsyncMock()
    mock_client.request.side_effect = Exception("Network timeout")
    processor.client.return_value = mock_client
    # Act
    await processor.on_message(msg, consumer, flow)
    # Assert: an error response is sent rather than letting the exception escape.
    flow_response.send.assert_called_once()
    response_call = flow_response.send.call_args
    response = response_call[0][0]
    assert response.error is not None
    assert response.error.type == "structured-query-error"
    assert "Network timeout" in response.error.message
    assert response.data == "null"
    assert len(response.errors) == 0
def test_processor_initialization(self, mock_pulsar_client):
    """Test processor initialization with correct specifications."""
    # Act
    processor = Processor(
        taskgroup=MagicMock(),
        pulsar_client=mock_pulsar_client
    )
    # Assert - Test default ID
    assert processor.id == "structured-query"
    # Verify specifications were registered (we can't directly access them,
    # but we know they were registered if initialization succeeded)
    assert processor is not None
def test_add_args(self):
    """Test command-line argument parsing"""
    import argparse

    parser = argparse.ArgumentParser()
    Processor.add_args(parser)

    # The processor registers no extra options; parsing an empty argv
    # must simply succeed.
    args = parser.parse_args([])
    assert args is not None
@pytest.mark.unit
class TestStructuredQueryHelperFunctions:
    """Test helper functions and data transformations"""

    def test_service_logging_integration(self):
        """Test that logging is properly configured."""
        # Import lazily so test collection does not require the service module.
        from trustgraph.retrieval.structured_query.service import logger
        assert logger.name == "trustgraph.retrieval.structured_query.service"

    def test_default_values(self):
        """Test default configuration values."""
        from trustgraph.retrieval.structured_query.service import default_ident
        assert default_ident == "structured-query"

View file

@ -426,3 +426,62 @@ class FlowInstance:
return result
def nlp_query(self, question, max_results=100):
    """
    Convert a natural language question to a GraphQL query.

    Args:
        question: Natural language question
        max_results: Maximum number of results to return (default: 100)

    Returns:
        dict with graphql_query, variables, detected_schemas, confidence

    Raises:
        ProtocolException: if the service reports a system-level error
    """
    # Named 'payload' rather than 'input' to avoid shadowing the builtin.
    payload = {
        "question": question,
        "max_results": max_results
    }

    response = self.request(
        "service/nlp-query",
        payload
    )

    # Check for system-level error reported by the service
    error = response.get("error")
    if error:
        error_type = error.get("type", "unknown")
        error_message = error.get("message", "Unknown error")
        raise ProtocolException(f"{error_type}: {error_message}")

    return response
def structured_query(self, question):
    """
    Execute a natural language question against structured data.
    Combines NLP query conversion and GraphQL execution.

    Args:
        question: Natural language question

    Returns:
        dict with data and optional errors

    Raises:
        ProtocolException: if the service reports a system-level error
    """
    # Named 'payload' rather than 'input' to avoid shadowing the builtin.
    payload = {
        "question": question
    }

    response = self.request(
        "service/structured-query",
        payload
    )

    # Check for system-level error reported by the service
    error = response.get("error")
    if error:
        error_type = error.get("type", "unknown")
        error_message = error.get("message", "Unknown error")
        raise ProtocolException(f"{error_type}: {error_message}")

    return response

View file

@ -22,6 +22,8 @@ from .translators.embeddings_query import (
GraphEmbeddingsRequestTranslator, GraphEmbeddingsResponseTranslator
)
from .translators.objects_query import ObjectsQueryRequestTranslator, ObjectsQueryResponseTranslator
from .translators.nlp_query import QuestionToStructuredQueryRequestTranslator, QuestionToStructuredQueryResponseTranslator
from .translators.structured_query import StructuredQueryRequestTranslator, StructuredQueryResponseTranslator
# Register all service translators
TranslatorRegistry.register_service(
@ -114,6 +116,18 @@ TranslatorRegistry.register_service(
ObjectsQueryResponseTranslator()
)
# Question → GraphQL translation service (request/response translator pair).
TranslatorRegistry.register_service(
    "nlp-query",
    QuestionToStructuredQueryRequestTranslator(),
    QuestionToStructuredQueryResponseTranslator()
)
# End-to-end structured query service (request/response translator pair).
TranslatorRegistry.register_service(
    "structured-query",
    StructuredQueryRequestTranslator(),
    StructuredQueryResponseTranslator()
)
# Register single-direction translators for document loading
TranslatorRegistry.register_request("document", DocumentTranslator())
TranslatorRegistry.register_request("text-document", TextDocumentTranslator())

View file

@ -0,0 +1,47 @@
from typing import Dict, Any, Tuple
from ...schema import QuestionToStructuredQueryRequest, QuestionToStructuredQueryResponse
from .base import MessageTranslator
class QuestionToStructuredQueryRequestTranslator(MessageTranslator):
    """Maps question→structured-query requests between REST dicts and
    the QuestionToStructuredQueryRequest Pulsar schema."""

    def to_pulsar(self, data: Dict[str, Any]) -> QuestionToStructuredQueryRequest:
        # Missing fields fall back to an empty question / 100 results.
        question = data.get("question", "")
        limit = data.get("max_results", 100)
        return QuestionToStructuredQueryRequest(
            question=question,
            max_results=limit
        )

    def from_pulsar(self, obj: QuestionToStructuredQueryRequest) -> Dict[str, Any]:
        return {
            "question": obj.question,
            "max_results": obj.max_results,
        }
class QuestionToStructuredQueryResponseTranslator(MessageTranslator):
    """Maps QuestionToStructuredQueryResponse Pulsar objects to REST dicts."""

    def to_pulsar(self, data: Dict[str, Any]) -> QuestionToStructuredQueryResponse:
        raise NotImplementedError("Response translation to Pulsar not typically needed")

    def from_pulsar(self, obj: QuestionToStructuredQueryResponse) -> Dict[str, Any]:
        # Container fields are copied defensively: Pulsar may hand back None.
        variables = dict(obj.variables) if obj.variables else {}
        schemas = list(obj.detected_schemas) if obj.detected_schemas else []
        result = {
            "graphql_query": obj.graphql_query,
            "variables": variables,
            "detected_schemas": schemas,
            "confidence": obj.confidence,
        }
        if obj.error:
            # Surface the system-level error alongside the payload.
            result["error"] = {
                "type": obj.error.type,
                "message": obj.error.message,
            }
        return result

    def from_response_with_completion(self, obj: QuestionToStructuredQueryResponse) -> Tuple[Dict[str, Any], bool]:
        """Returns (response_dict, is_final)"""
        # Single-shot response: always final.
        return self.from_pulsar(obj), True

View file

@ -0,0 +1,56 @@
from typing import Dict, Any, Tuple
from ...schema import StructuredQueryRequest, StructuredQueryResponse
from .base import MessageTranslator
import json
class StructuredQueryRequestTranslator(MessageTranslator):
    """Converts between plain-dict payloads and StructuredQueryRequest
    records; the request carries a single free-text question."""

    def to_pulsar(self, data: Dict[str, Any]) -> StructuredQueryRequest:
        # A missing key falls back to an empty question.
        return StructuredQueryRequest(question=data.get("question", ""))

    def from_pulsar(self, obj: StructuredQueryRequest) -> Dict[str, Any]:
        return dict(question=obj.question)
class StructuredQueryResponseTranslator(MessageTranslator):
    """Converts StructuredQueryResponse records into plain dicts;
    the reverse direction is unsupported."""

    def to_pulsar(self, data: Dict[str, Any]) -> StructuredQueryResponse:
        # Responses originate on the service side only.
        raise NotImplementedError("Response translation to Pulsar not typically needed")

    def from_pulsar(self, obj: StructuredQueryResponse) -> Dict[str, Any]:
        out: Dict[str, Any] = {}

        # The data field is a JSON-encoded string; decode when possible,
        # otherwise pass the raw string through unchanged.
        if obj.data:
            try:
                out["data"] = json.loads(obj.data)
            except json.JSONDecodeError:
                out["data"] = obj.data
        else:
            out["data"] = None

        # GraphQL-level errors are a (possibly empty) list of strings.
        out["errors"] = list(obj.errors) if obj.errors else []

        # A populated error field indicates a system-level failure.
        if obj.error:
            out["error"] = {
                "type": obj.error.type,
                "message": obj.error.message,
            }

        return out

    def from_response_with_completion(
        self, obj: StructuredQueryResponse
    ) -> Tuple[Dict[str, Any], bool]:
        """Returns (response_dict, is_final); this service is single-shot."""
        return self.from_pulsar(obj), True

View file

@ -8,13 +8,11 @@ from ..core.topic import topic
# Structured Query Service - executes GraphQL queries
class StructuredQueryRequest(Record):
query = String() # GraphQL query
variables = Map(String()) # GraphQL variables
operation_name = String() # Optional operation name for multi-operation documents
question = String()
class StructuredQueryResponse(Record):
error = Error()
data = String() # JSON-encoded GraphQL response data
errors = Array(String()) # GraphQL errors if any
############################################################################
############################################################################

View file

@ -43,8 +43,10 @@ tg-invoke-document-rag = "trustgraph.cli.invoke_document_rag:main"
tg-invoke-graph-rag = "trustgraph.cli.invoke_graph_rag:main"
tg-invoke-llm = "trustgraph.cli.invoke_llm:main"
tg-invoke-mcp-tool = "trustgraph.cli.invoke_mcp_tool:main"
tg-invoke-nlp-query = "trustgraph.cli.invoke_nlp_query:main"
tg-invoke-objects-query = "trustgraph.cli.invoke_objects_query:main"
tg-invoke-prompt = "trustgraph.cli.invoke_prompt:main"
tg-invoke-structured-query = "trustgraph.cli.invoke_structured_query:main"
tg-load-doc-embeds = "trustgraph.cli.load_doc_embeds:main"
tg-load-kg-core = "trustgraph.cli.load_kg_core:main"
tg-load-pdf = "trustgraph.cli.load_pdf:main"

View file

@ -0,0 +1,111 @@
"""
Uses the NLP Query service to convert natural language questions to GraphQL queries
"""
import argparse
import os
import json
import sys
from trustgraph.api import Api
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def nlp_query(url, flow_id, question, max_results, output_format='json'):
    """Submit a natural-language question to the NLP query service and
    print the generated GraphQL in the requested format.

    Exits with status 1 on a service error, or — for the 'graphql' and
    'summary' formats — when no query was produced.
    """
    flow = Api(url).flow().id(flow_id)

    resp = flow.nlp_query(
        question=question,
        max_results=max_results
    )

    # A populated "error" key means the service itself failed.
    if resp.get("error"):
        print("Error:", resp["error"].get("message", "Unknown error"),
              file=sys.stderr)
        sys.exit(1)

    if output_format == 'json':
        # Raw response, pretty-printed.
        print(json.dumps(resp, indent=2))

    elif output_format == 'graphql':
        # Bare query text only.
        if "graphql_query" not in resp:
            print("No GraphQL query generated", file=sys.stderr)
            sys.exit(1)
        print(resp["graphql_query"])

    elif output_format == 'summary':
        # Human-readable summary of query, schemas, confidence, variables.
        if "graphql_query" not in resp:
            print("No GraphQL query generated", file=sys.stderr)
            sys.exit(1)
        separator = "-" * 40
        print("Generated GraphQL Query:")
        print(separator)
        print(resp["graphql_query"])
        print(separator)
        if resp.get("detected_schemas"):
            print(f"Detected Schemas: {', '.join(resp['detected_schemas'])}")
        if "confidence" in resp:
            print(f"Confidence: {resp['confidence']:.2%}")
        if resp.get("variables"):
            print(f"Variables: {json.dumps(resp['variables'], indent=2)}")
def main():
    """CLI entry point for tg-invoke-nlp-query.

    Parses command-line options and delegates to nlp_query(); any
    exception is reported on stderr and converted to exit status 1.
    """
    parser = argparse.ArgumentParser(
        prog='tg-invoke-nlp-query',
        description=__doc__,
    )
    parser.add_argument(
        '-u', '--url',
        default=default_url,
        help=f'API URL (default: {default_url})',
    )
    parser.add_argument(
        '-f', '--flow-id',
        default="default",
        help=f'Flow ID (default: default)'
    )
    parser.add_argument(
        '-q', '--question',
        required=True,
        help='Natural language question to convert to GraphQL',
    )
    parser.add_argument(
        '-m', '--max-results',
        type=int,
        default=100,
        help='Maximum number of results (default: 100)'
    )
    parser.add_argument(
        '--format',
        choices=['json', 'graphql', 'summary'],
        default='summary',
        help='Output format (default: summary)'
    )
    args = parser.parse_args()
    try:
        nlp_query(
            url=args.url,
            flow_id=args.flow_id,
            question=args.question,
            max_results=args.max_results,
            output_format=args.format,
        )
    except Exception as e:
        # Surface any failure (network, API, bad response) as non-zero exit.
        print("Exception:", e, flush=True, file=sys.stderr)
        sys.exit(1)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,159 @@
"""
Uses the Structured Query service to execute natural language questions against structured data
"""
import argparse
import os
import json
import sys
import csv
import io
from trustgraph.api import Api
from tabulate import tabulate
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def format_output(data, output_format):
    """Format structured query response data in the specified format.

    A single-query result (a one-key dict whose value is a list of rows)
    is delegated to format_table_data so it can be rendered as a table,
    CSV or JSON.  Anything else — multiple query results, non-list data,
    or empty input — falls back to pretty-printed JSON regardless of the
    requested format.
    """
    if not data:
        return "No data returned"

    # Single query result: unwrap {query_name: [rows, ...]} and render
    # the row list under its query name.
    if isinstance(data, dict) and len(data) == 1:
        query_name, result_list = next(iter(data.items()))
        if isinstance(result_list, list):
            return format_table_data(result_list, query_name, output_format)

    # Multiple queries or non-list data: JSON fallback.  (The original
    # branched on output_format here but both branches were identical.)
    return json.dumps(data, indent=2)
def _ordered_fieldnames(rows):
    """Return every field name appearing across *rows*, preserving the
    first row's order and appending fields seen only in later rows."""
    fieldnames = list(rows[0].keys()) if rows else []
    seen = set(fieldnames)
    for row in rows:
        for field in row.keys():
            if field not in seen:
                fieldnames.append(field)
                seen.add(field)
    return fieldnames


def format_table_data(rows, table_name, output_format):
    """Format a list of row dicts as JSON, CSV or a pretty table.

    Rows missing a field are rendered as an empty cell; unknown formats
    fall back to JSON.  (The fieldname-collection logic was previously
    duplicated in the csv and table branches; it is now shared via
    _ordered_fieldnames.)
    """
    if not rows:
        return f"No {table_name} found"

    if output_format == 'json':
        return json.dumps({table_name: rows}, indent=2)

    if output_format == 'csv':
        fieldnames = _ordered_fieldnames(rows)
        output = io.StringIO()
        # DictWriter's default restval='' fills cells for missing keys.
        writer = csv.DictWriter(output, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
        return output.getvalue().rstrip()

    if output_format == 'table':
        fieldnames = _ordered_fieldnames(rows)
        table_data = [
            [row.get(field, '') for field in fieldnames]
            for row in rows
        ]
        return tabulate(table_data, headers=fieldnames, tablefmt='pretty')

    # Unknown format: fall back to JSON.
    return json.dumps({table_name: rows}, indent=2)
def structured_query(url, flow_id, question, output_format='table'):
    """Run a natural-language question through the structured query
    service and print the formatted results.

    Exits with status 1 on a service error, on GraphQL query errors
    (after printing any partial data), or when no data is returned.
    """
    flow = Api(url).flow().id(flow_id)

    resp = flow.structured_query(question=question)

    # System-level failure reported by the service.
    if resp.get("error"):
        print("Error:", resp["error"].get("message", "Unknown error"),
              file=sys.stderr)
        sys.exit(1)

    # GraphQL-level errors: report them all, show partial data if any,
    # and still exit non-zero.
    if resp.get("errors"):
        print("Query Errors:", file=sys.stderr)
        for err in resp["errors"]:
            print(f" - {err}", file=sys.stderr)
        if resp.get("data"):
            print(format_output(resp["data"], output_format))
        sys.exit(1)

    if "data" not in resp:
        print("No data returned", file=sys.stderr)
        sys.exit(1)

    print(format_output(resp["data"], output_format))
def main():
    """CLI entry point for tg-invoke-structured-query.

    Parses command-line options and delegates to structured_query(); any
    exception is reported on stderr and converted to exit status 1.
    """
    parser = argparse.ArgumentParser(
        prog='tg-invoke-structured-query',
        description=__doc__,
    )
    parser.add_argument(
        '-u', '--url',
        default=default_url,
        help=f'API URL (default: {default_url})',
    )
    parser.add_argument(
        '-f', '--flow-id',
        default="default",
        help=f'Flow ID (default: default)'
    )
    parser.add_argument(
        '-q', '--question',
        required=True,
        help='Natural language question to execute',
    )
    parser.add_argument(
        '--format',
        choices=['table', 'json', 'csv'],
        default='table',
        help='Output format (default: table)'
    )
    args = parser.parse_args()
    try:
        structured_query(
            url=args.url,
            flow_id=args.flow_id,
            question=args.question,
            output_format=args.format,
        )
    except Exception as e:
        # Surface any failure (network, API, bad response) as non-zero exit.
        print("Exception:", e, flush=True, file=sys.stderr)
        sys.exit(1)
if __name__ == "__main__":
main()

View file

@ -96,6 +96,7 @@ pdf-ocr-mistral = "trustgraph.decoding.mistral_ocr:run"
prompt-template = "trustgraph.prompt.template:run"
rev-gateway = "trustgraph.rev_gateway:run"
run-processing = "trustgraph.processing:run"
structured-query = "trustgraph.retrieval.structured_query:run"
text-completion-azure = "trustgraph.model.text_completion.azure:run"
text-completion-azure-openai = "trustgraph.model.text_completion.azure_openai:run"
text-completion-claude = "trustgraph.model.text_completion.claude:run"

View file

@ -20,6 +20,8 @@ from . graph_rag import GraphRagRequestor
from . document_rag import DocumentRagRequestor
from . triples_query import TriplesQueryRequestor
from . objects_query import ObjectsQueryRequestor
from . nlp_query import NLPQueryRequestor
from . structured_query import StructuredQueryRequestor
from . embeddings import EmbeddingsRequestor
from . graph_embeddings_query import GraphEmbeddingsQueryRequestor
from . mcp_tool import McpToolRequestor
@ -52,6 +54,8 @@ request_response_dispatchers = {
"graph-embeddings": GraphEmbeddingsQueryRequestor,
"triples": TriplesQueryRequestor,
"objects": ObjectsQueryRequestor,
"nlp-query": NLPQueryRequestor,
"structured-query": StructuredQueryRequestor,
}
global_dispatchers = {

View file

@ -0,0 +1,30 @@
from ... schema import QuestionToStructuredQueryRequest, QuestionToStructuredQueryResponse
from ... messaging import TranslatorRegistry
from . requestor import ServiceRequestor
class NLPQueryRequestor(ServiceRequestor):
    """Gateway requestor for the nlp-query service: carries a natural
    language question and relays the generated-GraphQL response."""

    def __init__(
        self, pulsar_client, request_queue, response_queue, timeout,
        consumer, subscriber,
    ):
        super().__init__(
            pulsar_client=pulsar_client,
            request_queue=request_queue,
            response_queue=response_queue,
            request_schema=QuestionToStructuredQueryRequest,
            response_schema=QuestionToStructuredQueryResponse,
            subscription=subscriber,
            consumer_name=consumer,
            timeout=timeout,
        )

        # Dict <-> Pulsar-record translation is delegated to the registry.
        self.request_translator = \
            TranslatorRegistry.get_request_translator("nlp-query")
        self.response_translator = \
            TranslatorRegistry.get_response_translator("nlp-query")

    def to_request(self, body):
        return self.request_translator.to_pulsar(body)

    def from_response(self, message):
        return self.response_translator.from_response_with_completion(message)

View file

@ -0,0 +1,30 @@
from ... schema import StructuredQueryRequest, StructuredQueryResponse
from ... messaging import TranslatorRegistry
from . requestor import ServiceRequestor
class StructuredQueryRequestor(ServiceRequestor):
    """Gateway requestor for the structured-query service: carries a
    natural language question and relays the query results."""

    def __init__(
        self, pulsar_client, request_queue, response_queue, timeout,
        consumer, subscriber,
    ):
        super().__init__(
            pulsar_client=pulsar_client,
            request_queue=request_queue,
            response_queue=response_queue,
            request_schema=StructuredQueryRequest,
            response_schema=StructuredQueryResponse,
            subscription=subscriber,
            consumer_name=consumer,
            timeout=timeout,
        )

        # Dict <-> Pulsar-record translation is delegated to the registry.
        self.request_translator = \
            TranslatorRegistry.get_request_translator("structured-query")
        self.response_translator = \
            TranslatorRegistry.get_response_translator("structured-query")

    def to_request(self, body):
        return self.request_translator.to_pulsar(body)

    def from_response(self, message):
        return self.response_translator.from_response_with_completion(message)

View file

@ -8,10 +8,10 @@ import logging
from typing import Dict, Any, Optional, List
from ...schema import QuestionToStructuredQueryRequest, QuestionToStructuredQueryResponse
from ...schema import PromptRequest, PromptResponse
from ...schema import PromptRequest
from ...schema import Error, RowSchema, Field as SchemaField
from ...base import FlowProcessor, ConsumerSpec, ProducerSpec, ClientSpec
from ...base import FlowProcessor, ConsumerSpec, ProducerSpec, PromptClientSpec
# Module logger
logger = logging.getLogger(__name__)
@ -57,11 +57,9 @@ class Processor(FlowProcessor):
# Client spec for calling prompt service
self.register_specification(
ClientSpec(
PromptClientSpec(
request_name = "prompt-request",
response_name = "prompt-response",
request_schema = PromptRequest,
response_schema = PromptResponse
)
)

View file

@ -0,0 +1 @@
from . service import *

View file

@ -0,0 +1,5 @@
#!/usr/bin/env python3

# Package entry point: importing run from the service module and invoking
# it lets this package be started with `python -m`.
from . service import run

run()

View file

@ -0,0 +1,176 @@
"""
Structured Query Service - orchestrates natural language question processing.
Takes a question, converts it to GraphQL via nlp-query, executes via objects-query,
and returns the results.
"""
import json
import logging
from typing import Dict, Any, Optional
from ...schema import StructuredQueryRequest, StructuredQueryResponse
from ...schema import QuestionToStructuredQueryRequest, QuestionToStructuredQueryResponse
from ...schema import ObjectsQueryRequest, ObjectsQueryResponse
from ...schema import Error
from ...base import FlowProcessor, ConsumerSpec, ProducerSpec, RequestResponseSpec
# Module logger
logger = logging.getLogger(__name__)
default_ident = "structured-query"
class Processor(FlowProcessor):
    """Structured query orchestrator.

    Consumes StructuredQueryRequest messages, converts the natural
    language question to GraphQL via the nlp-query service, executes the
    query via the objects-query service, and emits a
    StructuredQueryResponse with the results.
    """

    def __init__(self, **params):

        # Service identifier; overridable via the "id" parameter.
        id = params.get("id", default_ident)

        super(Processor, self).__init__(
            **params | {
                "id": id,
            }
        )

        # Incoming structured query requests.
        self.register_specification(
            ConsumerSpec(
                name = "request",
                schema = StructuredQueryRequest,
                handler = self.on_message
            )
        )

        # Outgoing responses.
        self.register_specification(
            ProducerSpec(
                name = "response",
                schema = StructuredQueryResponse,
            )
        )

        # Client spec for calling NLP query service
        self.register_specification(
            RequestResponseSpec(
                request_name = "nlp-query-request",
                response_name = "nlp-query-response",
                request_schema = QuestionToStructuredQueryRequest,
                response_schema = QuestionToStructuredQueryResponse
            )
        )

        # Client spec for calling objects query service
        self.register_specification(
            RequestResponseSpec(
                request_name = "objects-query-request",
                response_name = "objects-query-response",
                request_schema = ObjectsQueryRequest,
                response_schema = ObjectsQueryResponse
            )
        )

        logger.info("Structured Query service initialized")

    async def on_message(self, msg, consumer, flow):
        """Handle incoming structured query request"""

        # Sender-produced correlation ID.  Extracted before the try block
        # so the error path can always reference it; previously it was
        # assigned inside the try, so a failure before that assignment
        # (e.g. a missing "id" property) raised a NameError inside the
        # exception handler itself.
        id = msg.properties().get("id", "")

        try:
            request = msg.value()

            logger.info(f"Handling structured query request {id}: {request.question[:100]}...")

            # Step 1: Convert question to GraphQL using NLP query service
            logger.info("Step 1: Converting question to GraphQL")

            nlp_request = QuestionToStructuredQueryRequest(
                question=request.question,
                max_results=100  # Default limit
            )

            nlp_response = await self.client("nlp-query-request").request(nlp_request)

            if nlp_response.error is not None:
                raise Exception(f"NLP query service error: {nlp_response.error.message}")

            if not nlp_response.graphql_query:
                raise Exception("NLP query service returned empty GraphQL query")

            logger.info(f"Generated GraphQL query: {nlp_response.graphql_query[:200]}...")
            logger.info(f"Detected schemas: {nlp_response.detected_schemas}")
            logger.info(f"Confidence: {nlp_response.confidence}")

            # Step 2: Execute GraphQL query using objects query service
            logger.info("Step 2: Executing GraphQL query")

            # Convert variables to strings (GraphQL variables can be
            # various types, but the Pulsar schema expects strings)
            variables_as_strings = {}
            if nlp_response.variables:
                for key, value in nlp_response.variables.items():
                    if isinstance(value, str):
                        variables_as_strings[key] = value
                    else:
                        variables_as_strings[key] = str(value)

            # For now, we'll use default user/collection values.
            # In a real implementation, these would come from
            # authentication/context.
            objects_request = ObjectsQueryRequest(
                user="default",  # TODO: Get from authentication context
                collection="default",  # TODO: Get from request context
                query=nlp_response.graphql_query,
                variables=variables_as_strings,
                operation_name=None
            )

            objects_response = await self.client("objects-query-request").request(objects_request)

            if objects_response.error is not None:
                raise Exception(f"Objects query service error: {objects_response.error.message}")

            # Handle GraphQL errors from the objects query service
            graphql_errors = []
            if objects_response.errors:
                for gql_error in objects_response.errors:
                    graphql_errors.append(f"{gql_error.message} (path: {gql_error.path})")

            logger.info("Step 3: Returning results")

            # Create response
            response = StructuredQueryResponse(
                error=None,
                data=objects_response.data or "null",  # JSON string
                errors=graphql_errors
            )

            logger.info("Sending structured query response...")

            await flow("response").send(response, properties={"id": id})

            logger.info("Structured query request completed")

        except Exception as e:

            logger.error(f"Exception in structured query service: {e}", exc_info=True)

            logger.info("Sending error response...")

            response = StructuredQueryResponse(
                error = Error(
                    type = "structured-query-error",
                    message = str(e),
                ),
                data = "null",
                errors = []
            )

            await flow("response").send(response, properties={"id": id})

    @staticmethod
    def add_args(parser):
        """Add command-line arguments"""
        FlowProcessor.add_args(parser)
        # No additional arguments needed for this orchestrator service
def run():
    """Entry point for structured-query command.

    Delegates to Processor.launch with the default service identifier
    and this module's docstring as the help text.
    """
    Processor.launch(default_ident, __doc__)