diff --git a/docs/tech-specs/flow-class-definition.md b/docs/tech-specs/flow-class-definition.md new file mode 100644 index 00000000..5469144e --- /dev/null +++ b/docs/tech-specs/flow-class-definition.md @@ -0,0 +1,156 @@ +# Flow Class Definition Specification + +## Overview + +A flow class defines a complete dataflow pattern template in the TrustGraph system. When instantiated, it creates an interconnected network of processors that handle data ingestion, processing, storage, and querying as a unified system. + +## Structure + +A flow class definition consists of four main sections: + +### 1. Class Section +Defines shared service processors that are instantiated once per flow class. These processors handle requests from all flow instances of this class. + +```json +"class": { + "service-name:{class}": { + "request": "queue-pattern:{class}", + "response": "queue-pattern:{class}" + } +} +``` + +**Characteristics:** +- Shared across all flow instances of the same class +- Typically expensive or stateless services (LLMs, embedding models) +- Use `{class}` template variable for queue naming +- Examples: `embeddings:{class}`, `text-completion:{class}`, `graph-rag:{class}` + +### 2. Flow Section +Defines flow-specific processors that are instantiated for each individual flow instance. Each flow gets its own isolated set of these processors. + +```json +"flow": { + "processor-name:{id}": { + "input": "queue-pattern:{id}", + "output": "queue-pattern:{id}" + } +} +``` + +**Characteristics:** +- Unique instance per flow +- Handle flow-specific data and state +- Use `{id}` template variable for queue naming +- Examples: `chunker:{id}`, `pdf-decoder:{id}`, `kg-extract-relationships:{id}` + +### 3. Interfaces Section +Defines the entry points and interaction contracts for the flow. These form the API surface for external systems and internal component communication. 
+ +Interfaces can take two forms: + +**Fire-and-Forget Pattern** (single queue): +```json +"interfaces": { + "document-load": "persistent://tg/flow/document-load:{id}", + "triples-store": "persistent://tg/flow/triples-store:{id}" +} +``` + +**Request/Response Pattern** (object with request/response fields): +```json +"interfaces": { + "embeddings": { + "request": "non-persistent://tg/request/embeddings:{class}", + "response": "non-persistent://tg/response/embeddings:{class}" + } +} +``` + +**Types of Interfaces:** +- **Entry Points**: Where external systems inject data (`document-load`, `agent`) +- **Service Interfaces**: Request/response patterns for services (`embeddings`, `text-completion`) +- **Data Interfaces**: Fire-and-forget data flow connection points (`triples-store`, `entity-contexts-load`) + +### 4. Metadata +Additional information about the flow class: + +```json +"description": "Human-readable description", +"tags": ["capability-1", "capability-2"] +``` + +## Template Variables + +### {id} +- Replaced with the unique flow instance identifier +- Creates isolated resources for each flow +- Example: `flow-123`, `customer-A-flow` + +### {class} +- Replaced with the flow class name +- Creates shared resources across flows of the same class +- Example: `standard-rag`, `enterprise-rag` + +## Queue Patterns (Pulsar) + +Flow classes use Apache Pulsar for messaging. 
Queue names follow the Pulsar format: ``` <persistence>://<tenant>/<namespace>/<topic> ``` ### Components: - **persistence**: `persistent` or `non-persistent` (Pulsar persistence mode) - **tenant**: `tg` for TrustGraph-supplied flow class definitions - **namespace**: Indicates the messaging pattern - `flow`: Fire-and-forget services - `request`: Request portion of request/response services - `response`: Response portion of request/response services - **topic**: The specific queue/topic name with template variables ### Persistent Queues - Pattern: `persistent://tg/flow/<topic>:{id}` - Used for fire-and-forget services and durable data flow - Data persists in Pulsar storage across restarts - Example: `persistent://tg/flow/chunk-load:{id}` ### Non-Persistent Queues - Pattern: `non-persistent://tg/request/<topic>:{class}` or `non-persistent://tg/response/<topic>:{class}` - Used for request/response messaging patterns - Ephemeral, not persisted to disk by Pulsar - Lower latency, suitable for RPC-style communication - Example: `non-persistent://tg/request/embeddings:{class}` ## Dataflow Architecture The flow class creates a unified dataflow where: 1. **Document Processing Pipeline**: Flows from ingestion through transformation to storage 2. **Query Services**: Integrated processors that query the same data stores and services 3. **Shared Services**: Centralized processors that all flows can utilize 4. **Storage Writers**: Persist processed data to appropriate stores All processors (both `{id}` and `{class}`) work together as a cohesive dataflow graph, not as separate systems. 
+ +## Example Flow Instantiation + +Given: +- Flow Instance ID: `customer-A-flow` +- Flow Class: `standard-rag` + +Template expansions: +- `persistent://tg/flow/chunk-load:{id}` → `persistent://tg/flow/chunk-load:customer-A-flow` +- `non-persistent://tg/request/embeddings:{class}` → `non-persistent://tg/request/embeddings:standard-rag` + +This creates: +- Isolated document processing pipeline for `customer-A-flow` +- Shared embedding service for all `standard-rag` flows +- Complete dataflow from document ingestion through querying + +## Benefits + +1. **Resource Efficiency**: Expensive services are shared across flows +2. **Flow Isolation**: Each flow has its own data processing pipeline +3. **Scalability**: Can instantiate multiple flows from the same template +4. **Modularity**: Clear separation between shared and flow-specific components +5. **Unified Architecture**: Query and processing are part of the same dataflow \ No newline at end of file diff --git a/docs/tech-specs/structured-diag-service.md b/docs/tech-specs/structured-diag-service.md new file mode 100644 index 00000000..1eab9df2 --- /dev/null +++ b/docs/tech-specs/structured-diag-service.md @@ -0,0 +1,273 @@ +# Structured Data Diagnostic Service Technical Specification + +## Overview + +This specification describes a new invokable service for diagnosing and analyzing structured data within TrustGraph. The service extracts functionality from the existing `tg-load-structured-data` command-line tool and exposes it as a request/response service, enabling programmatic access to data type detection and descriptor generation capabilities. + +The service supports three primary operations: + +1. **Data Type Detection**: Analyze a data sample to determine its format (CSV, JSON, or XML) +2. **Descriptor Generation**: Generate a TrustGraph structured data descriptor for a given data sample and type +3. 
**Combined Diagnosis**: Perform both type detection and descriptor generation in sequence + +## Goals + +- **Modularize Data Analysis**: Extract data diagnosis logic from CLI into reusable service components +- **Enable Programmatic Access**: Provide API-based access to data analysis capabilities +- **Support Multiple Data Formats**: Handle CSV, JSON, and XML data formats consistently +- **Generate Accurate Descriptors**: Produce structured data descriptors that accurately map source data to TrustGraph schemas +- **Maintain Backward Compatibility**: Ensure existing CLI functionality continues to work +- **Enable Service Composition**: Allow other services to leverage data diagnosis capabilities +- **Improve Testability**: Separate business logic from CLI interface for better testing +- **Support Streaming Analysis**: Enable analysis of data samples without loading entire files + +## Background + +Currently, the `tg-load-structured-data` command provides comprehensive functionality for analyzing structured data and generating descriptors. However, this functionality is tightly coupled to the CLI interface, limiting its reusability. + +Current limitations include: +- Data diagnosis logic embedded in CLI code +- No programmatic access to type detection and descriptor generation +- Difficult to integrate diagnosis capabilities into other services +- Limited ability to compose data analysis workflows + +This specification addresses these gaps by creating a dedicated service for structured data diagnosis. By exposing these capabilities as a service, TrustGraph can: +- Enable other services to analyze data programmatically +- Support more complex data processing pipelines +- Facilitate integration with external systems +- Improve maintainability through separation of concerns + +## Technical Design + +### Architecture + +The structured data diagnostic service requires the following technical components: + +1. 
**Diagnostic Service Processor** + - Handles incoming diagnosis requests + - Orchestrates type detection and descriptor generation + - Returns structured responses with diagnosis results + + Module: `trustgraph-flow/trustgraph/diagnosis/structured_data/service.py` + +2. **Data Type Detector** + - Uses algorithmic detection to identify data format (CSV, JSON, XML) + - Analyzes data structure, delimiters, and syntax patterns + - Returns detected format and confidence scores + + Module: `trustgraph-flow/trustgraph/diagnosis/structured_data/type_detector.py` + +3. **Descriptor Generator** + - Uses prompt service to generate descriptors + - Invokes format-specific prompts (diagnose-csv, diagnose-json, diagnose-xml) + - Maps data fields to TrustGraph schema fields through prompt responses + + Module: `trustgraph-flow/trustgraph/diagnosis/structured_data/descriptor_generator.py` + +### Data Models + +#### StructuredDataDiagnosisRequest + +Request message for structured data diagnosis operations: + +```python +class StructuredDataDiagnosisRequest: + operation: str # "detect-type", "generate-descriptor", or "diagnose" + sample: str # Data sample to analyze (text content) + type: Optional[str] # Data type (csv, json, xml) - required for generate-descriptor + schema_name: Optional[str] # Target schema name for descriptor generation + options: Dict[str, Any] # Additional options (e.g., delimiter for CSV) +``` + +#### StructuredDataDiagnosisResponse + +Response message containing diagnosis results: + +```python +class StructuredDataDiagnosisResponse: + operation: str # The operation that was performed + detected_type: Optional[str] # Detected data type (for detect-type/diagnose) + confidence: Optional[float] # Confidence score for type detection + descriptor: Optional[Dict] # Generated descriptor (for generate-descriptor/diagnose) + error: Optional[str] # Error message if operation failed + metadata: Dict[str, Any] # Additional metadata (e.g., field count, sample records) +``` 
+ +#### Descriptor Structure + +The generated descriptor follows the existing structured data descriptor format: + +```json +{ + "format": { + "type": "csv", + "encoding": "utf-8", + "options": { + "delimiter": ",", + "has_header": true + } + }, + "mappings": [ + { + "source_field": "customer_id", + "target_field": "id", + "transforms": [ + {"type": "trim"} + ] + } + ], + "output": { + "schema_name": "customer", + "options": { + "batch_size": 1000, + "confidence": 0.9 + } + } +} +``` + +### Service Interface + +The service will expose the following operations through the request/response pattern: + +1. **Type Detection Operation** + - Input: Data sample + - Processing: Analyze data structure using algorithmic detection + - Output: Detected type with confidence score + +2. **Descriptor Generation Operation** + - Input: Data sample, type, target schema name + - Processing: + - Call prompt service with format-specific prompt ID (diagnose-csv, diagnose-json, or diagnose-xml) + - Pass data sample and available schemas to prompt + - Receive generated descriptor from prompt response + - Output: Structured data descriptor + +3. **Combined Diagnosis Operation** + - Input: Data sample, optional schema name + - Processing: + - Use algorithmic detection to identify format first + - Select appropriate format-specific prompt based on detected type + - Call prompt service to generate descriptor + - Output: Both detected type and descriptor + +### Implementation Details + +The service will follow TrustGraph service conventions: + +1. **Service Registration** + - Register as `structured-diag` service type + - Use standard request/response topics + - Implement FlowProcessor base class + - Register PromptClientSpec for prompt service interaction + +2. **Configuration Management** + - Access schema configurations via config service + - Cache schemas for performance + - Handle configuration updates dynamically + +3. 
**Prompt Integration** + - Use existing prompt service infrastructure + - Call prompt service with format-specific prompt IDs: + - `diagnose-csv`: For CSV data analysis + - `diagnose-json`: For JSON data analysis + - `diagnose-xml`: For XML data analysis + - Prompts are configured in prompt config, not hard-coded in service + - Pass schemas and data samples as prompt variables + - Parse prompt responses to extract descriptors + +4. **Error Handling** + - Validate input data samples + - Provide descriptive error messages + - Handle malformed data gracefully + - Handle prompt service failures + +5. **Data Sampling** + - Process configurable sample sizes + - Handle incomplete records appropriately + - Maintain sampling consistency + +### API Integration + +The service will integrate with existing TrustGraph APIs: + +Modified Components: +- `tg-load-structured-data` CLI - Refactored to use the new service for diagnosis operations +- Flow API - Extended to support structured data diagnosis requests + +New Service Endpoints: +- `/api/v1/flow/{flow}/diagnose/structured-data` - WebSocket endpoint for diagnosis requests +- `/api/v1/diagnose/structured-data` - REST endpoint for synchronous diagnosis + +### Message Flow + +``` +Client → Gateway → Structured Diag Service → Config Service (for schemas) + ↓ + Type Detector (algorithmic) + ↓ + Prompt Service (diagnose-csv/json/xml) + ↓ + Descriptor Generator (parses prompt response) + ↓ +Client ← Gateway ← Structured Diag Service (response) +``` + +## Security Considerations + +- Input validation to prevent injection attacks +- Size limits on data samples to prevent DoS +- Sanitization of generated descriptors +- Access control through existing TrustGraph authentication + +## Performance Considerations + +- Cache schema definitions to reduce config service calls +- Limit sample sizes to maintain responsive performance +- Use streaming processing for large data samples +- Implement timeout mechanisms for long-running analyses + 
+## Testing Strategy + +1. **Unit Tests** + - Type detection for various data formats + - Descriptor generation accuracy + - Error handling scenarios + +2. **Integration Tests** + - Service request/response flow + - Schema retrieval and caching + - CLI integration + +3. **Performance Tests** + - Large sample processing + - Concurrent request handling + - Memory usage under load + +## Migration Plan + +1. **Phase 1**: Implement service with core functionality +2. **Phase 2**: Refactor CLI to use service (maintain backward compatibility) +3. **Phase 3**: Add REST API endpoints +4. **Phase 4**: Deprecate embedded CLI logic (with notice period) + +## Timeline + +- Week 1-2: Implement core service and type detection +- Week 3-4: Add descriptor generation and integration +- Week 5: Testing and documentation +- Week 6: CLI refactoring and migration + +## Open Questions + +- Should the service support additional data formats (e.g., Parquet, Avro)? +- What should be the maximum sample size for analysis? +- Should diagnosis results be cached for repeated requests? +- How should the service handle multi-schema scenarios? +- Should the prompt IDs be configurable parameters for the service? 
+ +## References + +- [Structured Data Descriptor Specification](structured-data-descriptor.md) +- [Structured Data Loading Documentation](structured-data.md) +- `tg-load-structured-data` implementation: `trustgraph-cli/trustgraph/cli/load_structured_data.py` \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/__init__.py b/trustgraph-base/trustgraph/messaging/__init__.py index 6b1aedd2..0c805967 100644 --- a/trustgraph-base/trustgraph/messaging/__init__.py +++ b/trustgraph-base/trustgraph/messaging/__init__.py @@ -24,6 +24,7 @@ from .translators.embeddings_query import ( from .translators.objects_query import ObjectsQueryRequestTranslator, ObjectsQueryResponseTranslator from .translators.nlp_query import QuestionToStructuredQueryRequestTranslator, QuestionToStructuredQueryResponseTranslator from .translators.structured_query import StructuredQueryRequestTranslator, StructuredQueryResponseTranslator +from .translators.diagnosis import StructuredDataDiagnosisRequestTranslator, StructuredDataDiagnosisResponseTranslator # Register all service translators TranslatorRegistry.register_service( @@ -123,11 +124,17 @@ TranslatorRegistry.register_service( ) TranslatorRegistry.register_service( - "structured-query", - StructuredQueryRequestTranslator(), + "structured-query", + StructuredQueryRequestTranslator(), StructuredQueryResponseTranslator() ) +TranslatorRegistry.register_service( + "structured-diag", + StructuredDataDiagnosisRequestTranslator(), + StructuredDataDiagnosisResponseTranslator() +) + # Register single-direction translators for document loading TranslatorRegistry.register_request("document", DocumentTranslator()) TranslatorRegistry.register_request("text-document", TextDocumentTranslator()) diff --git a/trustgraph-base/trustgraph/messaging/translators/__init__.py b/trustgraph-base/trustgraph/messaging/translators/__init__.py index 1bed3020..9ce2730e 100644 --- a/trustgraph-base/trustgraph/messaging/translators/__init__.py +++ 
b/trustgraph-base/trustgraph/messaging/translators/__init__.py @@ -18,3 +18,4 @@ from .embeddings_query import ( GraphEmbeddingsRequestTranslator, GraphEmbeddingsResponseTranslator ) from .objects_query import ObjectsQueryRequestTranslator, ObjectsQueryResponseTranslator +from .diagnosis import StructuredDataDiagnosisRequestTranslator, StructuredDataDiagnosisResponseTranslator diff --git a/trustgraph-base/trustgraph/messaging/translators/diagnosis.py b/trustgraph-base/trustgraph/messaging/translators/diagnosis.py new file mode 100644 index 00000000..f7b0150d --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/diagnosis.py @@ -0,0 +1,65 @@ +from typing import Dict, Any, Tuple +import json +from ...schema import StructuredDataDiagnosisRequest, StructuredDataDiagnosisResponse +from .base import MessageTranslator + + +class StructuredDataDiagnosisRequestTranslator(MessageTranslator): + """Translator for StructuredDataDiagnosisRequest schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> StructuredDataDiagnosisRequest: + return StructuredDataDiagnosisRequest( + operation=data["operation"], + sample=data["sample"], + type=data.get("type", ""), + schema_name=data.get("schema-name", ""), + options=data.get("options", {}) + ) + + def from_pulsar(self, obj: StructuredDataDiagnosisRequest) -> Dict[str, Any]: + result = { + "operation": obj.operation, + "sample": obj.sample, + } + + # Add optional fields if they exist + if obj.type: + result["type"] = obj.type + if obj.schema_name: + result["schema-name"] = obj.schema_name + if obj.options: + result["options"] = obj.options + + return result + + +class StructuredDataDiagnosisResponseTranslator(MessageTranslator): + """Translator for StructuredDataDiagnosisResponse schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> StructuredDataDiagnosisResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: 
StructuredDataDiagnosisResponse) -> Dict[str, Any]: + result = { + "operation": obj.operation + } + + # Add optional response fields if they exist + if obj.detected_type: + result["detected-type"] = obj.detected_type + if obj.confidence is not None: + result["confidence"] = obj.confidence + if obj.descriptor: + # Parse JSON-encoded descriptor + try: + result["descriptor"] = json.loads(obj.descriptor) + except (json.JSONDecodeError, TypeError): + result["descriptor"] = obj.descriptor + if obj.metadata: + result["metadata"] = obj.metadata + + return result + + def from_response_with_completion(self, obj: StructuredDataDiagnosisResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True \ No newline at end of file diff --git a/trustgraph-base/trustgraph/schema/services/__init__.py b/trustgraph-base/trustgraph/schema/services/__init__.py index 48711833..d1c5448a 100644 --- a/trustgraph-base/trustgraph/schema/services/__init__.py +++ b/trustgraph-base/trustgraph/schema/services/__init__.py @@ -9,4 +9,5 @@ from .library import * from .lookup import * from .nlp_query import * from .structured_query import * -from .objects_query import * \ No newline at end of file +from .objects_query import * +from .diagnosis import * \ No newline at end of file diff --git a/trustgraph-base/trustgraph/schema/services/diagnosis.py b/trustgraph-base/trustgraph/schema/services/diagnosis.py new file mode 100644 index 00000000..2bd6caf0 --- /dev/null +++ b/trustgraph-base/trustgraph/schema/services/diagnosis.py @@ -0,0 +1,30 @@ +from pulsar.schema import Record, String, Map, Double +from ..core.primitives import Error + +############################################################################ + +# Structured data diagnosis services + +class StructuredDataDiagnosisRequest(Record): + operation = String() # "detect-type", "generate-descriptor", or "diagnose" + sample = String() # Data sample to analyze (text content) + type = 
String() # Data type (csv, json, xml) - optional, required for generate-descriptor + schema_name = String() # Target schema name for descriptor generation - optional + + # JSON encoded options (e.g., delimiter for CSV) + options = Map(String()) + +class StructuredDataDiagnosisResponse(Record): + error = Error() + + operation = String() # The operation that was performed + detected_type = String() # Detected data type (for detect-type/diagnose) - optional + confidence = Double() # Confidence score for type detection - optional + + # JSON encoded descriptor (for generate-descriptor/diagnose) - optional + descriptor = String() + + # JSON encoded additional metadata (e.g., field count, sample records) + metadata = Map(String()) + +############################################################################ \ No newline at end of file diff --git a/trustgraph-flow/pyproject.toml b/trustgraph-flow/pyproject.toml index 8b10748b..c1ecd346 100644 --- a/trustgraph-flow/pyproject.toml +++ b/trustgraph-flow/pyproject.toml @@ -96,6 +96,7 @@ prompt-template = "trustgraph.prompt.template:run" rev-gateway = "trustgraph.rev_gateway:run" run-processing = "trustgraph.processing:run" structured-query = "trustgraph.retrieval.structured_query:run" +structured-diag = "trustgraph.retrieval.structured_diag:run" text-completion-azure = "trustgraph.model.text_completion.azure:run" text-completion-azure-openai = "trustgraph.model.text_completion.azure_openai:run" text-completion-claude = "trustgraph.model.text_completion.claude:run" diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index fc582d07..6f8649f0 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -22,6 +22,7 @@ from . triples_query import TriplesQueryRequestor from . objects_query import ObjectsQueryRequestor from . nlp_query import NLPQueryRequestor from . 
structured_query import StructuredQueryRequestor +from . structured_diag import StructuredDiagRequestor from . embeddings import EmbeddingsRequestor from . graph_embeddings_query import GraphEmbeddingsQueryRequestor from . mcp_tool import McpToolRequestor @@ -57,6 +58,7 @@ request_response_dispatchers = { "objects": ObjectsQueryRequestor, "nlp-query": NLPQueryRequestor, "structured-query": StructuredQueryRequestor, + "structured-diag": StructuredDiagRequestor, } global_dispatchers = { diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/structured_diag.py b/trustgraph-flow/trustgraph/gateway/dispatch/structured_diag.py new file mode 100644 index 00000000..8dae646d --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/dispatch/structured_diag.py @@ -0,0 +1,30 @@ +from ... schema import StructuredDataDiagnosisRequest, StructuredDataDiagnosisResponse +from ... messaging import TranslatorRegistry + +from . requestor import ServiceRequestor + +class StructuredDiagRequestor(ServiceRequestor): + def __init__( + self, pulsar_client, request_queue, response_queue, timeout, + consumer, subscriber, + ): + + super(StructuredDiagRequestor, self).__init__( + pulsar_client=pulsar_client, + request_queue=request_queue, + response_queue=response_queue, + request_schema=StructuredDataDiagnosisRequest, + response_schema=StructuredDataDiagnosisResponse, + subscription = subscriber, + consumer_name = consumer, + timeout=timeout, + ) + + self.request_translator = TranslatorRegistry.get_request_translator("structured-diag") + self.response_translator = TranslatorRegistry.get_response_translator("structured-diag") + + def to_request(self, body): + return self.request_translator.to_pulsar(body) + + def from_response(self, message): + return self.response_translator.from_response_with_completion(message) \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/retrieval/structured_diag/__init__.py b/trustgraph-flow/trustgraph/retrieval/structured_diag/__init__.py new file 
mode 100644 index 00000000..c4e9c7e7 --- /dev/null +++ b/trustgraph-flow/trustgraph/retrieval/structured_diag/__init__.py @@ -0,0 +1,2 @@ +# Structured data diagnosis service +from .service import * \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py b/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py new file mode 100644 index 00000000..75af6dc3 --- /dev/null +++ b/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py @@ -0,0 +1,394 @@ +""" +Structured Data Diagnosis Service - analyzes structured data and generates descriptors. +Supports three operations: detect-type, generate-descriptor, and diagnose (combined). +""" + +import json +import logging +from typing import Dict, Any, Optional + +from ...schema import StructuredDataDiagnosisRequest, StructuredDataDiagnosisResponse +from ...schema import PromptRequest, Error, RowSchema, Field as SchemaField + +from ...base import FlowProcessor, ConsumerSpec, ProducerSpec, PromptClientSpec + +from .type_detector import detect_data_type, detect_csv_options + +# Module logger +logger = logging.getLogger(__name__) + +default_ident = "structured-diag" +default_csv_prompt = "diagnose-csv" +default_json_prompt = "diagnose-json" +default_xml_prompt = "diagnose-xml" + + +class Processor(FlowProcessor): + + def __init__(self, **params): + + id = params.get("id", default_ident) + + # Config key for schemas + self.config_key = params.get("config_type", "schema") + + # Configurable prompt template names + self.csv_prompt = params.get("csv_prompt", default_csv_prompt) + self.json_prompt = params.get("json_prompt", default_json_prompt) + self.xml_prompt = params.get("xml_prompt", default_xml_prompt) + + super(Processor, self).__init__( + **params | { + "id": id, + "config_type": self.config_key, + } + ) + + self.register_specification( + ConsumerSpec( + name = "request", + schema = StructuredDataDiagnosisRequest, + handler = self.on_message + ) + ) + + 
self.register_specification( + ProducerSpec( + name = "response", + schema = StructuredDataDiagnosisResponse, + ) + ) + + # Client spec for calling prompt service + self.register_specification( + PromptClientSpec( + request_name = "prompt-request", + response_name = "prompt-response", + ) + ) + + # Register config handler for schema updates + self.register_config_handler(self.on_schema_config) + + # Schema storage: name -> RowSchema + self.schemas: Dict[str, RowSchema] = {} + + logger.info("Structured Data Diagnosis service initialized") + + async def on_schema_config(self, config, version): + """Handle schema configuration updates""" + logger.info(f"Loading schema configuration version {version}") + + # Clear existing schemas + self.schemas = {} + + # Check if our config type exists + if self.config_key not in config: + logger.warning(f"No '{self.config_key}' type in configuration") + return + + # Get the schemas dictionary for our type + schemas_config = config[self.config_key] + + # Process each schema in the schemas config + for schema_name, schema_json in schemas_config.items(): + try: + # Parse the JSON schema definition + schema_def = json.loads(schema_json) + + # Create Field objects + fields = [] + for field_def in schema_def.get("fields", []): + field = SchemaField( + name=field_def["name"], + type=field_def["type"], + size=field_def.get("size", 0), + primary=field_def.get("primary_key", False), + description=field_def.get("description", ""), + required=field_def.get("required", False), + enum_values=field_def.get("enum", []), + indexed=field_def.get("indexed", False) + ) + fields.append(field) + + # Create RowSchema + row_schema = RowSchema( + name=schema_def.get("name", schema_name), + description=schema_def.get("description", ""), + fields=fields + ) + + self.schemas[schema_name] = row_schema + logger.info(f"Loaded schema: {schema_name} with {len(fields)} fields") + + except Exception as e: + logger.error(f"Failed to parse schema {schema_name}: {e}", 
exc_info=True) + + logger.info(f"Schema configuration loaded: {len(self.schemas)} schemas") + + async def on_message(self, msg, consumer, flow): + """Handle incoming structured data diagnosis request""" + + try: + request = msg.value() + + # Sender-produced ID + id = msg.properties()["id"] + + logger.info(f"Handling structured data diagnosis request {id}: operation={request.operation}") + + if request.operation == "detect-type": + response = await self.detect_type_operation(request, flow) + elif request.operation == "generate-descriptor": + response = await self.generate_descriptor_operation(request, flow) + elif request.operation == "diagnose": + response = await self.diagnose_operation(request, flow) + else: + error = Error( + type="InvalidOperation", + message=f"Unknown operation: {request.operation}. Supported: detect-type, generate-descriptor, diagnose" + ) + response = StructuredDataDiagnosisResponse( + error=error, + operation=request.operation + ) + + # Send response + await flow("response").send( + id, response, properties={"id": id} + ) + + except Exception as e: + logger.error(f"Error processing diagnosis request: {e}", exc_info=True) + + error = Error( + type="ProcessingError", + message=f"Failed to process diagnosis request: {str(e)}" + ) + + response = StructuredDataDiagnosisResponse( + error=error, + operation=request.operation if request else "unknown" + ) + + await flow("response").send( + id, response, properties={"id": id} + ) + + async def detect_type_operation(self, request: StructuredDataDiagnosisRequest, flow) -> StructuredDataDiagnosisResponse: + """Handle detect-type operation""" + logger.info("Processing detect-type operation") + + detected_type, confidence = detect_data_type(request.sample) + + metadata = {} + if detected_type == "csv": + csv_options = detect_csv_options(request.sample) + metadata["csv_options"] = json.dumps(csv_options) + + return StructuredDataDiagnosisResponse( + error=None, + operation=request.operation, + 
detected_type=detected_type or "", + confidence=confidence, + metadata=metadata + ) + + async def generate_descriptor_operation(self, request: StructuredDataDiagnosisRequest, flow) -> StructuredDataDiagnosisResponse: + """Handle generate-descriptor operation""" + logger.info(f"Processing generate-descriptor operation for type: {request.type}") + + if not request.type: + error = Error( + type="MissingParameter", + message="Type parameter is required for generate-descriptor operation" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + if not request.schema_name: + error = Error( + type="MissingParameter", + message="Schema name parameter is required for generate-descriptor operation" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + # Get target schema + if request.schema_name not in self.schemas: + error = Error( + type="SchemaNotFound", + message=f"Schema '{request.schema_name}' not found in configuration" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + target_schema = self.schemas[request.schema_name] + + # Generate descriptor using prompt service + descriptor = await self.generate_descriptor_with_prompt( + request.sample, request.type, target_schema, request.options, flow + ) + + if descriptor is None: + error = Error( + type="DescriptorGenerationFailed", + message="Failed to generate descriptor using prompt service" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + return StructuredDataDiagnosisResponse( + error=None, + operation=request.operation, + descriptor=json.dumps(descriptor), + metadata={"schema_name": request.schema_name, "type": request.type} + ) + + async def diagnose_operation(self, request: StructuredDataDiagnosisRequest, flow) -> StructuredDataDiagnosisResponse: + """Handle combined diagnose operation""" + logger.info("Processing combined diagnose operation") + + # Step 1: Detect type 
+ detected_type, confidence = detect_data_type(request.sample) + + if not detected_type: + error = Error( + type="TypeDetectionFailed", + message="Unable to detect data type from sample" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + # Step 2: Use provided schema name or auto-select first available + schema_name = request.schema_name + if not schema_name and self.schemas: + schema_name = list(self.schemas.keys())[0] + logger.info(f"Auto-selected schema: {schema_name}") + + if not schema_name: + error = Error( + type="NoSchemaAvailable", + message="No schema specified and no schemas available in configuration" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + if schema_name not in self.schemas: + error = Error( + type="SchemaNotFound", + message=f"Schema '{schema_name}' not found in configuration" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + target_schema = self.schemas[schema_name] + + # Step 3: Generate descriptor + descriptor = await self.generate_descriptor_with_prompt( + request.sample, detected_type, target_schema, request.options, flow + ) + + if descriptor is None: + error = Error( + type="DescriptorGenerationFailed", + message="Failed to generate descriptor using prompt service" + ) + return StructuredDataDiagnosisResponse(error=error, operation=request.operation) + + metadata = { + "schema_name": schema_name, + "auto_selected_schema": request.schema_name != schema_name + } + + if detected_type == "csv": + csv_options = detect_csv_options(request.sample) + metadata["csv_options"] = json.dumps(csv_options) + + return StructuredDataDiagnosisResponse( + error=None, + operation=request.operation, + detected_type=detected_type, + confidence=confidence, + descriptor=json.dumps(descriptor), + metadata=metadata + ) + + async def generate_descriptor_with_prompt( + self, sample: str, data_type: str, target_schema: RowSchema, + options: 
Dict[str, str], flow + ) -> Optional[Dict[str, Any]]: + """Generate descriptor using appropriate prompt service""" + + # Select prompt template based on data type + prompt_templates = { + "csv": self.csv_prompt, + "json": self.json_prompt, + "xml": self.xml_prompt + } + + prompt_id = prompt_templates.get(data_type) + if not prompt_id: + logger.error(f"No prompt template defined for data type: {data_type}") + return None + + # Prepare schema information for prompt + schema_info = { + "name": target_schema.name, + "description": target_schema.description, + "fields": [ + { + "name": f.name, + "type": f.type, + "description": f.description, + "required": f.required, + "primary_key": f.primary, + "indexed": f.indexed, + "enum_values": f.enum_values if f.enum_values else [] + } + for f in target_schema.fields + ] + } + + # Create prompt variables + variables = { + "sample": sample, + "schemas": [schema_info], # Array with single target schema + "options": options or {} + } + + # Call prompt service + terms = {k: json.dumps(v) for k, v in variables.items()} + prompt_request = PromptRequest( + id=prompt_id, + terms=terms + ) + + try: + logger.info(f"Calling prompt service with template: {prompt_id}") + response = await flow("prompt-request").request(prompt_request) + + if response.error: + logger.error(f"Prompt service error: {response.error.message}") + return None + + # Parse response + if response.object: + try: + return json.loads(response.object) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse prompt response as JSON: {e}") + logger.debug(f"Response object: {response.object}") + return None + elif response.text: + try: + return json.loads(response.text) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse prompt text response as JSON: {e}") + logger.debug(f"Response text: {response.text}") + return None + else: + logger.error("Empty response from prompt service") + return None + + except Exception as e: + logger.error(f"Error 
calling prompt service: {e}", exc_info=True) + return None + + +def run(): + """Entry point for structured-diag command""" + Processor.launch(default_ident, __doc__) \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/retrieval/structured_diag/type_detector.py b/trustgraph-flow/trustgraph/retrieval/structured_diag/type_detector.py new file mode 100644 index 00000000..ccd6bf8b --- /dev/null +++ b/trustgraph-flow/trustgraph/retrieval/structured_diag/type_detector.py @@ -0,0 +1,236 @@ +""" +Algorithmic data type detection for structured data. +Determines if data is CSV, JSON, or XML based on content analysis. +""" + +import json +import xml.etree.ElementTree as ET +import csv +from io import StringIO +import logging +from typing import Dict, Optional, Tuple + +# Module logger +logger = logging.getLogger(__name__) + + +def detect_data_type(sample: str) -> Tuple[Optional[str], float]: + """ + Detect the data type (csv, json, xml) of a data sample. + + Args: + sample: String containing data sample to analyze + + Returns: + Tuple of (detected_type, confidence_score) + detected_type: "csv", "json", "xml", or None if unable to determine + confidence_score: Float between 0.0 and 1.0 indicating confidence + """ + if not sample or not sample.strip(): + return None, 0.0 + + sample = sample.strip() + + # Try each format and calculate confidence scores + json_confidence = _check_json_format(sample) + xml_confidence = _check_xml_format(sample) + csv_confidence = _check_csv_format(sample) + + logger.debug(f"Format confidence scores - JSON: {json_confidence}, XML: {xml_confidence}, CSV: {csv_confidence}") + + # Find the format with highest confidence + scores = { + "json": json_confidence, + "xml": xml_confidence, + "csv": csv_confidence + } + + best_format = max(scores, key=scores.get) + best_confidence = scores[best_format] + + # Only return a result if confidence is above threshold + if best_confidence < 0.3: + return None, best_confidence + + return best_format, 
best_confidence + + +def _check_json_format(sample: str) -> float: + """Check if sample is valid JSON format""" + try: + # Must start with { or [ + if not (sample.startswith('{') or sample.startswith('[')): + return 0.0 + + # Try to parse as JSON + data = json.loads(sample) + + # Higher confidence for structured data + if isinstance(data, dict): + return 0.95 + elif isinstance(data, list) and len(data) > 0: + # Check if it's an array of objects (common for structured data) + if isinstance(data[0], dict): + return 0.9 + else: + return 0.7 + else: + return 0.6 + + except (json.JSONDecodeError, ValueError): + return 0.0 + + +def _check_xml_format(sample: str) -> float: + """Check if sample is valid XML format""" + try: + # Quick heuristic checks first + if not sample.startswith('<'): + return 0.0 + + if not ('>' in sample and ' 10: + return 0.95 + elif child_count > 5: + return 0.9 + elif child_count > 0: + return 0.8 + else: + return 0.6 + + except ET.ParseError: + # Check for common XML characteristics even if not well-formed + xml_indicators = [' float: + """Check if sample is valid CSV format""" + try: + lines = sample.strip().split('\n') + if len(lines) < 2: + return 0.0 + + # Try to parse as CSV with different delimiters + delimiters = [',', ';', '\t', '|'] + best_score = 0.0 + + for delimiter in delimiters: + score = _check_csv_with_delimiter(sample, delimiter) + best_score = max(best_score, score) + + return best_score + + except Exception: + return 0.0 + + +def _check_csv_with_delimiter(sample: str, delimiter: str) -> float: + """Check CSV format with specific delimiter""" + try: + reader = csv.reader(StringIO(sample), delimiter=delimiter) + rows = list(reader) + + if len(rows) < 2: + return 0.0 + + # Check consistency of column counts + first_row_cols = len(rows[0]) + if first_row_cols < 2: + return 0.0 + + consistent_rows = 0 + for row in rows[1:]: + if len(row) == first_row_cols: + consistent_rows += 1 + + consistency_ratio = consistent_rows / (len(rows) - 
1) if len(rows) > 1 else 0 + + # Base score on consistency and structure + if consistency_ratio > 0.8: + # Higher score for more columns and rows + column_bonus = min(first_row_cols * 0.05, 0.2) + row_bonus = min(len(rows) * 0.01, 0.1) + return min(0.7 + column_bonus + row_bonus, 0.95) + elif consistency_ratio > 0.6: + return 0.5 + else: + return 0.2 + + except Exception: + return 0.0 + + +def detect_csv_options(sample: str) -> Dict[str, any]: + """ + Detect CSV-specific options like delimiter and header presence. + + Args: + sample: CSV data sample + + Returns: + Dict with detected options: delimiter, has_header, etc. + """ + options = { + "delimiter": ",", + "has_header": True, + "encoding": "utf-8" + } + + try: + lines = sample.strip().split('\n') + if len(lines) < 2: + return options + + # Detect delimiter + delimiters = [',', ';', '\t', '|'] + best_delimiter = "," + best_score = 0 + + for delimiter in delimiters: + score = _check_csv_with_delimiter(sample, delimiter) + if score > best_score: + best_score = score + best_delimiter = delimiter + + options["delimiter"] = best_delimiter + + # Detect header (heuristic: first row has text, second row has more numbers/structured data) + reader = csv.reader(StringIO(sample), delimiter=best_delimiter) + rows = list(reader) + + if len(rows) >= 2: + first_row = rows[0] + second_row = rows[1] + + # Count numeric fields in each row + first_numeric = sum(1 for cell in first_row if _is_numeric(cell)) + second_numeric = sum(1 for cell in second_row if _is_numeric(cell)) + + # If second row has more numeric values, first row is likely header + if second_numeric > first_numeric and first_numeric < len(first_row) * 0.7: + options["has_header"] = True + else: + options["has_header"] = False + + except Exception as e: + logger.debug(f"Error detecting CSV options: {e}") + + return options + + +def _is_numeric(value: str) -> bool: + """Check if a string value represents a number""" + try: + float(value.strip()) + return True + except 
(ValueError, AttributeError): + return False \ No newline at end of file