Structured data, minor features (#500)

- Sorted out the confusing --auto mode in tg-load-structured-data
- Fixed tests & added CLI tests
This commit is contained in:
cybermaggedon 2025-09-05 17:25:12 +01:00 committed by GitHub
parent 0b7620bc04
commit 5537fac731
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 3318 additions and 360 deletions

View file

@ -31,6 +31,7 @@ def load_structured_data(
suggest_schema: bool = False,
generate_descriptor: bool = False,
parse_only: bool = False,
auto: bool = False,
output_file: str = None,
sample_size: int = 100,
sample_chars: int = 500,
@ -49,6 +50,7 @@ def load_structured_data(
suggest_schema: Analyze data and suggest matching schemas
generate_descriptor: Generate descriptor from data sample
parse_only: Parse data but don't import to TrustGraph
auto: Run full automatic pipeline (suggest schema + generate descriptor + import)
output_file: Path to write output (descriptor/parsed data)
sample_size: Number of records to sample for analysis
sample_chars: Maximum characters to read for sampling
@ -62,7 +64,90 @@ def load_structured_data(
logging.basicConfig(level=logging.INFO)
# Determine operation mode
if suggest_schema:
if auto:
logger.info(f"🚀 Starting automatic pipeline for {input_file}...")
logger.info("Step 1: Analyzing data to discover best matching schema...")
# Step 1: Auto-discover schema (reuse suggest_schema logic)
discovered_schema = _auto_discover_schema(api_url, input_file, sample_chars, logger)
if not discovered_schema:
logger.error("Failed to discover suitable schema automatically")
print("❌ Could not automatically determine the best schema for your data.")
print("💡 Try running with --suggest-schema first to see available options.")
return None
logger.info(f"✅ Discovered schema: {discovered_schema}")
print(f"🎯 Auto-selected schema: {discovered_schema}")
# Step 2: Auto-generate descriptor
logger.info("Step 2: Generating descriptor configuration...")
auto_descriptor = _auto_generate_descriptor(api_url, input_file, discovered_schema, sample_chars, logger)
if not auto_descriptor:
logger.error("Failed to generate descriptor automatically")
print("❌ Could not automatically generate descriptor configuration.")
return None
logger.info("✅ Generated descriptor configuration")
print("📝 Generated descriptor configuration")
# Step 3: Parse and preview data
logger.info("Step 3: Parsing and validating data...")
preview_records = _auto_parse_preview(input_file, auto_descriptor, min(sample_size, 5), logger)
if preview_records is None:
logger.error("Failed to parse data with generated descriptor")
print("❌ Could not parse data with generated descriptor.")
return None
# Show preview
print("📊 Data Preview (first few records):")
print("=" * 50)
for i, record in enumerate(preview_records[:3], 1):
print(f"Record {i}: {record}")
print("=" * 50)
# Step 4: Import (unless dry_run)
if dry_run:
logger.info("✅ Dry run complete - data is ready for import")
print("✅ Dry run successful! Data is ready for import.")
print(f"💡 Run without --dry-run to import {len(preview_records)} records to TrustGraph.")
return None
else:
logger.info("Step 4: Importing data to TrustGraph...")
print("🚀 Importing data to TrustGraph...")
# Recursively call ourselves with the auto-generated descriptor
# This reuses all the existing import logic
import tempfile
import os
# Save auto-generated descriptor to temp file
temp_descriptor = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False)
json.dump(auto_descriptor, temp_descriptor, indent=2)
temp_descriptor.close()
try:
# Call the full pipeline mode with our auto-generated descriptor
result = load_structured_data(
api_url=api_url,
input_file=input_file,
descriptor_file=temp_descriptor.name,
flow=flow,
dry_run=False, # We already handled dry_run above
verbose=verbose
)
print("✅ Auto-import completed successfully!")
logger.info("Auto-import pipeline completed successfully")
return result
finally:
# Clean up temp descriptor file
try:
os.unlink(temp_descriptor.name)
except:
pass
elif suggest_schema:
logger.info(f"Analyzing {input_file} to suggest schemas...")
logger.info(f"Sample size: {sample_size} records")
logger.info(f"Sample chars: {sample_chars} characters")
@ -497,123 +582,144 @@ def load_structured_data(
print(f"- Records processed: {len(output_records)}")
print(f"- Target schema: {schema_name}")
print(f"- Field mappings: {len(mappings)}")
# Helper functions for auto mode
def _auto_discover_schema(api_url, input_file, sample_chars, logger):
"""Auto-discover the best matching schema for the input data"""
try:
# Read sample data
with open(input_file, 'r', encoding='utf-8') as f:
sample_data = f.read(sample_chars)
# Import API modules
from trustgraph.api import Api
api = Api(api_url)
config_api = api.config()
# Get available schemas
schema_keys = config_api.list("schema")
if not schema_keys:
logger.error("No schemas available in TrustGraph configuration")
return None
else:
# Full pipeline: parse and import
if not descriptor_file:
# Auto-generate descriptor if not provided
logger.info("No descriptor provided, auto-generating...")
logger.info(f"Schema name: {schema_name}")
# Read sample data for descriptor generation
# Get schema definitions
schemas = {}
for key in schema_keys:
try:
with open(input_file, 'r', encoding='utf-8') as f:
sample_data = f.read(sample_chars)
logger.info(f"Read {len(sample_data)} characters for descriptor generation")
schema_def = config_api.get("schema", key)
schemas[key] = schema_def
except Exception as e:
logger.error(f"Failed to read input file for descriptor generation: {e}")
raise
logger.warning(f"Could not load schema {key}: {e}")
if not schemas:
logger.error("No valid schemas could be loaded")
return None
# Generate descriptor using TrustGraph prompt service
# Use prompt service for schema selection
flow_api = api.flow().id("default")
prompt_client = flow_api.prompt()
prompt = f"""Analyze this data sample and determine the best matching schema:
DATA SAMPLE:
{sample_data[:1000]}
AVAILABLE SCHEMAS:
{json.dumps(schemas, indent=2)}
Return ONLY the schema name (key) that best matches this data. Consider:
1. Field names and types in the data
2. Data structure and format
3. Domain and use case alignment
Schema name:"""
response = prompt_client.schema_selection(
schemas=schemas,
sample=sample_data[:1000]
)
# Extract schema name from response
if isinstance(response, dict) and 'schema' in response:
return response['schema']
elif isinstance(response, str):
# Try to extract schema name from text response
response_lower = response.lower().strip()
for schema_key in schema_keys:
if schema_key.lower() in response_lower:
return schema_key
# If no exact match, try first mentioned schema
words = response.split()
for word in words:
clean_word = word.strip('.,!?":').lower()
if clean_word in [s.lower() for s in schema_keys]:
matching_schema = next(s for s in schema_keys if s.lower() == clean_word)
return matching_schema
logger.warning(f"Could not parse schema selection from response: {response}")
# Fallback: return first available schema
logger.info(f"Using fallback: first available schema '{schema_keys[0]}'")
return schema_keys[0]
except Exception as e:
logger.error(f"Schema discovery failed: {e}")
return None
def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, logger):
"""Auto-generate descriptor configuration for the discovered schema"""
try:
# Read sample data
with open(input_file, 'r', encoding='utf-8') as f:
sample_data = f.read(sample_chars)
# Import API modules
from trustgraph.api import Api
api = Api(api_url)
config_api = api.config()
# Get schema definition
schema_def = config_api.get("schema", schema_name)
# Use prompt service for descriptor generation
flow_api = api.flow().id("default")
prompt_client = flow_api.prompt()
response = prompt_client.diagnose_structured_data(
sample=sample_data,
schema_name=schema_name,
schema=schema_def
)
if isinstance(response, str):
try:
from trustgraph.api import Api
from trustgraph.api.types import ConfigKey
api = Api(api_url)
config_api = api.config()
# Get available schemas
logger.info("Fetching available schemas for descriptor generation...")
schema_keys = config_api.list("schema")
logger.info(f"Found {len(schema_keys)} schemas: {schema_keys}")
if not schema_keys:
logger.warning("No schemas found in configuration")
print("No schemas available in TrustGraph configuration")
return
# Fetch each schema definition
schemas = []
config_keys = [ConfigKey(type="schema", key=key) for key in schema_keys]
schema_values = config_api.get(config_keys)
for value in schema_values:
try:
schema_def = json.loads(value.value) if isinstance(value.value, str) else value.value
schemas.append(schema_def)
logger.debug(f"Loaded schema: {value.key}")
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse schema {value.key}: {e}")
continue
logger.info(f"Successfully loaded {len(schemas)} schema definitions")
# Generate descriptor using diagnose-structured-data prompt
flow_api = api.flow().id(flow)
logger.info("Calling TrustGraph diagnose-structured-data prompt for descriptor generation...")
response = flow_api.prompt(
id="diagnose-structured-data",
variables={
"schemas": schemas,
"sample": sample_data
}
)
# Parse the generated descriptor
if isinstance(response, str):
try:
descriptor = json.loads(response)
except json.JSONDecodeError:
logger.error("Generated descriptor is not valid JSON")
raise ValueError("Failed to generate valid descriptor")
else:
descriptor = response
# Override schema_name if provided
if schema_name:
descriptor.setdefault('output', {})['schema_name'] = schema_name
logger.info("Successfully generated descriptor from data sample")
except ImportError as e:
logger.error(f"Failed to import TrustGraph API: {e}")
raise
except Exception as e:
logger.error(f"Failed to generate descriptor: {e}")
raise
return json.loads(response)
except json.JSONDecodeError:
logger.error("Generated descriptor is not valid JSON")
return None
else:
# Load existing descriptor
try:
with open(descriptor_file, 'r', encoding='utf-8') as f:
descriptor = json.load(f)
logger.info(f"Loaded descriptor configuration from {descriptor_file}")
except Exception as e:
logger.error(f"Failed to load descriptor file: {e}")
raise
return response
except Exception as e:
logger.error(f"Descriptor generation failed: {e}")
return None
def _auto_parse_preview(input_file, descriptor, max_records, logger):
"""Parse and preview data using the auto-generated descriptor"""
try:
# Simplified parsing logic for preview (reuse existing logic)
format_info = descriptor.get('format', {})
format_type = format_info.get('type', 'csv').lower()
encoding = format_info.get('encoding', 'utf-8')
logger.info(f"Processing {input_file} for import...")
with open(input_file, 'r', encoding=encoding) as f:
raw_data = f.read()
# Parse data using the same logic as parse-only mode, but with full dataset
try:
format_info = descriptor.get('format', {})
format_type = format_info.get('type', 'csv').lower()
encoding = format_info.get('encoding', 'utf-8')
logger.info(f"Input format: {format_type}, encoding: {encoding}")
with open(input_file, 'r', encoding=encoding) as f:
raw_data = f.read()
logger.info(f"Read {len(raw_data)} characters from input file")
except Exception as e:
logger.error(f"Failed to read input file: {e}")
raise
# Parse data (reuse parse-only logic but process all records)
parsed_records = []
batch_size = descriptor.get('output', {}).get('options', {}).get('batch_size', 1000)
if format_type == 'csv':
import csv
@ -623,261 +729,50 @@ def load_structured_data(
delimiter = options.get('delimiter', ',')
has_header = options.get('has_header', True) or options.get('header', True)
logger.info(f"CSV options - delimiter: '{delimiter}', has_header: {has_header}")
reader = csv.DictReader(StringIO(raw_data), delimiter=delimiter)
if not has_header:
first_row = next(reader)
fieldnames = [f"field_{i+1}" for i in range(len(first_row))]
reader = csv.DictReader(StringIO(raw_data), fieldnames=fieldnames, delimiter=delimiter)
try:
reader = csv.DictReader(StringIO(raw_data), delimiter=delimiter)
if not has_header:
first_row = next(reader)
fieldnames = [f"field_{i+1}" for i in range(len(first_row))]
reader = csv.DictReader(StringIO(raw_data), fieldnames=fieldnames, delimiter=delimiter)
record_count = 0
for row in reader:
parsed_records.append(row)
record_count += 1
# Process in batches to avoid memory issues
if record_count % batch_size == 0:
logger.info(f"Parsed {record_count} records...")
except Exception as e:
logger.error(f"Failed to parse CSV data: {e}")
raise
count = 0
for row in reader:
if count >= max_records:
break
parsed_records.append(dict(row))
count += 1
elif format_type == 'json':
try:
data = json.loads(raw_data)
if isinstance(data, list):
parsed_records = data
elif isinstance(data, dict):
root_path = format_info.get('options', {}).get('root_path')
if root_path:
if root_path.startswith('$.'):
key = root_path[2:]
data = data.get(key, data)
if isinstance(data, list):
parsed_records = data
else:
parsed_records = [data]
except Exception as e:
logger.error(f"Failed to parse JSON data: {e}")
raise
elif format_type == 'xml':
import xml.etree.ElementTree as ET
import json
data = json.loads(raw_data)
options = format_info.get('options', {})
record_path = options.get('record_path', '//record')
field_attribute = options.get('field_attribute')
# Legacy support for old options format
if 'root_element' in options or 'record_element' in options:
root_element = options.get('root_element')
record_element = options.get('record_element', 'record')
if root_element:
record_path = f"//{root_element}/{record_element}"
else:
record_path = f"//{record_element}"
logger.info(f"XML options - record_path: '{record_path}', field_attribute: '{field_attribute}'")
try:
root = ET.fromstring(raw_data)
if isinstance(data, list):
parsed_records = data[:max_records]
else:
parsed_records = [data]
# Find record elements using XPath
xpath_expr = record_path
if xpath_expr.startswith('/ROOT/'):
xpath_expr = xpath_expr[6:]
elif xpath_expr.startswith('/'):
xpath_expr = '.' + xpath_expr
records = root.findall(xpath_expr)
logger.info(f"Found {len(records)} records using XPath: {record_path} (converted to: {xpath_expr})")
# Convert XML elements to dictionaries
for element in records:
record = {}
if field_attribute:
# Handle field elements with name attributes (UN data format)
for child in element:
if child.tag == 'field' and field_attribute in child.attrib:
field_name = child.attrib[field_attribute]
field_value = child.text.strip() if child.text else ""
record[field_name] = field_value
else:
# Handle standard XML structure
record.update(element.attrib)
for child in element:
if child.text:
record[child.tag] = child.text.strip()
else:
record[child.tag] = ""
if not record and element.text:
record['value'] = element.text.strip()
parsed_records.append(record)
except ET.ParseError as e:
logger.error(f"Failed to parse XML data: {e}")
raise
except Exception as e:
logger.error(f"Failed to process XML data: {e}")
raise
else:
raise ValueError(f"Unsupported format type: {format_type}")
logger.info(f"Successfully parsed {len(parsed_records)} records")
# Apply transformations and create TrustGraph objects
# Apply basic field mappings for preview
mappings = descriptor.get('mappings', [])
processed_records = []
schema_name = descriptor.get('output', {}).get('schema_name', 'default')
confidence = descriptor.get('output', {}).get('options', {}).get('confidence', 0.9)
preview_records = []
logger.info(f"Applying {len(mappings)} field mappings...")
for record_num, record in enumerate(parsed_records, start=1):
for record in parsed_records:
processed_record = {}
for mapping in mappings:
source_field = mapping.get('source_field') or mapping.get('source')
target_field = mapping.get('target_field') or mapping.get('target')
source_field = mapping.get('source_field')
target_field = mapping.get('target_field', source_field)
if source_field in record:
value = record[source_field]
# Apply transforms
transforms = mapping.get('transforms', [])
for transform in transforms:
transform_type = transform.get('type')
if transform_type == 'trim' and isinstance(value, str):
value = value.strip()
elif transform_type == 'upper' and isinstance(value, str):
value = value.upper()
elif transform_type == 'lower' and isinstance(value, str):
value = value.lower()
elif transform_type == 'title_case' and isinstance(value, str):
value = value.title()
elif transform_type == 'to_int':
try:
value = int(value) if value != '' else None
except (ValueError, TypeError):
logger.warning(f"Failed to convert '{value}' to int in record {record_num}")
elif transform_type == 'to_float':
try:
value = float(value) if value != '' else None
except (ValueError, TypeError):
logger.warning(f"Failed to convert '{value}' to float in record {record_num}")
# Convert all values to strings as required by ExtractedObject schema
processed_record[target_field] = str(value) if value is not None else ""
else:
logger.warning(f"Source field '{source_field}' not found in record {record_num}")
# Create TrustGraph ExtractedObject
output_record = {
"metadata": {
"id": f"import-{record_num}",
"metadata": [],
"user": "trustgraph",
"collection": "default"
},
"schema_name": schema_name,
"values": processed_record,
"confidence": confidence,
"source_span": ""
}
processed_records.append(output_record)
logger.info(f"Processed {len(processed_records)} records with transformations")
if dry_run:
print(f"Dry run mode - would import {len(processed_records)} records to TrustGraph")
print(f"Target schema: {schema_name}")
print(f"Sample record:")
if processed_records:
# Show what the batched format will look like
sample_batch = processed_records[:min(3, len(processed_records))]
batch_values = [record["values"] for record in sample_batch]
first_record = processed_records[0]
batched_sample = {
"metadata": first_record["metadata"],
"schema_name": first_record["schema_name"],
"values": batch_values,
"confidence": first_record["confidence"],
"source_span": first_record["source_span"]
}
print(json.dumps(batched_sample, indent=2))
return
# Import to TrustGraph using objects import endpoint via WebSocket
logger.info(f"Importing {len(processed_records)} records to TrustGraph...")
try:
import asyncio
from websockets.asyncio.client import connect
# Construct objects import URL similar to load_knowledge pattern
if not api_url.endswith("/"):
api_url += "/"
# Convert HTTP URL to WebSocket URL if needed
ws_url = api_url.replace("http://", "ws://").replace("https://", "wss://")
objects_url = ws_url + f"api/v1/flow/{flow}/import/objects"
logger.info(f"Connecting to objects import endpoint: {objects_url}")
async def import_objects():
async with connect(objects_url) as ws:
imported_count = 0
# Process records in batches
for i in range(0, len(processed_records), batch_size):
batch_records = processed_records[i:i + batch_size]
# Extract values from each record in the batch
batch_values = [record["values"] for record in batch_records]
# Create batched ExtractedObject message using first record as template
first_record = batch_records[0]
batched_record = {
"metadata": first_record["metadata"],
"schema_name": first_record["schema_name"],
"values": batch_values, # Array of value dictionaries
"confidence": first_record["confidence"],
"source_span": first_record["source_span"]
}
# Send batched ExtractedObject
await ws.send(json.dumps(batched_record))
imported_count += len(batch_records)
if imported_count % 100 == 0:
logger.info(f"Imported {imported_count}/{len(processed_records)} records...")
logger.info(f"Successfully imported {imported_count} records to TrustGraph")
return imported_count
# Run the async import
imported_count = asyncio.run(import_objects())
print(f"Import completed: {imported_count} records imported to schema '{schema_name}'")
except ImportError as e:
logger.error(f"Failed to import required modules: {e}")
print(f"Error: Required modules not available - {e}")
raise
except Exception as e:
logger.error(f"Failed to import data to TrustGraph: {e}")
print(f"Import failed: {e}")
raise
if processed_record: # Only add if we got some data
preview_records.append(processed_record)
return preview_records if preview_records else parsed_records
except Exception as e:
logger.error(f"Preview parsing failed: {e}")
return None
def main():
@ -908,26 +803,29 @@ Examples:
%(prog)s --input customers.csv --descriptor descriptor.json
%(prog)s --input products.xml --descriptor xml_descriptor.json
# All-in-one: Auto-generate descriptor and import (for simple cases)
%(prog)s --input customers.csv --schema-name customer
# FULLY AUTOMATIC: Discover schema + generate descriptor + import (zero manual steps!)
%(prog)s --input customers.csv --auto
%(prog)s --input products.xml --auto --dry-run # Preview before importing
# Dry run to validate without importing
%(prog)s --input customers.csv --descriptor descriptor.json --dry-run
Use Cases:
--auto : 🚀 FULLY AUTOMATIC: Discover schema + generate descriptor + import data
(zero manual configuration required!)
--suggest-schema : Diagnose which TrustGraph schemas might match your data
(uses --sample-chars to limit data sent for analysis)
--generate-descriptor: Create/review the structured data language configuration
(uses --sample-chars to limit data sent for analysis)
--parse-only : Validate that parsed data looks correct before import
(uses --sample-size to limit records processed, ignores --sample-chars)
(no mode flags) : Full pipeline - parse and import to TrustGraph
For more information on the descriptor format, see:
docs/tech-specs/structured-data-descriptor.md
""".strip()
""",
)
# Required arguments
parser.add_argument(
'-u', '--api-url',
default=default_url,
@ -968,6 +866,11 @@ For more information on the descriptor format, see:
action='store_true',
help='Parse data using descriptor but don\'t import to TrustGraph'
)
mode_group.add_argument(
'--auto',
action='store_true',
help='Run full automatic pipeline: discover schema + generate descriptor + import data'
)
parser.add_argument(
'-o', '--output',
@ -1026,7 +929,12 @@ For more information on the descriptor format, see:
args = parser.parse_args()
# Validate argument combinations
# Input validation
if not os.path.exists(args.input):
print(f"Error: Input file not found: {args.input}", file=sys.stderr)
sys.exit(1)
# Mode-specific validation
if args.parse_only and not args.descriptor:
print("Error: --descriptor is required when using --parse-only", file=sys.stderr)
sys.exit(1)
@ -1038,11 +946,15 @@ For more information on the descriptor format, see:
if (args.suggest_schema or args.generate_descriptor) and args.sample_size != 100: # 100 is default
print("Warning: --sample-size is ignored in analysis modes, use --sample-chars instead", file=sys.stderr)
if not any([args.suggest_schema, args.generate_descriptor, args.parse_only]) and not args.descriptor:
# Full pipeline mode without descriptor - schema_name should be provided
if not args.schema_name:
print("Error: --descriptor or --schema-name is required for full import", file=sys.stderr)
sys.exit(1)
# Require explicit mode selection - no implicit behavior
if not any([args.suggest_schema, args.generate_descriptor, args.parse_only, args.auto]):
print("Error: Must specify an operation mode", file=sys.stderr)
print("Available modes:", file=sys.stderr)
print(" --auto : Discover schema + generate descriptor + import", file=sys.stderr)
print(" --suggest-schema : Analyze data and suggest schemas", file=sys.stderr)
print(" --generate-descriptor : Generate descriptor from data", file=sys.stderr)
print(" --parse-only : Parse data without importing", file=sys.stderr)
sys.exit(1)
try:
load_structured_data(
@ -1052,6 +964,7 @@ For more information on the descriptor format, see:
suggest_schema=args.suggest_schema,
generate_descriptor=args.generate_descriptor,
parse_only=args.parse_only,
auto=args.auto,
output_file=args.output,
sample_size=args.sample_size,
sample_chars=args.sample_chars,