diff --git a/docs/tech-specs/structured-data-descriptor.md b/docs/tech-specs/structured-data-descriptor.md
new file mode 100644
index 00000000..e3a797ae
--- /dev/null
+++ b/docs/tech-specs/structured-data-descriptor.md
@@ -0,0 +1,559 @@
+# Structured Data Descriptor Specification
+
+## Overview
+
+The Structured Data Descriptor is a JSON-based configuration language that describes how to parse, transform, and import structured data into TrustGraph. It provides a declarative approach to data ingestion, supporting multiple input formats and complex transformation pipelines without requiring custom code.
+
+## Core Concepts
+
+### 1. Format Definition
+Describes the input file type and parsing options. Determines which parser to use and how to interpret the source data.
+
+### 2. Field Mappings
+Maps source paths to target fields with transformations. Defines how data flows from input sources to output schema fields.
+
+### 3. Transform Pipeline
+Chain of data transformations that can be applied to field values, including:
+- Data cleaning (trim, normalize)
+- Format conversion (date parsing, type casting)
+- Calculations (arithmetic, string manipulation)
+- Lookups (reference tables, substitutions)
+
+### 4. Validation Rules
+Data quality checks applied to ensure data integrity:
+- Type validation
+- Range checks
+- Pattern matching (regex)
+- Required field validation
+- Custom validation logic
+
+### 5. Global Settings
+Configuration that applies across the entire import process:
+- Lookup tables for data enrichment
+- Global variables and constants
+- Output format specifications
+- Error handling policies
+
+## Implementation Strategy
+
+The importer implementation follows this pipeline (a minimal sketch follows the list):
+
+1. **Parse Configuration** - Load and validate the JSON descriptor
+2. **Initialize Parser** - Load appropriate parser (CSV, XML, JSON, etc.) based on `format.type`
+3. **Apply Preprocessing** - Execute global filters and transformations
+4. **Process Records** - For each input record:
+ - Extract data using source paths (JSONPath, XPath, column names)
+ - Apply field-level transforms in sequence
+ - Validate results against defined rules
+ - Apply default values for missing data
+5. **Apply Postprocessing** - Execute deduplication, aggregation, etc.
+6. **Generate Output** - Produce data in specified target format
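+
+A minimal sketch of this pipeline, assuming a CSV input and a reduced set of transforms (illustrative only, not the production importer):
+
+```python
+import csv
+import json
+
+def run_import(descriptor_path, input_path):
+    # 1. Parse configuration
+    with open(descriptor_path, encoding="utf-8") as f:
+        descriptor = json.load(f)
+
+    # 2. Initialize parser (this sketch only handles CSV)
+    fmt = descriptor.get("format", {})
+    if fmt.get("type") != "csv":
+        raise ValueError("sketch only handles CSV input")
+    delimiter = fmt.get("options", {}).get("delimiter", ",")
+
+    # 4. Process records: extract, transform, apply defaults
+    records = []
+    with open(input_path, encoding=fmt.get("encoding", "utf-8"), newline="") as f:
+        for row in csv.DictReader(f, delimiter=delimiter):
+            out = {}
+            for mapping in descriptor.get("mappings", []):
+                value = row.get(mapping.get("source") or mapping.get("source_field"))
+                for t in mapping.get("transforms", []):
+                    if t["type"] == "trim" and isinstance(value, str):
+                        value = value.strip()
+                    elif t["type"] == "default" and value in (None, ""):
+                        value = t["value"]
+                out[mapping["target_field"]] = value
+            records.append(out)
+
+    # 6. Hand the records to the output stage (omitted here)
+    return records
+```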
+
+## Path Expression Support
+
+Different input formats use appropriate path expression languages (a brief extraction sketch follows the list):
+
+- **CSV**: Column names or indices (`"column_name"` or `"[2]"`)
+- **JSON**: JSONPath syntax (`"$.user.profile.email"`)
+- **XML**: XPath expressions (`"//product[@id='123']/price"`)
+- **Fixed-width**: Field names from field definitions
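+
+For illustration, here is roughly how a single value might be pulled out of each format using only the Python standard library (the JSONPath handling is reduced to a dotted-key walk; a real importer would use proper JSONPath/XPath libraries):
+
+```python
+import csv
+import io
+import json
+import xml.etree.ElementTree as ET
+
+# CSV: column name lookup
+row = next(csv.DictReader(io.StringIO("name,age\nAda,36\n")))
+print(row["name"])                                        # Ada
+
+# JSON: "$.user.profile.email" reduced to a dotted-key walk (illustrative only)
+doc = json.loads('{"user": {"profile": {"email": "ada@example.com"}}}')
+value = doc
+for key in "$.user.profile.email".lstrip("$.").split("."):
+    value = value[key]
+print(value)                                               # ada@example.com
+
+# XML: ElementTree supports a useful XPath subset (note the leading '.')
+root = ET.fromstring('<products><product id="123"><price>9.99</price></product></products>')
+print(root.find(".//product[@id='123']/price").text)      # 9.99
+```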
+
+## Benefits
+
+- **Single Codebase** - One importer handles multiple input formats
+- **User-Friendly** - Non-technical users can create configurations
+- **Reusable** - Configurations can be shared and versioned
+- **Flexible** - Complex transformations without custom coding
+- **Robust** - Built-in validation and comprehensive error handling
+- **Maintainable** - Declarative approach reduces implementation complexity
+
+## Language Specification
+
+The Structured Data Descriptor uses a JSON configuration format with the following top-level structure:
+
+```json
+{
+ "version": "1.0",
+ "metadata": {
+ "name": "Configuration Name",
+ "description": "Description of what this config does",
+ "author": "Author Name",
+ "created": "2024-01-01T00:00:00Z"
+ },
+ "format": { ... },
+ "globals": { ... },
+ "preprocessing": [ ... ],
+ "mappings": [ ... ],
+ "postprocessing": [ ... ],
+ "output": { ... }
+}
+```
+
+### Format Definition
+
+Describes the input data format and parsing options:
+
+```json
+{
+ "format": {
+ "type": "csv|json|xml|fixed-width|excel|parquet",
+ "encoding": "utf-8",
+ "options": {
+ // Format-specific options
+ }
+ }
+}
+```
+
+#### CSV Format Options
+```json
+{
+ "format": {
+ "type": "csv",
+ "options": {
+ "delimiter": ",",
+ "quote_char": "\"",
+ "escape_char": "\\",
+ "skip_rows": 1,
+ "has_header": true,
+ "null_values": ["", "NULL", "null", "N/A"]
+ }
+ }
+}
+```
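+
+A sketch of how these options could map onto Python's csv module; `skip_rows` is interpreted here as rows skipped before the header, and `null_values` is applied manually since `DictReader` has no direct equivalent (illustrative assumptions, not normative):
+
+```python
+import csv
+import io
+
+options = {"delimiter": ",", "quote_char": "\"", "skip_rows": 1,
+           "has_header": True, "null_values": ["", "NULL", "null", "N/A"]}
+
+raw = "# exported 2024-01-01\nname,age\nAda,36\nBob,NULL\n"
+lines = raw.splitlines()[options["skip_rows"]:]           # drop leading non-data rows
+reader = csv.DictReader(io.StringIO("\n".join(lines)),
+                        delimiter=options["delimiter"],
+                        quotechar=options["quote_char"])
+records = [
+    {k: (None if v in options["null_values"] else v) for k, v in row.items()}
+    for row in reader
+]
+print(records)   # [{'name': 'Ada', 'age': '36'}, {'name': 'Bob', 'age': None}]
+```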
+
+#### JSON Format Options
+```json
+{
+ "format": {
+ "type": "json",
+ "options": {
+ "root_path": "$.data",
+ "array_mode": "records|single",
+ "flatten": false
+ }
+ }
+}
+```
+
+#### XML Format Options
+```json
+{
+ "format": {
+ "type": "xml",
+ "options": {
+ "root_element": "//records/record",
+ "namespaces": {
+ "ns": "http://example.com/namespace"
+ }
+ }
+ }
+}
+```
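+
+ElementTree can resolve such namespaced paths when given the prefix-to-URI map from the `namespaces` option; a rough sketch (ElementTree covers a useful XPath subset, full XPath may need lxml):
+
+```python
+import xml.etree.ElementTree as ET
+
+xml_doc = """
+<ns:records xmlns:ns="http://example.com/namespace">
+  <ns:record><ns:name>Ada</ns:name></ns:record>
+  <ns:record><ns:name>Bob</ns:name></ns:record>
+</ns:records>
+"""
+namespaces = {"ns": "http://example.com/namespace"}
+root = ET.fromstring(xml_doc)
+# The prefix map mirrors the "namespaces" option above
+for rec in root.findall(".//ns:record", namespaces):
+    print(rec.find("ns:name", namespaces).text)           # Ada, Bob
+```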
+
+### Global Settings
+
+Define lookup tables, variables, and global configuration:
+
+```json
+{
+ "globals": {
+ "variables": {
+ "current_date": "2024-01-01",
+ "batch_id": "BATCH_001",
+ "default_confidence": 0.8
+ },
+ "lookup_tables": {
+ "country_codes": {
+ "US": "United States",
+ "UK": "United Kingdom",
+ "CA": "Canada"
+ },
+ "status_mapping": {
+ "1": "active",
+ "0": "inactive"
+ }
+ },
+ "constants": {
+ "source_system": "legacy_crm",
+ "import_type": "full"
+ }
+ }
+}
+```
+
+### Field Mappings
+
+Define how source data maps to target fields with transformations:
+
+```json
+{
+ "mappings": [
+ {
+ "target_field": "person_name",
+ "source": "$.name",
+ "transforms": [
+ {"type": "trim"},
+ {"type": "title_case"},
+ {"type": "required"}
+ ],
+ "validation": [
+ {"type": "min_length", "value": 2},
+ {"type": "max_length", "value": 100},
+ {"type": "pattern", "value": "^[A-Za-z\\s]+$"}
+ ]
+ },
+ {
+ "target_field": "age",
+ "source": "$.age",
+ "transforms": [
+ {"type": "to_int"},
+ {"type": "default", "value": 0}
+ ],
+ "validation": [
+ {"type": "range", "min": 0, "max": 150}
+ ]
+ },
+ {
+ "target_field": "country",
+ "source": "$.country_code",
+ "transforms": [
+ {"type": "lookup", "table": "country_codes"},
+ {"type": "default", "value": "Unknown"}
+ ]
+ }
+ ]
+}
+```
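+
+A sketch of how one record could flow through such a mapping, using a reduced set of the transforms and validations described in the following sections and a lookup table from the `globals` section (helper names are illustrative):
+
+```python
+import re
+
+lookup_tables = {"country_codes": {"US": "United States", "UK": "United Kingdom"}}
+
+def apply_transform(value, t):
+    kind = t["type"]
+    if kind == "trim":       return value.strip() if isinstance(value, str) else value
+    if kind == "title_case": return value.title() if isinstance(value, str) else value
+    if kind == "to_int":     return int(value) if value not in (None, "") else None
+    if kind == "lookup":     return lookup_tables[t["table"]].get(value, value)
+    if kind == "default":    return t["value"] if value in (None, "") else value
+    return value   # unknown transform: pass through in this sketch
+
+def validate(value, rule):
+    kind = rule["type"]
+    if kind == "min_length": return len(value or "") >= rule["value"]
+    if kind == "max_length": return len(value or "") <= rule["value"]
+    if kind == "pattern":    return re.fullmatch(rule["value"], value or "") is not None
+    if kind == "range":      return rule["min"] <= value <= rule["max"]
+    return True
+
+record = {"name": "  ada LOVELACE ", "age": "36", "country_code": "UK"}
+mapping = {"target_field": "person_name", "source": "$.name",
+           "transforms": [{"type": "trim"}, {"type": "title_case"}],
+           "validation": [{"type": "min_length", "value": 2},
+                          {"type": "pattern", "value": "^[A-Za-z\\s]+$"}]}
+
+value = record[mapping["source"].lstrip("$.")]
+for t in mapping["transforms"]:
+    value = apply_transform(value, t)
+assert all(validate(value, r) for r in mapping["validation"])
+print(value)   # Ada Lovelace
+```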
+
+### Transform Types
+
+Available transformation functions:
+
+#### String Transforms
+```json
+{"type": "trim"},
+{"type": "upper"},
+{"type": "lower"},
+{"type": "title_case"},
+{"type": "replace", "pattern": "old", "replacement": "new"},
+{"type": "regex_replace", "pattern": "\\d+", "replacement": "XXX"},
+{"type": "substring", "start": 0, "end": 10},
+{"type": "pad_left", "length": 10, "char": "0"}
+```
+
+#### Type Conversions
+```json
+{"type": "to_string"},
+{"type": "to_int"},
+{"type": "to_float"},
+{"type": "to_bool"},
+{"type": "to_date", "format": "YYYY-MM-DD"},
+{"type": "parse_json"}
+```
+
+#### Data Operations
+```json
+{"type": "default", "value": "default_value"},
+{"type": "lookup", "table": "table_name"},
+{"type": "concat", "values": ["field1", " - ", "field2"]},
+{"type": "calculate", "expression": "${field1} + ${field2}"},
+{"type": "conditional", "condition": "${age} > 18", "true_value": "adult", "false_value": "minor"}
+```
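+
+The `calculate` and `conditional` transforms reference other fields through `${field}` placeholders. A hedged sketch of how such expressions could be substituted and evaluated (a production implementation should use a restricted expression parser rather than `eval`):
+
+```python
+import re
+
+def substitute(expr, record):
+    """Replace ${field} placeholders with the record's values as Python literals."""
+    return re.sub(r"\$\{(\w+)\}", lambda m: repr(record[m.group(1)]), expr)
+
+record = {"field1": 2, "field2": 3, "age": 25}
+
+calc = substitute("${field1} + ${field2}", record)                # "2 + 3"
+print(eval(calc, {"__builtins__": {}}, {}))                       # 5
+
+cond = substitute("${age} > 18", record)                          # "25 > 18"
+print("adult" if eval(cond, {"__builtins__": {}}, {}) else "minor")   # adult
+```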
+
+### Validation Rules
+
+Data quality checks with configurable error handling:
+
+#### Basic Validations
+```json
+{"type": "required"},
+{"type": "not_null"},
+{"type": "min_length", "value": 5},
+{"type": "max_length", "value": 100},
+{"type": "range", "min": 0, "max": 1000},
+{"type": "pattern", "value": "^[A-Z]{2,3}$"},
+{"type": "in_list", "values": ["active", "inactive", "pending"]}
+```
+
+#### Custom Validations
+```json
+{
+ "type": "custom",
+ "expression": "${age} >= 18 && ${country} == 'US'",
+ "message": "Must be 18+ and in US"
+},
+{
+ "type": "cross_field",
+ "fields": ["start_date", "end_date"],
+ "expression": "${start_date} < ${end_date}",
+ "message": "Start date must be before end date"
+}
+```
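+
+Cross-field rules can reuse the same placeholder substitution; a small sketch (again using `eval` purely for illustration):
+
+```python
+import re
+
+def check_cross_field(rule, record):
+    # Substitute ${field} placeholders, then evaluate; ISO dates compare correctly as strings
+    expr = re.sub(r"\$\{(\w+)\}", lambda m: repr(record[m.group(1)]), rule["expression"])
+    ok = bool(eval(expr, {"__builtins__": {}}, {}))
+    return ok, None if ok else rule.get("message")
+
+rule = {"type": "cross_field", "fields": ["start_date", "end_date"],
+        "expression": "${start_date} < ${end_date}",
+        "message": "Start date must be before end date"}
+print(check_cross_field(rule, {"start_date": "2024-01-01", "end_date": "2023-12-31"}))
+# (False, 'Start date must be before end date')
+```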
+
+### Preprocessing and Postprocessing
+
+Global operations applied before/after field mapping:
+
+```json
+{
+ "preprocessing": [
+ {
+ "type": "filter",
+ "condition": "${status} != 'deleted'"
+ },
+ {
+ "type": "sort",
+ "field": "created_date",
+ "order": "asc"
+ }
+ ],
+ "postprocessing": [
+ {
+ "type": "deduplicate",
+ "key_fields": ["email", "phone"]
+ },
+ {
+ "type": "aggregate",
+ "group_by": ["country"],
+ "functions": {
+ "total_count": {"type": "count"},
+ "avg_age": {"type": "avg", "field": "age"}
+ }
+ }
+ ]
+}
+```
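+
+A sketch of the two postprocessing steps above applied to an in-memory record list (only `count` and `avg` are implemented here):
+
+```python
+from collections import defaultdict
+
+records = [
+    {"email": "a@x.com", "phone": "1", "country": "US", "age": 30},
+    {"email": "a@x.com", "phone": "1", "country": "US", "age": 30},   # duplicate
+    {"email": "b@x.com", "phone": "2", "country": "CA", "age": 40},
+]
+
+# deduplicate on key_fields
+seen, deduped = set(), []
+for r in records:
+    key = (r["email"], r["phone"])
+    if key not in seen:
+        seen.add(key)
+        deduped.append(r)
+
+# aggregate: group_by country, count and avg(age)
+groups = defaultdict(list)
+for r in deduped:
+    groups[r["country"]].append(r)
+summary = {c: {"total_count": len(rs), "avg_age": sum(r["age"] for r in rs) / len(rs)}
+           for c, rs in groups.items()}
+print(summary)   # {'US': {'total_count': 1, 'avg_age': 30.0}, 'CA': {'total_count': 1, 'avg_age': 40.0}}
+```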
+
+### Output Configuration
+
+Define how processed data should be output:
+
+```json
+{
+ "output": {
+ "format": "trustgraph-objects",
+ "schema_name": "person",
+ "options": {
+ "batch_size": 1000,
+ "confidence": 0.9,
+ "source_span_field": "raw_text",
+ "metadata": {
+ "source": "crm_import",
+ "version": "1.0"
+ }
+ },
+ "error_handling": {
+ "on_validation_error": "skip|fail|log",
+ "on_transform_error": "skip|fail|default",
+ "max_errors": 100,
+ "error_output": "errors.json"
+ }
+ }
+}
+```
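+
+How the error-handling policy might be applied per failing record (names and structure here are illustrative, not the importer's actual internals):
+
+```python
+import json
+import logging
+
+policy = {"on_validation_error": "log", "max_errors": 100, "error_output": "errors.json"}
+errors = []
+
+def handle_validation_error(record, message):
+    errors.append({"record": record, "error": message})
+    if policy["on_validation_error"] == "fail":
+        raise ValueError(message)
+    if policy["on_validation_error"] == "log":
+        logging.warning("validation error: %s", message)
+    if len(errors) >= policy["max_errors"]:
+        with open(policy["error_output"], "w", encoding="utf-8") as f:
+            json.dump(errors, f, indent=2)
+        raise RuntimeError(f"aborting after {len(errors)} validation errors")
+    # "skip" (and "log") fall through: the record is dropped and processing continues
+```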
+
+## Complete Example
+
+```json
+{
+ "version": "1.0",
+ "metadata": {
+ "name": "Customer Import from CRM CSV",
+ "description": "Imports customer data from legacy CRM system",
+ "author": "Data Team",
+ "created": "2024-01-01T00:00:00Z"
+ },
+ "format": {
+ "type": "csv",
+ "encoding": "utf-8",
+ "options": {
+ "delimiter": ",",
+ "has_header": true,
+ "skip_rows": 1
+ }
+ },
+ "globals": {
+ "variables": {
+ "import_date": "2024-01-01",
+ "default_confidence": 0.85
+ },
+ "lookup_tables": {
+ "country_codes": {
+ "US": "United States",
+ "CA": "Canada",
+ "UK": "United Kingdom"
+ }
+ }
+ },
+ "preprocessing": [
+ {
+ "type": "filter",
+ "condition": "${status} == 'active'"
+ }
+ ],
+ "mappings": [
+ {
+ "target_field": "full_name",
+ "source": "customer_name",
+ "transforms": [
+ {"type": "trim"},
+ {"type": "title_case"}
+ ],
+ "validation": [
+ {"type": "required"},
+ {"type": "min_length", "value": 2}
+ ]
+ },
+ {
+ "target_field": "email",
+ "source": "email_address",
+ "transforms": [
+ {"type": "trim"},
+ {"type": "lower"}
+ ],
+ "validation": [
+ {"type": "pattern", "value": "^[\\w.-]+@[\\w.-]+\\.[a-zA-Z]{2,}$"}
+ ]
+ },
+ {
+ "target_field": "age",
+ "source": "age",
+ "transforms": [
+ {"type": "to_int"},
+ {"type": "default", "value": 0}
+ ],
+ "validation": [
+ {"type": "range", "min": 0, "max": 120}
+ ]
+ },
+ {
+ "target_field": "country",
+ "source": "country_code",
+ "transforms": [
+ {"type": "lookup", "table": "country_codes"},
+ {"type": "default", "value": "Unknown"}
+ ]
+ }
+ ],
+ "output": {
+ "format": "trustgraph-objects",
+ "schema_name": "customer",
+ "options": {
+ "confidence": "${default_confidence}",
+ "batch_size": 500
+ },
+ "error_handling": {
+ "on_validation_error": "log",
+ "max_errors": 50
+ }
+ }
+}
+```
+
+## LLM Prompt for Descriptor Generation
+
+The following prompt can be used to have an LLM analyze sample data and generate a descriptor configuration:
+
+```
+I need you to analyze the provided data sample and create a Structured Data Descriptor configuration in JSON format.
+
+The descriptor should follow this specification:
+- version: "1.0"
+- metadata: Configuration name, description, author, and creation date
+- format: Input format type and parsing options
+- globals: Variables, lookup tables, and constants
+- preprocessing: Filters and transformations applied before mapping
+- mappings: Field-by-field mapping from source to target with transformations and validations
+- postprocessing: Operations like deduplication or aggregation
+- output: Target format and error handling configuration
+
+ANALYZE THE DATA:
+1. Identify the format (CSV, JSON, XML, etc.)
+2. Detect delimiters, encodings, and structure
+3. Find data types for each field
+4. Identify patterns and constraints
+5. Look for fields that need cleaning or transformation
+6. Find relationships between fields
+7. Identify lookup opportunities (codes that map to values)
+8. Detect required vs optional fields
+
+CREATE THE DESCRIPTOR:
+For each field in the sample data:
+- Map it to an appropriate target field name
+- Add necessary transformations (trim, case conversion, type casting)
+- Include appropriate validations (required, patterns, ranges)
+- Set defaults for missing values
+
+Include preprocessing if needed:
+- Filters to exclude invalid records
+- Sorting requirements
+
+Include postprocessing if beneficial:
+- Deduplication on key fields
+- Aggregation for summary data
+
+Configure output for TrustGraph:
+- format: "trustgraph-objects"
+- schema_name: Based on the data entity type
+- Appropriate error handling
+
+DATA SAMPLE:
+[Insert data sample here]
+
+ADDITIONAL CONTEXT (optional):
+- Target schema name: [if known]
+- Business rules: [any specific requirements]
+- Data quality issues to address: [known problems]
+
+Generate a complete, valid Structured Data Descriptor configuration that will properly import this data into TrustGraph. Include comments explaining key decisions.
+```
+
+### Example Usage Prompt
+
+```
+I need you to analyze the provided data sample and create a Structured Data Descriptor configuration in JSON format.
+
+[Standard instructions from above...]
+
+DATA SAMPLE:
+CustomerID,Name,Email,Age,Country,Status,JoinDate,TotalPurchases
+1001,"Smith, John",john.smith@email.com,35,US,1,2023-01-15,5420.50
+1002,"doe, jane",JANE.DOE@GMAIL.COM,28,CA,1,2023-03-22,3200.00
+1003,"Bob Johnson",bob@,62,UK,0,2022-11-01,0
+1004,"Alice Chen","alice.chen@company.org",41,US,1,2023-06-10,8900.25
+1005,,invalid-email,25,XX,1,2024-01-01,100
+
+ADDITIONAL CONTEXT:
+- Target schema name: customer
+- Business rules: Email should be valid and lowercase, names should be title case
+- Data quality issues: Some emails are invalid, some names are missing, country codes need mapping
+```
+
+### Prompt for Analyzing Existing Data Without Sample
+
+```
+I need you to help me create a Structured Data Descriptor configuration for importing [data type] data.
+
+The source data has these characteristics:
+- Format: [CSV/JSON/XML/etc]
+- Fields: [list the fields]
+- Data quality issues: [describe any known issues]
+- Volume: [approximate number of records]
+
+Requirements:
+- [List any specific transformation needs]
+- [List any validation requirements]
+- [List any business rules]
+
+Please generate a Structured Data Descriptor configuration that will:
+1. Parse the input format correctly
+2. Clean and standardize the data
+3. Validate according to the requirements
+4. Handle errors gracefully
+5. Output in TrustGraph ExtractedObject format
+
+Focus on making the configuration robust and reusable.
+```
\ No newline at end of file
diff --git a/prompt.txt b/prompt.txt
new file mode 100644
index 00000000..84c4b8be
--- /dev/null
+++ b/prompt.txt
@@ -0,0 +1,309 @@
+
+You are an expert data engineer specializing in creating Structured Data Descriptor configurations for data import pipelines, with particular expertise in XML processing and XPath expressions. Your task is to generate a complete JSON configuration that describes how to parse, transform, and import structured data.
+
+## Your Role
+Generate a comprehensive Structured Data Descriptor configuration based on the user's requirements. The descriptor should be production-ready, include appropriate error handling, and follow best practices for data quality and transformation.
+
+## XML Processing Expertise
+
+When working with XML data, you must:
+
+1. **Analyze XML Structure** - Examine the hierarchy, namespaces, and element patterns
+2. **Generate Proper XPath Expressions** - Create efficient XPath selectors for record extraction
+3. **Handle Complex XML Patterns** - Support various XML formats including:
+   - Standard element structures: `<name>John</name>`
+   - Attribute-based fields: `<field name="Country">USA</field>`
+ - Mixed content and nested hierarchies
+ - Namespaced XML documents
+
+## XPath Expression Guidelines
+
+For XML format configurations, use these XPath patterns:
+
+**Record Path Examples:**
+- Simple records: `//record` or `//customer`
+- Nested records: `//data/records/record` or `//customers/customer`
+- Absolute paths: `/ROOT/data/record` (will be converted to relative paths automatically)
+- With namespaces: `//ns:record` or `//soap:Body/data/record`
+
+**Field Attribute Patterns:**
+- When fields use name attributes: set `field_attribute: "name"` for the `<field name="...">value</field>` pattern
+- For other attribute patterns: set appropriate attribute name
+
+**CRITICAL: Source Field Names in Mappings**
+
+When using `field_attribute`, the XML parser extracts field names from the attribute values and creates a flat dictionary. Your source field names in mappings must match these extracted names:
+
+**CORRECT Example:**
+```xml
+<field name="Country or Area">Albania</field>
+<field name="Trade (USD)">1000.50</field>
+```
+
+Becomes parsed data:
+```json
+{
+ "Country or Area": "Albania",
+ "Trade (USD)": "1000.50"
+}
+```
+
+So your mappings should use:
+```json
+[
+  {"source_field": "Country or Area"},   // ✅ Correct - matches parsed field name
+  {"source_field": "Trade (USD)"}        // ✅ Correct - matches parsed field name
+]
+```
+
+**INCORRECT Example:**
+```json
+[
+  {"source_field": "Field[@name='Country or Area']"},   // ❌ Wrong - XPath not needed here
+  {"source_field": "field[@name='Trade (USD)']"}        // ❌ Wrong - XPath not needed here
+]
+```
+
+**XML Format Configuration Template:**
+```json
+{
+ "format": {
+ "type": "xml",
+ "encoding": "utf-8",
+ "options": {
+ "record_path": "//data/record", // XPath to find record elements
+      "field_attribute": "name"          // For the <field name="...">value</field> pattern
+ }
+ }
+}
+```
+
+**Alternative XML Options:**
+```json
+{
+ "format": {
+ "type": "xml",
+ "encoding": "utf-8",
+ "options": {
+ "record_path": "//customer", // Direct element-based records
+ // No field_attribute needed for standard XML
+ }
+ }
+}
+```
+
+## Required Information to Gather
+
+Before generating the descriptor, ask the user for these details if not provided:
+
+1. **Source Data Format**
+ - File type (CSV, JSON, XML, Excel, fixed-width, etc.)
+ - **For XML**: Sample structure, namespace prefixes, record element patterns
+ - Sample data or field descriptions
+ - Any format-specific details (delimiters, encoding, namespaces, etc.)
+
+2. **Target Schema**
+ - What fields should be in the final output?
+ - What data types are expected?
+ - Any required vs optional fields?
+
+3. **Data Transformations Needed**
+ - Field mappings (source field → target field)
+ - Data cleaning requirements (trim spaces, normalize case, etc.)
+ - Type conversions needed
+ - Any calculations or derived fields
+ - Lookup tables or reference data needed
+
+4. **Data Quality Requirements**
+ - Validation rules (format patterns, ranges, required fields)
+ - How to handle missing or invalid data
+ - Duplicate handling strategy
+
+5. **Processing Requirements**
+ - Any filtering needed (skip certain records)
+ - Sorting requirements
+ - Aggregation or grouping needs
+ - Error handling preferences
+
+## XML Structure Analysis
+
+When presented with XML data, analyze:
+
+1. **Document Root**: What is the root element?
+2. **Record Container**: Where are individual records located?
+3. **Field Pattern**: How are field names and values structured?
+   - Direct child elements: `<name>John</name>`
+   - Attribute-based: `<field name="name">John</field>`
+ - Mixed patterns
+4. **Namespaces**: Are there any namespace prefixes?
+5. **Hierarchy Depth**: How deeply nested are the records?
+
+## Configuration Template Structure
+
+Generate a JSON configuration following this structure:
+
+```json
+{
+ "version": "1.0",
+ "metadata": {
+ "name": "[Descriptive name]",
+ "description": "[What this config does]",
+ "author": "[Author or team]",
+ "created": "[ISO date]"
+ },
+ "format": {
+ "type": "[csv|json|xml|fixed-width|excel]",
+ "encoding": "utf-8",
+ "options": {
+ // Format-specific parsing options
+ // For XML: record_path (XPath), field_attribute (if applicable)
+ }
+ },
+ "globals": {
+ "variables": {
+ // Global variables and constants
+ },
+ "lookup_tables": {
+ // Reference data for transformations
+ }
+ },
+ "preprocessing": [
+ // Global filters and operations before field mapping
+ ],
+ "mappings": [
+ // Field mapping definitions with transforms and validation
+ ],
+ "postprocessing": [
+ // Global operations after field mapping
+ ],
+ "output": {
+ "format": "trustgraph-objects",
+ "schema_name": "[target schema name]",
+ "options": {
+ "confidence": 0.85,
+ "batch_size": 1000
+ },
+ "error_handling": {
+ "on_validation_error": "log_and_skip",
+ "on_transform_error": "log_and_skip",
+ "max_errors": 100
+ }
+ }
+}
+```
+
+## Transform Types Available
+
+Use these transform types in your mappings:
+
+**String Operations:**
+- `trim`, `upper`, `lower`, `title_case`
+- `replace`, `regex_replace`, `substring`, `pad_left`
+
+**Type Conversions:**
+- `to_string`, `to_int`, `to_float`, `to_bool`, `to_date`
+
+**Data Operations:**
+- `default`, `lookup`, `concat`, `calculate`, `conditional`
+
+**Validation Types:**
+- `required`, `not_null`, `min_length`, `max_length`
+- `range`, `pattern`, `in_list`, `custom`
+
+## XML-Specific Best Practices
+
+1. **Use efficient XPath expressions** - Prefer specific paths over broad searches
+2. **Handle namespace prefixes** when present
+3. **Identify field attribute patterns** correctly
+4. **Test XPath expressions** mentally against the provided structure
+5. **Consider XML element vs attribute data** in field mappings
+6. **Account for mixed content** and nested structures
+
+## Best Practices to Follow
+
+1. **Always include error handling** with appropriate policies
+2. **Use meaningful field names** that match target schema
+3. **Add validation** for critical fields
+4. **Include default values** for optional fields
+5. **Use lookup tables** for code translations
+6. **Add preprocessing filters** to exclude invalid records
+7. **Include metadata** for documentation and maintenance
+8. **Consider performance** with appropriate batch sizes
+
+## Complete XML Example
+
+Given this XML structure:
+```xml
+<ROOT>
+  <data>
+    <record>
+      <field name="Country">USA</field>
+      <field name="Year">2024</field>
+      <field name="Amount">1000.50</field>
+    </record>
+  </data>
+</ROOT>
+```
+
+The parser will:
+1. Use `record_path: "/ROOT/data/record"` to find record elements
+2. Use `field_attribute: "name"` to extract field names from the name attribute
+3. Create this parsed data structure: `{"Country": "USA", "Year": "2024", "Amount": "1000.50"}`
+
+Generate this COMPLETE configuration:
+```json
+{
+ "format": {
+ "type": "xml",
+ "encoding": "utf-8",
+ "options": {
+ "record_path": "/ROOT/data/record",
+ "field_attribute": "name"
+ }
+ },
+ "mappings": [
+ {
+ "source_field": "Country", // ✅ Matches parsed field name
+ "target_field": "country_name"
+ },
+ {
+ "source_field": "Year", // ✅ Matches parsed field name
+ "target_field": "year",
+ "transforms": [{"type": "to_int"}]
+ },
+ {
+ "source_field": "Amount", // ✅ Matches parsed field name
+ "target_field": "amount",
+ "transforms": [{"type": "to_float"}]
+ }
+ ]
+}
+```
+
+**KEY RULE: source_field names must match the extracted field names, NOT the XML element structure.**
+
+## Output Format
+
+Provide the configuration as ONLY a properly formatted JSON document.
+
+## Schema
+
+The following schema describes the target result format:
+
+{% for schema in schemas %}
+**{{ schema.name }}**: {{ schema.description }}
+Fields:
+{% for field in schema.fields %}
+- {{ field.name }} ({{ field.type }}){% if field.description %}: {{ field.description }}{% endif
+%}{% if field.primary_key %} [PRIMARY KEY]{% endif %}{% if field.required %} [REQUIRED]{% endif
+%}{% if field.indexed %} [INDEXED]{% endif %}{% if field.enum_values %} [OPTIONS: {{
+field.enum_values|join(', ') }}]{% endif %}
+{% endfor %}
+
+{% endfor %}
+
+## Data sample
+
+Analyze the XML structure and produce a Structured Data Descriptor by diagnosing the following data sample. Pay special attention to XML hierarchy, element patterns, and generate appropriate XPath expressions:
+
+{{sample}}
diff --git a/trustgraph-cli/pyproject.toml b/trustgraph-cli/pyproject.toml
index aa2be8cb..5e9ec07f 100644
--- a/trustgraph-cli/pyproject.toml
+++ b/trustgraph-cli/pyproject.toml
@@ -54,6 +54,7 @@ tg-load-sample-documents = "trustgraph.cli.load_sample_documents:main"
tg-load-text = "trustgraph.cli.load_text:main"
tg-load-turtle = "trustgraph.cli.load_turtle:main"
tg-load-knowledge = "trustgraph.cli.load_knowledge:main"
+tg-load-structured-data = "trustgraph.cli.load_structured_data:main"
tg-put-flow-class = "trustgraph.cli.put_flow_class:main"
tg-put-kg-core = "trustgraph.cli.put_kg_core:main"
tg-remove-library-document = "trustgraph.cli.remove_library_document:main"
diff --git a/trustgraph-cli/trustgraph/cli/load_structured_data.py b/trustgraph-cli/trustgraph/cli/load_structured_data.py
new file mode 100644
index 00000000..3c88e346
--- /dev/null
+++ b/trustgraph-cli/trustgraph/cli/load_structured_data.py
@@ -0,0 +1,1051 @@
+"""
+Load structured data into TrustGraph using a descriptor configuration.
+
+This utility can:
+1. Analyze data samples to suggest appropriate schemas
+2. Generate descriptor configurations from data samples
+3. Parse and transform data using descriptor configurations
+4. Import processed data into TrustGraph
+
+The tool supports running all steps automatically or individual steps for
+validation and debugging. The descriptor language allows for complex
+transformations, validations, and mappings without requiring custom code.
+"""
+
+import argparse
+import os
+import sys
+import json
+import logging
+
+# Module logger
+logger = logging.getLogger(__name__)
+
+default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
+
+
+def load_structured_data(
+ api_url: str,
+ input_file: str,
+ descriptor_file: str = None,
+ suggest_schema: bool = False,
+ generate_descriptor: bool = False,
+ parse_only: bool = False,
+ output_file: str = None,
+ sample_size: int = 100,
+ sample_chars: int = 500,
+ schema_name: str = None,
+ flow: str = 'default',
+ dry_run: bool = False,
+ verbose: bool = False
+):
+ """
+ Load structured data using a descriptor configuration.
+
+ Args:
+ api_url: TrustGraph API URL
+ input_file: Path to input data file
+ descriptor_file: Path to JSON descriptor configuration
+ suggest_schema: Analyze data and suggest matching schemas
+ generate_descriptor: Generate descriptor from data sample
+ parse_only: Parse data but don't import to TrustGraph
+ output_file: Path to write output (descriptor/parsed data)
+ sample_size: Number of records to sample for analysis
+ sample_chars: Maximum characters to read for sampling
+        schema_name: Target schema name for generation
+        flow: TrustGraph flow ID used for prompt calls and import
+        dry_run: If True, validate but don't import data
+        verbose: Enable verbose logging
+ """
+ if verbose:
+ logging.basicConfig(level=logging.DEBUG)
+ else:
+ logging.basicConfig(level=logging.INFO)
+
+ # Determine operation mode
+ if suggest_schema:
+ logger.info(f"Analyzing {input_file} to suggest schemas...")
+ logger.info(f"Sample size: {sample_size} records")
+ logger.info(f"Sample chars: {sample_chars} characters")
+
+ # Read sample data from input file
+ try:
+ with open(input_file, 'r', encoding='utf-8') as f:
+ # Read up to sample_chars characters
+ sample_data = f.read(sample_chars)
+ if len(sample_data) < sample_chars:
+ logger.info(f"Read entire file ({len(sample_data)} characters)")
+ else:
+ logger.info(f"Read sample ({sample_chars} characters)")
+ except Exception as e:
+ logger.error(f"Failed to read input file: {e}")
+ raise
+
+ # Fetch available schemas from Config API
+ try:
+ from trustgraph.api import Api
+ from trustgraph.api.types import ConfigKey
+
+ api = Api(api_url)
+ config_api = api.config()
+
+ # Get list of available schema keys
+ logger.info("Fetching available schemas from Config API...")
+ schema_keys = config_api.list("schema")
+ logger.info(f"Found {len(schema_keys)} schemas: {schema_keys}")
+
+ if not schema_keys:
+ logger.warning("No schemas found in configuration")
+ print("No schemas available in TrustGraph configuration")
+ return
+
+ # Fetch each schema definition
+ schemas = []
+ config_keys = [ConfigKey(type="schema", key=key) for key in schema_keys]
+ schema_values = config_api.get(config_keys)
+
+ for value in schema_values:
+ try:
+ # Schema values are JSON strings, parse them
+ schema_def = json.loads(value.value) if isinstance(value.value, str) else value.value
+ schemas.append(schema_def)
+ logger.debug(f"Loaded schema: {value.key}")
+ except json.JSONDecodeError as e:
+ logger.warning(f"Failed to parse schema {value.key}: {e}")
+ continue
+
+ logger.info(f"Successfully loaded {len(schemas)} schema definitions")
+
+ # Use TrustGraph prompt service for schema suggestion
+ flow_api = api.flow().id("default")
+
+ # Call schema-selection prompt with actual schemas and data sample
+ logger.info("Calling TrustGraph schema-selection prompt...")
+ response = flow_api.prompt(
+ id="schema-selection",
+ variables={
+ "schemas": schemas, # Array of actual schema definitions (note: plural 'schemas')
+ "data": sample_data
+ }
+ )
+
+ print("Schema Suggestion Results:")
+ print("=" * 50)
+ print(response)
+
+ except ImportError as e:
+ logger.error(f"Failed to import TrustGraph API: {e}")
+ raise
+ except Exception as e:
+ logger.error(f"Failed to call TrustGraph prompt service: {e}")
+ raise
+
+ elif generate_descriptor:
+ logger.info(f"Generating descriptor from {input_file}...")
+ logger.info(f"Sample size: {sample_size} records")
+ logger.info(f"Sample chars: {sample_chars} characters")
+ if schema_name:
+ logger.info(f"Target schema: {schema_name}")
+
+ # Read sample data from input file
+ try:
+ with open(input_file, 'r', encoding='utf-8') as f:
+ # Read up to sample_chars characters
+ sample_data = f.read(sample_chars)
+ if len(sample_data) < sample_chars:
+ logger.info(f"Read entire file ({len(sample_data)} characters)")
+ else:
+ logger.info(f"Read sample ({sample_chars} characters)")
+ except Exception as e:
+ logger.error(f"Failed to read input file: {e}")
+ raise
+
+ # Fetch available schemas from Config API (same as suggest-schema mode)
+ try:
+ from trustgraph.api import Api
+ from trustgraph.api.types import ConfigKey
+
+ api = Api(api_url)
+ config_api = api.config()
+
+ # Get list of available schema keys
+ logger.info("Fetching available schemas from Config API...")
+ schema_keys = config_api.list("schema")
+ logger.info(f"Found {len(schema_keys)} schemas: {schema_keys}")
+
+ if not schema_keys:
+ logger.warning("No schemas found in configuration")
+ print("No schemas available in TrustGraph configuration")
+ return
+
+ # Fetch each schema definition
+ schemas = []
+ config_keys = [ConfigKey(type="schema", key=key) for key in schema_keys]
+ schema_values = config_api.get(config_keys)
+
+ for value in schema_values:
+ try:
+ # Schema values are JSON strings, parse them
+ schema_def = json.loads(value.value) if isinstance(value.value, str) else value.value
+ schemas.append(schema_def)
+ logger.debug(f"Loaded schema: {value.key}")
+ except json.JSONDecodeError as e:
+ logger.warning(f"Failed to parse schema {value.key}: {e}")
+ continue
+
+ logger.info(f"Successfully loaded {len(schemas)} schema definitions")
+
+ # Use TrustGraph prompt service for descriptor generation
+ flow_api = api.flow().id("default")
+
+ # Call diagnose-structured-data prompt with schemas and data sample
+ logger.info("Calling TrustGraph diagnose-structured-data prompt...")
+ response = flow_api.prompt(
+ id="diagnose-structured-data",
+ variables={
+ "schemas": schemas, # Array of actual schema definitions
+ "sample": sample_data # Note: using 'sample' instead of 'data'
+ }
+ )
+
+ # Output the generated descriptor
+ if output_file:
+ try:
+ with open(output_file, 'w', encoding='utf-8') as f:
+ if isinstance(response, str):
+ f.write(response)
+ else:
+ f.write(json.dumps(response, indent=2))
+ print(f"Generated descriptor saved to: {output_file}")
+ logger.info(f"Descriptor saved to {output_file}")
+ except Exception as e:
+ logger.error(f"Failed to save descriptor to {output_file}: {e}")
+ print(f"Error saving descriptor: {e}")
+ else:
+ print("Generated Descriptor:")
+ print("=" * 50)
+ if isinstance(response, str):
+ print(response)
+ else:
+ print(json.dumps(response, indent=2))
+
+ except ImportError as e:
+ logger.error(f"Failed to import TrustGraph API: {e}")
+ raise
+ except Exception as e:
+ logger.error(f"Failed to call TrustGraph prompt service: {e}")
+ raise
+
+ elif parse_only:
+ if not descriptor_file:
+ raise ValueError("--descriptor is required when using --parse-only")
+ logger.info(f"Parsing {input_file} with descriptor {descriptor_file}...")
+
+ # Load descriptor configuration
+ try:
+ with open(descriptor_file, 'r', encoding='utf-8') as f:
+ descriptor = json.load(f)
+ logger.info(f"Loaded descriptor configuration from {descriptor_file}")
+ except Exception as e:
+ logger.error(f"Failed to load descriptor file: {e}")
+ raise
+
+ # Read input data based on format in descriptor
+ try:
+ format_info = descriptor.get('format', {})
+ format_type = format_info.get('type', 'csv').lower()
+ encoding = format_info.get('encoding', 'utf-8')
+
+ logger.info(f"Input format: {format_type}, encoding: {encoding}")
+
+ with open(input_file, 'r', encoding=encoding) as f:
+ raw_data = f.read()
+
+ logger.info(f"Read {len(raw_data)} characters from input file")
+
+ except Exception as e:
+ logger.error(f"Failed to read input file: {e}")
+ raise
+
+ # Parse data based on format type
+ parsed_records = []
+
+ if format_type == 'csv':
+ import csv
+ from io import StringIO
+
+ options = format_info.get('options', {})
+ delimiter = options.get('delimiter', ',')
+            has_header = options.get('has_header', options.get('header', True))
+
+ logger.info(f"CSV options - delimiter: '{delimiter}', has_header: {has_header}")
+
+ try:
+ reader = csv.DictReader(StringIO(raw_data), delimiter=delimiter)
+ if not has_header:
+ # If no header, create field names from first row or use generic names
+ first_row = next(reader)
+ fieldnames = [f"field_{i+1}" for i in range(len(first_row))]
+ reader = csv.DictReader(StringIO(raw_data), fieldnames=fieldnames, delimiter=delimiter)
+
+ for row_num, row in enumerate(reader, start=1):
+ # Respect sample_size limit
+ if row_num > sample_size:
+ logger.info(f"Reached sample size limit of {sample_size} records")
+ break
+ parsed_records.append(row)
+
+ except Exception as e:
+ logger.error(f"Failed to parse CSV data: {e}")
+ raise
+
+ elif format_type == 'json':
+ try:
+ data = json.loads(raw_data)
+ if isinstance(data, list):
+ parsed_records = data[:sample_size] # Respect sample_size
+ elif isinstance(data, dict):
+ # Handle single object or extract array from root path
+ root_path = format_info.get('options', {}).get('root_path')
+ if root_path:
+ # Simple JSONPath-like extraction (basic implementation)
+ if root_path.startswith('$.'):
+ key = root_path[2:]
+ data = data.get(key, data)
+
+ if isinstance(data, list):
+ parsed_records = data[:sample_size]
+ else:
+ parsed_records = [data]
+
+ except Exception as e:
+ logger.error(f"Failed to parse JSON data: {e}")
+ raise
+
+ elif format_type == 'xml':
+ import xml.etree.ElementTree as ET
+
+ options = format_info.get('options', {})
+ record_path = options.get('record_path', '//record') # XPath to find record elements
+ field_attribute = options.get('field_attribute') # Attribute name for field names (e.g., "name")
+
+ # Legacy support for old options format
+ if 'root_element' in options or 'record_element' in options:
+ root_element = options.get('root_element')
+ record_element = options.get('record_element', 'record')
+ if root_element:
+ record_path = f"//{root_element}/{record_element}"
+ else:
+ record_path = f"//{record_element}"
+
+ logger.info(f"XML options - record_path: '{record_path}', field_attribute: '{field_attribute}'")
+
+ try:
+ root = ET.fromstring(raw_data)
+
+ # Find record elements using XPath
+ # ElementTree XPath support is limited, convert absolute paths to relative
+ xpath_expr = record_path
+ if xpath_expr.startswith('/ROOT/'):
+ # Remove /ROOT/ prefix since we're already at the root
+ xpath_expr = xpath_expr[6:]
+ elif xpath_expr.startswith('/'):
+ # Convert absolute path to relative by removing leading /
+ xpath_expr = '.' + xpath_expr
+
+ records = root.findall(xpath_expr)
+ logger.info(f"Found {len(records)} records using XPath: {record_path} (converted to: {xpath_expr})")
+
+ # Convert XML elements to dictionaries
+ record_count = 0
+ for element in records:
+ if record_count >= sample_size:
+ logger.info(f"Reached sample size limit of {sample_size} records")
+ break
+
+ record = {}
+
+ if field_attribute:
+ # Handle field elements with name attributes (UN data format)
+                        # e.g. <field name="Country or Area">Albania</field>
+ for child in element:
+ if child.tag == 'field' and field_attribute in child.attrib:
+ field_name = child.attrib[field_attribute]
+ field_value = child.text.strip() if child.text else ""
+ record[field_name] = field_value
+ else:
+ # Handle standard XML structure
+ # Convert element attributes to fields
+ record.update(element.attrib)
+
+ # Convert child elements to fields
+ for child in element:
+ if child.text:
+ record[child.tag] = child.text.strip()
+ else:
+ record[child.tag] = ""
+
+ # If no children or attributes, use element text as single field
+ if not record and element.text:
+ record['value'] = element.text.strip()
+
+ parsed_records.append(record)
+ record_count += 1
+
+ except ET.ParseError as e:
+ logger.error(f"Failed to parse XML data: {e}")
+ raise
+ except Exception as e:
+ logger.error(f"Failed to process XML data: {e}")
+ raise
+
+ else:
+ raise ValueError(f"Unsupported format type: {format_type}")
+
+ logger.info(f"Successfully parsed {len(parsed_records)} records")
+
+ # Apply basic transformations and validation (simplified version)
+ mappings = descriptor.get('mappings', [])
+ processed_records = []
+
+ for record_num, record in enumerate(parsed_records, start=1):
+ processed_record = {}
+
+ for mapping in mappings:
+ source_field = mapping.get('source_field') or mapping.get('source')
+ target_field = mapping.get('target_field') or mapping.get('target')
+
+ if source_field in record:
+ value = record[source_field]
+
+ # Apply basic transforms (simplified)
+ transforms = mapping.get('transforms', [])
+ for transform in transforms:
+ transform_type = transform.get('type')
+
+ if transform_type == 'trim' and isinstance(value, str):
+ value = value.strip()
+ elif transform_type == 'upper' and isinstance(value, str):
+ value = value.upper()
+ elif transform_type == 'lower' and isinstance(value, str):
+ value = value.lower()
+ elif transform_type == 'title_case' and isinstance(value, str):
+ value = value.title()
+ elif transform_type == 'to_int':
+ try:
+ value = int(value) if value != '' else None
+ except (ValueError, TypeError):
+ logger.warning(f"Failed to convert '{value}' to int in record {record_num}")
+ elif transform_type == 'to_float':
+ try:
+ value = float(value) if value != '' else None
+ except (ValueError, TypeError):
+ logger.warning(f"Failed to convert '{value}' to float in record {record_num}")
+
+ # Convert all values to strings as required by ExtractedObject schema
+ processed_record[target_field] = str(value) if value is not None else ""
+ else:
+ logger.warning(f"Source field '{source_field}' not found in record {record_num}")
+
+ processed_records.append(processed_record)
+
+ # Format output for TrustGraph ExtractedObject structure
+ output_records = []
+ schema_name = descriptor.get('output', {}).get('schema_name', 'default')
+ confidence = descriptor.get('output', {}).get('options', {}).get('confidence', 0.9)
+
+ for record in processed_records:
+ output_record = {
+ "metadata": {
+ "id": f"parsed-{len(output_records)+1}",
+ "metadata": [], # Empty metadata triples
+ "user": "trustgraph",
+ "collection": "default"
+ },
+ "schema_name": schema_name,
+ "values": record,
+ "confidence": confidence,
+ "source_span": ""
+ }
+ output_records.append(output_record)
+
+ # Output results
+ if output_file:
+ try:
+ with open(output_file, 'w', encoding='utf-8') as f:
+ json.dump(output_records, f, indent=2)
+ print(f"Parsed data saved to: {output_file}")
+ logger.info(f"Parsed {len(output_records)} records saved to {output_file}")
+ except Exception as e:
+ logger.error(f"Failed to save parsed data to {output_file}: {e}")
+ print(f"Error saving parsed data: {e}")
+ else:
+ print("Parsed Data Preview:")
+ print("=" * 50)
+ # Show first few records for preview
+ preview_count = min(3, len(output_records))
+ for i in range(preview_count):
+ print(f"Record {i+1}:")
+ print(json.dumps(output_records[i], indent=2))
+ print()
+
+ if len(output_records) > preview_count:
+ print(f"... and {len(output_records) - preview_count} more records")
+ print(f"Total records processed: {len(output_records)}")
+
+ print(f"\nParsing Summary:")
+ print(f"- Input format: {format_type}")
+ print(f"- Records processed: {len(output_records)}")
+ print(f"- Target schema: {schema_name}")
+ print(f"- Field mappings: {len(mappings)}")
+
+ else:
+ # Full pipeline: parse and import
+ if not descriptor_file:
+ # Auto-generate descriptor if not provided
+ logger.info("No descriptor provided, auto-generating...")
+ logger.info(f"Schema name: {schema_name}")
+
+ # Read sample data for descriptor generation
+ try:
+ with open(input_file, 'r', encoding='utf-8') as f:
+ sample_data = f.read(sample_chars)
+ logger.info(f"Read {len(sample_data)} characters for descriptor generation")
+ except Exception as e:
+ logger.error(f"Failed to read input file for descriptor generation: {e}")
+ raise
+
+ # Generate descriptor using TrustGraph prompt service
+ try:
+ from trustgraph.api import Api
+ from trustgraph.api.types import ConfigKey
+
+ api = Api(api_url)
+ config_api = api.config()
+
+ # Get available schemas
+ logger.info("Fetching available schemas for descriptor generation...")
+ schema_keys = config_api.list("schema")
+ logger.info(f"Found {len(schema_keys)} schemas: {schema_keys}")
+
+ if not schema_keys:
+ logger.warning("No schemas found in configuration")
+ print("No schemas available in TrustGraph configuration")
+ return
+
+ # Fetch each schema definition
+ schemas = []
+ config_keys = [ConfigKey(type="schema", key=key) for key in schema_keys]
+ schema_values = config_api.get(config_keys)
+
+ for value in schema_values:
+ try:
+ schema_def = json.loads(value.value) if isinstance(value.value, str) else value.value
+ schemas.append(schema_def)
+ logger.debug(f"Loaded schema: {value.key}")
+ except json.JSONDecodeError as e:
+ logger.warning(f"Failed to parse schema {value.key}: {e}")
+ continue
+
+ logger.info(f"Successfully loaded {len(schemas)} schema definitions")
+
+ # Generate descriptor using diagnose-structured-data prompt
+ flow_api = api.flow().id(flow)
+
+ logger.info("Calling TrustGraph diagnose-structured-data prompt for descriptor generation...")
+ response = flow_api.prompt(
+ id="diagnose-structured-data",
+ variables={
+ "schemas": schemas,
+ "sample": sample_data
+ }
+ )
+
+ # Parse the generated descriptor
+ if isinstance(response, str):
+ try:
+ descriptor = json.loads(response)
+ except json.JSONDecodeError:
+ logger.error("Generated descriptor is not valid JSON")
+ raise ValueError("Failed to generate valid descriptor")
+ else:
+ descriptor = response
+
+ # Override schema_name if provided
+ if schema_name:
+ descriptor.setdefault('output', {})['schema_name'] = schema_name
+
+ logger.info("Successfully generated descriptor from data sample")
+
+ except ImportError as e:
+ logger.error(f"Failed to import TrustGraph API: {e}")
+ raise
+ except Exception as e:
+ logger.error(f"Failed to generate descriptor: {e}")
+ raise
+ else:
+ # Load existing descriptor
+ try:
+ with open(descriptor_file, 'r', encoding='utf-8') as f:
+ descriptor = json.load(f)
+ logger.info(f"Loaded descriptor configuration from {descriptor_file}")
+ except Exception as e:
+ logger.error(f"Failed to load descriptor file: {e}")
+ raise
+
+ logger.info(f"Processing {input_file} for import...")
+
+ # Parse data using the same logic as parse-only mode, but with full dataset
+ try:
+ format_info = descriptor.get('format', {})
+ format_type = format_info.get('type', 'csv').lower()
+ encoding = format_info.get('encoding', 'utf-8')
+
+ logger.info(f"Input format: {format_type}, encoding: {encoding}")
+
+ with open(input_file, 'r', encoding=encoding) as f:
+ raw_data = f.read()
+
+ logger.info(f"Read {len(raw_data)} characters from input file")
+
+ except Exception as e:
+ logger.error(f"Failed to read input file: {e}")
+ raise
+
+ # Parse data (reuse parse-only logic but process all records)
+ parsed_records = []
+ batch_size = descriptor.get('output', {}).get('options', {}).get('batch_size', 1000)
+
+ if format_type == 'csv':
+ import csv
+ from io import StringIO
+
+ options = format_info.get('options', {})
+ delimiter = options.get('delimiter', ',')
+            has_header = options.get('has_header', options.get('header', True))
+
+ logger.info(f"CSV options - delimiter: '{delimiter}', has_header: {has_header}")
+
+ try:
+ reader = csv.DictReader(StringIO(raw_data), delimiter=delimiter)
+ if not has_header:
+ first_row = next(reader)
+ fieldnames = [f"field_{i+1}" for i in range(len(first_row))]
+ reader = csv.DictReader(StringIO(raw_data), fieldnames=fieldnames, delimiter=delimiter)
+
+ record_count = 0
+ for row in reader:
+ parsed_records.append(row)
+ record_count += 1
+
+ # Process in batches to avoid memory issues
+ if record_count % batch_size == 0:
+ logger.info(f"Parsed {record_count} records...")
+
+ except Exception as e:
+ logger.error(f"Failed to parse CSV data: {e}")
+ raise
+
+ elif format_type == 'json':
+ try:
+ data = json.loads(raw_data)
+ if isinstance(data, list):
+ parsed_records = data
+ elif isinstance(data, dict):
+ root_path = format_info.get('options', {}).get('root_path')
+ if root_path:
+ if root_path.startswith('$.'):
+ key = root_path[2:]
+ data = data.get(key, data)
+
+ if isinstance(data, list):
+ parsed_records = data
+ else:
+ parsed_records = [data]
+
+ except Exception as e:
+ logger.error(f"Failed to parse JSON data: {e}")
+ raise
+
+ elif format_type == 'xml':
+ import xml.etree.ElementTree as ET
+
+ options = format_info.get('options', {})
+ record_path = options.get('record_path', '//record')
+ field_attribute = options.get('field_attribute')
+
+ # Legacy support for old options format
+ if 'root_element' in options or 'record_element' in options:
+ root_element = options.get('root_element')
+ record_element = options.get('record_element', 'record')
+ if root_element:
+ record_path = f"//{root_element}/{record_element}"
+ else:
+ record_path = f"//{record_element}"
+
+ logger.info(f"XML options - record_path: '{record_path}', field_attribute: '{field_attribute}'")
+
+ try:
+ root = ET.fromstring(raw_data)
+
+ # Find record elements using XPath
+ xpath_expr = record_path
+ if xpath_expr.startswith('/ROOT/'):
+ xpath_expr = xpath_expr[6:]
+ elif xpath_expr.startswith('/'):
+ xpath_expr = '.' + xpath_expr
+
+ records = root.findall(xpath_expr)
+ logger.info(f"Found {len(records)} records using XPath: {record_path} (converted to: {xpath_expr})")
+
+ # Convert XML elements to dictionaries
+ for element in records:
+ record = {}
+
+ if field_attribute:
+ # Handle field elements with name attributes (UN data format)
+ for child in element:
+ if child.tag == 'field' and field_attribute in child.attrib:
+ field_name = child.attrib[field_attribute]
+ field_value = child.text.strip() if child.text else ""
+ record[field_name] = field_value
+ else:
+ # Handle standard XML structure
+ record.update(element.attrib)
+
+ for child in element:
+ if child.text:
+ record[child.tag] = child.text.strip()
+ else:
+ record[child.tag] = ""
+
+ if not record and element.text:
+ record['value'] = element.text.strip()
+
+ parsed_records.append(record)
+
+ except ET.ParseError as e:
+ logger.error(f"Failed to parse XML data: {e}")
+ raise
+ except Exception as e:
+ logger.error(f"Failed to process XML data: {e}")
+ raise
+
+ else:
+ raise ValueError(f"Unsupported format type: {format_type}")
+
+ logger.info(f"Successfully parsed {len(parsed_records)} records")
+
+ # Apply transformations and create TrustGraph objects
+ mappings = descriptor.get('mappings', [])
+ processed_records = []
+ schema_name = descriptor.get('output', {}).get('schema_name', 'default')
+ confidence = descriptor.get('output', {}).get('options', {}).get('confidence', 0.9)
+
+ logger.info(f"Applying {len(mappings)} field mappings...")
+
+ for record_num, record in enumerate(parsed_records, start=1):
+ processed_record = {}
+
+ for mapping in mappings:
+ source_field = mapping.get('source_field') or mapping.get('source')
+ target_field = mapping.get('target_field') or mapping.get('target')
+
+ if source_field in record:
+ value = record[source_field]
+
+ # Apply transforms
+ transforms = mapping.get('transforms', [])
+ for transform in transforms:
+ transform_type = transform.get('type')
+
+ if transform_type == 'trim' and isinstance(value, str):
+ value = value.strip()
+ elif transform_type == 'upper' and isinstance(value, str):
+ value = value.upper()
+ elif transform_type == 'lower' and isinstance(value, str):
+ value = value.lower()
+ elif transform_type == 'title_case' and isinstance(value, str):
+ value = value.title()
+ elif transform_type == 'to_int':
+ try:
+ value = int(value) if value != '' else None
+ except (ValueError, TypeError):
+ logger.warning(f"Failed to convert '{value}' to int in record {record_num}")
+ elif transform_type == 'to_float':
+ try:
+ value = float(value) if value != '' else None
+ except (ValueError, TypeError):
+ logger.warning(f"Failed to convert '{value}' to float in record {record_num}")
+
+ # Convert all values to strings as required by ExtractedObject schema
+ processed_record[target_field] = str(value) if value is not None else ""
+ else:
+ logger.warning(f"Source field '{source_field}' not found in record {record_num}")
+
+ # Create TrustGraph ExtractedObject
+ output_record = {
+ "metadata": {
+ "id": f"import-{record_num}",
+ "metadata": [],
+ "user": "trustgraph",
+ "collection": "default"
+ },
+ "schema_name": schema_name,
+ "values": processed_record,
+ "confidence": confidence,
+ "source_span": ""
+ }
+ processed_records.append(output_record)
+
+ logger.info(f"Processed {len(processed_records)} records with transformations")
+
+ if dry_run:
+ print(f"Dry run mode - would import {len(processed_records)} records to TrustGraph")
+ print(f"Target schema: {schema_name}")
+ print(f"Sample record:")
+ if processed_records:
+ print(json.dumps(processed_records[0], indent=2))
+ return
+
+ # Import to TrustGraph using objects import endpoint via WebSocket
+ logger.info(f"Importing {len(processed_records)} records to TrustGraph...")
+
+ try:
+ import asyncio
+ from websockets.asyncio.client import connect
+
+ # Construct objects import URL similar to load_knowledge pattern
+ if not api_url.endswith("/"):
+ api_url += "/"
+
+ # Convert HTTP URL to WebSocket URL if needed
+ ws_url = api_url.replace("http://", "ws://").replace("https://", "wss://")
+ objects_url = ws_url + f"api/v1/flow/{flow}/import/objects"
+
+ logger.info(f"Connecting to objects import endpoint: {objects_url}")
+
+ async def import_objects():
+ async with connect(objects_url) as ws:
+ imported_count = 0
+
+ for record in processed_records:
+ # Send individual ExtractedObject records
+ await ws.send(json.dumps(record))
+ imported_count += 1
+
+ if imported_count % 100 == 0:
+ logger.info(f"Imported {imported_count}/{len(processed_records)} records...")
+
+ logger.info(f"Successfully imported {imported_count} records to TrustGraph")
+ return imported_count
+
+ # Run the async import
+ imported_count = asyncio.run(import_objects())
+ print(f"Import completed: {imported_count} records imported to schema '{schema_name}'")
+
+ except ImportError as e:
+ logger.error(f"Failed to import required modules: {e}")
+ print(f"Error: Required modules not available - {e}")
+ raise
+ except Exception as e:
+ logger.error(f"Failed to import data to TrustGraph: {e}")
+ print(f"Import failed: {e}")
+ raise
+
+
+def main():
+ """Main entry point for the CLI."""
+
+ parser = argparse.ArgumentParser(
+ prog='tg-load-structured-data',
+ description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+Examples:
+ # Step 1: Analyze data and suggest matching schemas
+ %(prog)s --input customers.csv --suggest-schema
+ %(prog)s --input products.xml --suggest-schema --sample-chars 1000
+
+ # Step 2: Generate descriptor configuration from data sample
+ %(prog)s --input customers.csv --generate-descriptor --schema-name customer --output descriptor.json
+ %(prog)s --input products.xml --generate-descriptor --schema-name product --output xml_descriptor.json
+
+ # Generate descriptor with custom sampling (more data for better analysis)
+ %(prog)s --input large_dataset.csv --generate-descriptor --schema-name product --sample-chars 100000 --sample-size 500
+
+ # Step 3: Parse data and review output without importing (supports CSV, JSON, XML)
+ %(prog)s --input customers.csv --descriptor descriptor.json --parse-only --output parsed.json
+ %(prog)s --input products.xml --descriptor xml_descriptor.json --parse-only
+
+ # Step 4: Import data to TrustGraph using descriptor
+ %(prog)s --input customers.csv --descriptor descriptor.json
+ %(prog)s --input products.xml --descriptor xml_descriptor.json
+
+ # All-in-one: Auto-generate descriptor and import (for simple cases)
+ %(prog)s --input customers.csv --schema-name customer
+
+ # Dry run to validate without importing
+ %(prog)s --input customers.csv --descriptor descriptor.json --dry-run
+
+Use Cases:
+ --suggest-schema : Diagnose which TrustGraph schemas might match your data
+ (uses --sample-chars to limit data sent for analysis)
+ --generate-descriptor: Create/review the structured data language configuration
+ (uses --sample-chars to limit data sent for analysis)
+ --parse-only : Validate that parsed data looks correct before import
+ (uses --sample-size to limit records processed, ignores --sample-chars)
+ (no mode flags) : Full pipeline - parse and import to TrustGraph
+
+For more information on the descriptor format, see:
+ docs/tech-specs/structured-data-descriptor.md
+ """.strip()
+ )
+
+ parser.add_argument(
+ '-u', '--api-url',
+ default=default_url,
+ help=f'TrustGraph API URL (default: {default_url})'
+ )
+
+ parser.add_argument(
+ '-f', '--flow',
+ default='default',
+ help='TrustGraph flow name to use for import (default: default)'
+ )
+
+ parser.add_argument(
+ '-i', '--input',
+ required=True,
+ help='Path to input data file to process'
+ )
+
+ parser.add_argument(
+ '-d', '--descriptor',
+ help='Path to JSON descriptor configuration file (required for full import and parse-only)'
+ )
+
+ # Operation modes (mutually exclusive)
+ mode_group = parser.add_mutually_exclusive_group()
+ mode_group.add_argument(
+ '--suggest-schema',
+ action='store_true',
+ help='Analyze data sample and suggest matching TrustGraph schemas'
+ )
+ mode_group.add_argument(
+ '--generate-descriptor',
+ action='store_true',
+ help='Generate descriptor configuration from data sample'
+ )
+ mode_group.add_argument(
+ '--parse-only',
+ action='store_true',
+ help='Parse data using descriptor but don\'t import to TrustGraph'
+ )
+
+ parser.add_argument(
+ '-o', '--output',
+ help='Output file path (for generated descriptors or parsed data)'
+ )
+
+ parser.add_argument(
+ '--sample-size',
+ type=int,
+ default=100,
+ help='Number of records to process (parse-only mode) or sample for analysis (default: 100)'
+ )
+
+ parser.add_argument(
+ '--sample-chars',
+ type=int,
+ default=500,
+ help='Maximum characters to read for sampling (suggest-schema/generate-descriptor modes only, default: 500)'
+ )
+
+ parser.add_argument(
+ '--schema-name',
+ help='Target schema name for descriptor generation'
+ )
+
+ parser.add_argument(
+ '--dry-run',
+ action='store_true',
+ help='Validate configuration and data without importing (full pipeline only)'
+ )
+
+ parser.add_argument(
+ '-v', '--verbose',
+ action='store_true',
+ help='Enable verbose output for debugging'
+ )
+
+ parser.add_argument(
+ '--batch-size',
+ type=int,
+ default=1000,
+ help='Number of records to process in each batch (default: 1000)'
+ )
+
+ parser.add_argument(
+ '--max-errors',
+ type=int,
+ default=100,
+ help='Maximum number of errors before stopping (default: 100)'
+ )
+
+ parser.add_argument(
+ '--error-file',
+ help='Path to write error records (optional)'
+ )
+
+ args = parser.parse_args()
+
+ # Validate argument combinations
+ if args.parse_only and not args.descriptor:
+ print("Error: --descriptor is required when using --parse-only", file=sys.stderr)
+ sys.exit(1)
+
+ # Warn about irrelevant parameters
+ if args.parse_only and args.sample_chars != 500: # 500 is the default
+ print("Warning: --sample-chars is ignored in --parse-only mode (entire file is processed)", file=sys.stderr)
+
+ if (args.suggest_schema or args.generate_descriptor) and args.sample_size != 100: # 100 is default
+ print("Warning: --sample-size is ignored in analysis modes, use --sample-chars instead", file=sys.stderr)
+
+ if not any([args.suggest_schema, args.generate_descriptor, args.parse_only]) and not args.descriptor:
+ # Full pipeline mode without descriptor - schema_name should be provided
+ if not args.schema_name:
+ print("Error: --descriptor or --schema-name is required for full import", file=sys.stderr)
+ sys.exit(1)
+
+ try:
+ load_structured_data(
+ api_url=args.api_url,
+ input_file=args.input,
+ descriptor_file=args.descriptor,
+ suggest_schema=args.suggest_schema,
+ generate_descriptor=args.generate_descriptor,
+ parse_only=args.parse_only,
+ output_file=args.output,
+ sample_size=args.sample_size,
+ sample_chars=args.sample_chars,
+ schema_name=args.schema_name,
+ flow=args.flow,
+ dry_run=args.dry_run,
+ verbose=args.verbose
+ )
+ except FileNotFoundError as e:
+ print(f"Error: File not found - {e}", file=sys.stderr)
+ sys.exit(1)
+ except json.JSONDecodeError as e:
+ print(f"Error: Invalid JSON in descriptor - {e}", file=sys.stderr)
+ sys.exit(1)
+ except Exception as e:
+ print(f"Error: {e}", file=sys.stderr)
+ if args.verbose:
+ import traceback
+ traceback.print_exc()
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file