mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-30 19:06:21 +02:00
Structure data mvp (#452)
* Structured data tech spec
* Architecture principles
* New schemas
* Updated schemas and specs
* Object extractor
* Add .coveragerc
* New tests
* Cassandra object storage
* Trying to get object extraction working; issues exist
This commit is contained in:
parent 5de56c5dbc
commit 83f0c1e7f3
46 changed files with 5313 additions and 1629 deletions
@@ -0,0 +1,3 @@

from . processor import *

@@ -1,6 +1,6 @@
#!/usr/bin/env python3

-from . extract import run
+from . processor import run

if __name__ == '__main__':
    run()

241  trustgraph-flow/trustgraph/extract/kg/objects/processor.py  Normal file

@@ -0,0 +1,241 @@
"""
Object extraction service - extracts structured objects from text chunks
based on configured schemas.
"""

import json
import logging
from typing import Dict, List, Any

# Module logger
logger = logging.getLogger(__name__)

from .... schema import Chunk, ExtractedObject, Metadata
from .... schema import PromptRequest, PromptResponse
from .... schema import RowSchema, Field

from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
from .... base import PromptClientSpec
from .... messaging.translators import row_schema_translator

default_ident = "kg-extract-objects"


def convert_values_to_strings(obj: Dict[str, Any]) -> Dict[str, str]:
    """Convert all values in a dictionary to strings for Pulsar Map(String()) compatibility"""
    result = {}
    for key, value in obj.items():
        if value is None:
            result[key] = ""
        elif isinstance(value, str):
            result[key] = value
        elif isinstance(value, (int, float, bool)):
            result[key] = str(value)
        elif isinstance(value, (list, dict)):
            # For complex types, serialize as JSON
            result[key] = json.dumps(value)
        else:
            # For any other type, convert to string
            result[key] = str(value)
    return result

default_concurrency = 1

class Processor(FlowProcessor):

    def __init__(self, **params):

        id = params.get("id")
        concurrency = params.get("concurrency", 1)

        # Config key for schemas
        self.config_key = params.get("config_type", "schema")

        super(Processor, self).__init__(
            **params | {
                "id": id,
                "config-type": self.config_key,
                "concurrency": concurrency,
            }
        )

        self.register_specification(
            ConsumerSpec(
                name = "input",
                schema = Chunk,
                handler = self.on_chunk,
                concurrency = concurrency,
            )
        )

        self.register_specification(
            PromptClientSpec(
                request_name = "prompt-request",
                response_name = "prompt-response",
            )
        )

        self.register_specification(
            ProducerSpec(
                name = "output",
                schema = ExtractedObject
            )
        )

        # Register config handler for schema updates
        self.register_config_handler(self.on_schema_config)

        # Schema storage: name -> RowSchema
        self.schemas: Dict[str, RowSchema] = {}

    async def on_schema_config(self, config, version):
        """Handle schema configuration updates"""

        logger.info(f"Loading schema configuration version {version}")

        # Clear existing schemas
        self.schemas = {}

        # Check if our config type exists
        if self.config_key not in config:
            logger.warning(f"No '{self.config_key}' type in configuration")
            return

        # Get the schemas dictionary for our type
        schemas_config = config[self.config_key]

        # Process each schema in the schemas config
        for schema_name, schema_json in schemas_config.items():

            try:
                # Parse the JSON schema definition
                schema_def = json.loads(schema_json)

                # Create Field objects
                fields = []
                for field_def in schema_def.get("fields", []):
                    field = Field(
                        name=field_def["name"],
                        type=field_def["type"],
                        size=field_def.get("size", 0),
                        primary=field_def.get("primary_key", False),
                        description=field_def.get("description", ""),
                        required=field_def.get("required", False),
                        enum_values=field_def.get("enum", []),
                        indexed=field_def.get("indexed", False)
                    )
                    fields.append(field)

                # Create RowSchema
                row_schema = RowSchema(
                    name=schema_def.get("name", schema_name),
                    description=schema_def.get("description", ""),
                    fields=fields
                )

                self.schemas[schema_name] = row_schema
                logger.info(f"Loaded schema: {schema_name} with {len(fields)} fields")

            except Exception as e:
                logger.error(f"Failed to parse schema {schema_name}: {e}", exc_info=True)

        logger.info(f"Schema configuration loaded: {len(self.schemas)} schemas")

    async def extract_objects_for_schema(self, text: str, schema_name: str, schema: RowSchema, flow) -> List[Dict[str, Any]]:
        """Extract objects from text for a specific schema"""

        try:
            # Convert Pulsar RowSchema to JSON-serializable dict
            schema_dict = row_schema_translator.from_pulsar(schema)

            # Use prompt client to extract rows based on schema
            objects = await flow("prompt-request").extract_objects(
                schema=schema_dict,
                text=text
            )

            return objects if isinstance(objects, list) else []

        except Exception as e:
            logger.error(f"Failed to extract objects for schema {schema_name}: {e}", exc_info=True)
            return []

    async def on_chunk(self, msg, consumer, flow):
        """Process incoming chunk and extract objects"""

        v = msg.value()
        logger.info(f"Extracting objects from chunk {v.metadata.id}...")

        chunk_text = v.chunk.decode("utf-8")

        # If no schemas configured, log warning and return
        if not self.schemas:
            logger.warning("No schemas configured - skipping extraction")
            return

        try:
            # Extract objects for each configured schema
            for schema_name, schema in self.schemas.items():

                logger.debug(f"Extracting {schema_name} objects from chunk")

                # Extract objects using prompt
                objects = await self.extract_objects_for_schema(
                    chunk_text,
                    schema_name,
                    schema,
                    flow
                )

                # Emit each extracted object
                for obj in objects:

                    # Calculate confidence (could be enhanced with actual confidence from prompt)
                    confidence = 0.8  # Default confidence

                    # Convert all values to strings for Pulsar compatibility
                    string_values = convert_values_to_strings(obj)

                    # Create ExtractedObject
                    extracted = ExtractedObject(
                        metadata=Metadata(
                            id=f"{v.metadata.id}:{schema_name}:{hash(str(obj))}",
                            metadata=[],
                            user=v.metadata.user,
                            collection=v.metadata.collection,
                        ),
                        schema_name=schema_name,
                        values=string_values,
                        confidence=confidence,
                        source_span=chunk_text[:100]  # First 100 chars as source reference
                    )

                    await flow("output").send(extracted)
                    logger.debug(f"Emitted extracted object for schema {schema_name}")

        except Exception as e:
            logger.error(f"Object extraction exception: {e}", exc_info=True)

        logger.debug("Object extraction complete")

    @staticmethod
    def add_args(parser):
        """Add command-line arguments"""

        parser.add_argument(
            '-c', '--concurrency',
            type=int,
            default=default_concurrency,
            help=f'Concurrent processing threads (default: {default_concurrency})'
        )

        parser.add_argument(
            '--config-type',
            default='schema',
            help='Configuration type prefix for schemas (default: schema)'
        )

        FlowProcessor.add_args(parser)

def run():
    """Entry point for kg-extract-objects command"""
    Processor.launch(default_ident, __doc__)

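For reference, the schema entries that `on_schema_config` above consumes are JSON strings keyed by schema name under the configured config type (default `schema`). A minimal sketch of one such entry follows; the schema name `customer` and its field names are hypothetical, and only keys the handler actually reads (`name`, `description`, `fields`, and per-field `name`, `type`, `size`, `primary_key`, `description`, `required`, `enum`, `indexed`) are used:

import json

# Hypothetical example only: one schema entry as on_schema_config(config, version)
# would receive it. "customer" and the field names are illustrative assumptions.
customer_schema_json = json.dumps({
    "name": "customer",
    "description": "Customer records extracted from document chunks",
    "fields": [
        {"name": "id", "type": "string", "primary_key": True, "required": True},
        {"name": "email", "type": "string", "indexed": True},
        {"name": "age", "type": "integer", "size": 4},
    ],
})

# The config handler receives a dict keyed by config type, then by schema name.
config = {"schema": {"customer": customer_schema_json}}

Each entry is parsed into a RowSchema, and extraction then runs once per configured schema for every incoming chunk.
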
@@ -1,3 +0,0 @@

from . extract import *

@@ -1,225 +0,0 @@
"""
Simple decoder, accepts vector+text chunks input, applies analysis to pull
out a row of fields. Output as a vector plus object.
"""

import urllib.parse
import os
import logging
from pulsar.schema import JsonSchema

# Module logger
logger = logging.getLogger(__name__)

from .... schema import ChunkEmbeddings, Rows, ObjectEmbeddings, Metadata
from .... schema import RowSchema, Field
from .... schema import chunk_embeddings_ingest_queue, rows_store_queue
from .... schema import object_embeddings_store_queue
from .... schema import prompt_request_queue
from .... schema import prompt_response_queue
from .... log_level import LogLevel
from .... clients.prompt_client import PromptClient
from .... base import ConsumerProducer

from .... objects.field import Field as FieldParser
from .... objects.object import Schema

module = ".".join(__name__.split(".")[1:-1])

default_input_queue = chunk_embeddings_ingest_queue
default_output_queue = rows_store_queue
default_vector_queue = object_embeddings_store_queue
default_subscriber = module

class Processor(ConsumerProducer):

    def __init__(self, **params):

        input_queue = params.get("input_queue", default_input_queue)
        output_queue = params.get("output_queue", default_output_queue)
        vector_queue = params.get("vector_queue", default_vector_queue)
        subscriber = params.get("subscriber", default_subscriber)
        pr_request_queue = params.get(
            "prompt_request_queue", prompt_request_queue
        )
        pr_response_queue = params.get(
            "prompt_response_queue", prompt_response_queue
        )

        super(Processor, self).__init__(
            **params | {
                "input_queue": input_queue,
                "output_queue": output_queue,
                "subscriber": subscriber,
                "input_schema": ChunkEmbeddings,
                "output_schema": Rows,
                "prompt_request_queue": pr_request_queue,
                "prompt_response_queue": pr_response_queue,
            }
        )

        self.vec_prod = self.client.create_producer(
            topic=vector_queue,
            schema=JsonSchema(ObjectEmbeddings),
        )

        __class__.pubsub_metric.info({
            "input_queue": input_queue,
            "output_queue": output_queue,
            "vector_queue": vector_queue,
            "prompt_request_queue": pr_request_queue,
            "prompt_response_queue": pr_response_queue,
            "subscriber": subscriber,
            "input_schema": ChunkEmbeddings.__name__,
            "output_schema": Rows.__name__,
            "vector_schema": ObjectEmbeddings.__name__,
        })

        flds = __class__.parse_fields(params["field"])

        for fld in flds:
            logger.debug(f"Field configuration: {fld}")

        self.primary = None

        for f in flds:
            if f.primary:
                if self.primary:
                    raise RuntimeError(
                        "Only one primary key field is supported"
                    )
                self.primary = f

        if self.primary == None:
            raise RuntimeError(
                "Must have exactly one primary key field"
            )

        self.schema = Schema(
            name = params["name"],
            description = params["description"],
            fields = flds
        )

        self.row_schema=RowSchema(
            name=self.schema.name,
            description=self.schema.description,
            fields=[
                Field(
                    name=f.name, type=str(f.type), size=f.size,
                    primary=f.primary, description=f.description,
                )
                for f in self.schema.fields
            ]
        )

        self.prompt = PromptClient(
            pulsar_host=self.pulsar_host,
            pulsar_api_key=self.pulsar_api_key,
            input_queue=pr_request_queue,
            output_queue=pr_response_queue,
            subscriber = module + "-prompt",
        )

    @staticmethod
    def parse_fields(fields):
        return [ FieldParser.parse(f) for f in fields ]

    def get_rows(self, chunk):
        return self.prompt.request_rows(self.schema, chunk)

    def emit_rows(self, metadata, rows):

        t = Rows(
            metadata=metadata, row_schema=self.row_schema, rows=rows
        )
        await self.send(t)

    def emit_vec(self, metadata, name, vec, key_name, key):

        r = ObjectEmbeddings(
            metadata=metadata, vectors=vec, name=name, key_name=key_name, id=key
        )
        self.vec_prod.send(r)

    async def handle(self, msg):

        v = msg.value()
        logger.info(f"Extracting rows from {v.metadata.id}...")

        chunk = v.chunk.decode("utf-8")

        try:

            rows = self.get_rows(chunk)

            self.emit_rows(
                metadata=v.metadata,
                rows=rows
            )

            for row in rows:
                self.emit_vec(
                    metadata=v.metadata, vec=v.vectors,
                    name=self.schema.name, key_name=self.primary.name,
                    key=row[self.primary.name]
                )

            for row in rows:
                logger.debug(f"Extracted row: {row}")

        except Exception as e:
            logger.error(f"Row extraction exception: {e}", exc_info=True)

        logger.debug("Row extraction complete")

    @staticmethod
    def add_args(parser):

        ConsumerProducer.add_args(
            parser, default_input_queue, default_subscriber,
            default_output_queue,
        )

        parser.add_argument(
            '-c', '--vector-queue',
            default=default_vector_queue,
            help=f'Vector output queue (default: {default_vector_queue})'
        )

        parser.add_argument(
            '--prompt-request-queue',
            default=prompt_request_queue,
            help=f'Prompt request queue (default: {prompt_request_queue})',
        )

        parser.add_argument(
            '--prompt-response-queue',
            default=prompt_response_queue,
            help=f'Prompt response queue (default: {prompt_response_queue})',
        )

        parser.add_argument(
            '-f', '--field',
            required=True,
            action='append',
            help=f'Field definition, format name:type:size:pri:descriptionn',
        )

        parser.add_argument(
            '-n', '--name',
            required=True,
            help=f'Name of row object',
        )

        parser.add_argument(
            '-d', '--description',
            required=True,
            help=f'Description of object',
        )

def run():

    Processor.launch(module, __doc__)

1  trustgraph-flow/trustgraph/storage/objects/__init__.py  Normal file

@@ -0,0 +1 @@
# Objects storage module

@@ -0,0 +1 @@
from . write import *

@@ -0,0 +1,3 @@
from . write import run

run()

411  trustgraph-flow/trustgraph/storage/objects/cassandra/write.py  Normal file

@@ -0,0 +1,411 @@
"""
Object writer for Cassandra. Input is ExtractedObject.
Writes structured objects to Cassandra tables based on schema definitions.
"""

import json
import logging
from typing import Dict, Set, Optional, Any
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.cqlengine import connection
from cassandra import ConsistencyLevel

from .... schema import ExtractedObject
from .... schema import RowSchema, Field
from .... base import FlowProcessor, ConsumerSpec

# Module logger
logger = logging.getLogger(__name__)

default_ident = "objects-write"
default_graph_host = 'localhost'

class Processor(FlowProcessor):

    def __init__(self, **params):

        id = params.get("id", default_ident)

        # Cassandra connection parameters
        self.graph_host = params.get("graph_host", default_graph_host)
        self.graph_username = params.get("graph_username", None)
        self.graph_password = params.get("graph_password", None)

        # Config key for schemas
        self.config_key = params.get("config_type", "schema")

        super(Processor, self).__init__(
            **params | {
                "id": id,
                "config-type": self.config_key,
            }
        )

        self.register_specification(
            ConsumerSpec(
                name = "input",
                schema = ExtractedObject,
                handler = self.on_object
            )
        )

        # Register config handler for schema updates
        self.register_config_handler(self.on_schema_config)

        # Cache of known keyspaces/tables
        self.known_keyspaces: Set[str] = set()
        self.known_tables: Dict[str, Set[str]] = {}  # keyspace -> set of tables

        # Schema storage: name -> RowSchema
        self.schemas: Dict[str, RowSchema] = {}

        # Cassandra session
        self.cluster = None
        self.session = None

    def connect_cassandra(self):
        """Connect to Cassandra cluster"""
        if self.session:
            return

        try:
            if self.graph_username and self.graph_password:
                auth_provider = PlainTextAuthProvider(
                    username=self.graph_username,
                    password=self.graph_password
                )
                self.cluster = Cluster(
                    contact_points=[self.graph_host],
                    auth_provider=auth_provider
                )
            else:
                self.cluster = Cluster(contact_points=[self.graph_host])

            self.session = self.cluster.connect()
            logger.info(f"Connected to Cassandra cluster at {self.graph_host}")

        except Exception as e:
            logger.error(f"Failed to connect to Cassandra: {e}", exc_info=True)
            raise

    async def on_schema_config(self, config, version):
        """Handle schema configuration updates"""
        logger.info(f"Loading schema configuration version {version}")

        # Clear existing schemas
        self.schemas = {}

        # Check if our config type exists
        if self.config_key not in config:
            logger.warning(f"No '{self.config_key}' type in configuration")
            return

        # Get the schemas dictionary for our type
        schemas_config = config[self.config_key]

        # Process each schema in the schemas config
        for schema_name, schema_json in schemas_config.items():
            try:
                # Parse the JSON schema definition
                schema_def = json.loads(schema_json)

                # Create Field objects
                fields = []
                for field_def in schema_def.get("fields", []):
                    field = Field(
                        name=field_def["name"],
                        type=field_def["type"],
                        size=field_def.get("size", 0),
                        primary=field_def.get("primary_key", False),
                        description=field_def.get("description", ""),
                        required=field_def.get("required", False),
                        enum_values=field_def.get("enum", []),
                        indexed=field_def.get("indexed", False)
                    )
                    fields.append(field)

                # Create RowSchema
                row_schema = RowSchema(
                    name=schema_def.get("name", schema_name),
                    description=schema_def.get("description", ""),
                    fields=fields
                )

                self.schemas[schema_name] = row_schema
                logger.info(f"Loaded schema: {schema_name} with {len(fields)} fields")

            except Exception as e:
                logger.error(f"Failed to parse schema {schema_name}: {e}", exc_info=True)

        logger.info(f"Schema configuration loaded: {len(self.schemas)} schemas")

    def ensure_keyspace(self, keyspace: str):
        """Ensure keyspace exists in Cassandra"""
        if keyspace in self.known_keyspaces:
            return

        # Connect if needed
        self.connect_cassandra()

        # Sanitize keyspace name
        safe_keyspace = self.sanitize_name(keyspace)

        # Create keyspace if not exists
        create_keyspace_cql = f"""
            CREATE KEYSPACE IF NOT EXISTS {safe_keyspace}
            WITH REPLICATION = {{
                'class': 'SimpleStrategy',
                'replication_factor': 1
            }}
        """

        try:
            self.session.execute(create_keyspace_cql)
            self.known_keyspaces.add(keyspace)
            self.known_tables[keyspace] = set()
            logger.info(f"Ensured keyspace exists: {safe_keyspace}")
        except Exception as e:
            logger.error(f"Failed to create keyspace {safe_keyspace}: {e}", exc_info=True)
            raise

    def get_cassandra_type(self, field_type: str, size: int = 0) -> str:
        """Convert schema field type to Cassandra type"""
        # Handle None size
        if size is None:
            size = 0

        type_mapping = {
            "string": "text",
            "integer": "bigint" if size > 4 else "int",
            "float": "double" if size > 4 else "float",
            "boolean": "boolean",
            "timestamp": "timestamp",
            "date": "date",
            "time": "time",
            "uuid": "uuid"
        }

        return type_mapping.get(field_type, "text")

    def sanitize_name(self, name: str) -> str:
        """Sanitize names for Cassandra compatibility"""
        # Replace non-alphanumeric characters with underscore
        import re
        safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', name)
        # Ensure it starts with a letter
        if safe_name and not safe_name[0].isalpha():
            safe_name = 'o_' + safe_name
        return safe_name.lower()

    def sanitize_table(self, name: str) -> str:
        """Sanitize names for Cassandra compatibility"""
        # Replace non-alphanumeric characters with underscore
        import re
        safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', name)
        # Ensure it starts with a letter
        safe_name = 'o_' + safe_name
        return safe_name.lower()

    def ensure_table(self, keyspace: str, table_name: str, schema: RowSchema):
        """Ensure table exists with proper structure"""
        table_key = f"{keyspace}.{table_name}"
        if table_key in self.known_tables.get(keyspace, set()):
            return

        # Ensure keyspace exists first
        self.ensure_keyspace(keyspace)

        safe_keyspace = self.sanitize_name(keyspace)
        safe_table = self.sanitize_table(table_name)

        # Build column definitions
        columns = ["collection text"]  # Collection is always part of table
        primary_key_fields = []
        clustering_fields = []

        for field in schema.fields:
            safe_field_name = self.sanitize_name(field.name)
            cassandra_type = self.get_cassandra_type(field.type, field.size)
            columns.append(f"{safe_field_name} {cassandra_type}")

            if field.primary:
                primary_key_fields.append(safe_field_name)

        # Build primary key - collection is always first in partition key
        if primary_key_fields:
            primary_key = f"PRIMARY KEY ((collection, {', '.join(primary_key_fields)}))"
        else:
            # If no primary key defined, use collection and a synthetic id
            columns.append("synthetic_id uuid")
            primary_key = "PRIMARY KEY ((collection, synthetic_id))"

        # Create table
        create_table_cql = f"""
            CREATE TABLE IF NOT EXISTS {safe_keyspace}.{safe_table} (
                {', '.join(columns)},
                {primary_key}
            )
        """

        try:
            self.session.execute(create_table_cql)
            self.known_tables[keyspace].add(table_key)
            logger.info(f"Ensured table exists: {safe_keyspace}.{safe_table}")

            # Create secondary indexes for indexed fields
            for field in schema.fields:
                if field.indexed and not field.primary:
                    safe_field_name = self.sanitize_name(field.name)
                    index_name = f"{safe_table}_{safe_field_name}_idx"
                    create_index_cql = f"""
                        CREATE INDEX IF NOT EXISTS {index_name}
                        ON {safe_keyspace}.{safe_table} ({safe_field_name})
                    """
                    try:
                        self.session.execute(create_index_cql)
                        logger.info(f"Created index: {index_name}")
                    except Exception as e:
                        logger.warning(f"Failed to create index {index_name}: {e}")

        except Exception as e:
            logger.error(f"Failed to create table {safe_keyspace}.{safe_table}: {e}", exc_info=True)
            raise

    def convert_value(self, value: Any, field_type: str) -> Any:
        """Convert value to appropriate type for Cassandra"""
        if value is None:
            return None

        try:
            if field_type == "integer":
                return int(value)
            elif field_type == "float":
                return float(value)
            elif field_type == "boolean":
                if isinstance(value, str):
                    return value.lower() in ('true', '1', 'yes')
                return bool(value)
            elif field_type == "timestamp":
                # Handle timestamp conversion if needed
                return value
            else:
                return str(value)
        except Exception as e:
            logger.warning(f"Failed to convert value {value} to type {field_type}: {e}")
            return str(value)

    async def on_object(self, msg, consumer, flow):
        """Process incoming ExtractedObject and store in Cassandra"""

        obj = msg.value()
        logger.info(f"Storing object for schema {obj.schema_name} from {obj.metadata.id}")

        # Get schema definition
        schema = self.schemas.get(obj.schema_name)
        if not schema:
            logger.warning(f"No schema found for {obj.schema_name} - skipping")
            return

        # Ensure table exists
        keyspace = obj.metadata.user
        table_name = obj.schema_name
        self.ensure_table(keyspace, table_name, schema)

        # Prepare data for insertion
        safe_keyspace = self.sanitize_name(keyspace)
        safe_table = self.sanitize_table(table_name)

        # Build column names and values
        columns = ["collection"]
        values = [obj.metadata.collection]
        placeholders = ["%s"]

        # Check if we need a synthetic ID
        has_primary_key = any(field.primary for field in schema.fields)
        if not has_primary_key:
            import uuid
            columns.append("synthetic_id")
            values.append(uuid.uuid4())
            placeholders.append("%s")

        # Process fields
        for field in schema.fields:
            safe_field_name = self.sanitize_name(field.name)
            raw_value = obj.values.get(field.name)

            # Handle required fields
            if field.required and raw_value is None:
                logger.warning(f"Required field {field.name} is missing in object")
                # Continue anyway - Cassandra doesn't enforce NOT NULL

            # Check if primary key field is NULL
            if field.primary and raw_value is None:
                logger.error(f"Primary key field {field.name} cannot be NULL - skipping object")
                return

            # Convert value to appropriate type
            converted_value = self.convert_value(raw_value, field.type)

            columns.append(safe_field_name)
            values.append(converted_value)
            placeholders.append("%s")

        # Build and execute insert query
        insert_cql = f"""
            INSERT INTO {safe_keyspace}.{safe_table} ({', '.join(columns)})
            VALUES ({', '.join(placeholders)})
        """

        # Debug: Show data being inserted
        logger.debug(f"Storing {obj.schema_name}: {dict(zip(columns, values))}")

        if len(columns) != len(values) or len(columns) != len(placeholders):
            raise ValueError(f"Mismatch in counts - columns: {len(columns)}, values: {len(values)}, placeholders: {len(placeholders)}")

        try:
            # Convert to tuple - Cassandra driver requires tuple for parameters
            self.session.execute(insert_cql, tuple(values))
        except Exception as e:
            logger.error(f"Failed to insert object: {e}", exc_info=True)
            raise

    def close(self):
        """Clean up Cassandra connections"""
        if self.cluster:
            self.cluster.shutdown()
            logger.info("Closed Cassandra connection")

    @staticmethod
    def add_args(parser):
        """Add command-line arguments"""

        FlowProcessor.add_args(parser)

        parser.add_argument(
            '-g', '--graph-host',
            default=default_graph_host,
            help=f'Cassandra host (default: {default_graph_host})'
        )

        parser.add_argument(
            '--graph-username',
            default=None,
            help='Cassandra username'
        )

        parser.add_argument(
            '--graph-password',
            default=None,
            help='Cassandra password'
        )

        parser.add_argument(
            '--config-type',
            default='schema',
            help='Configuration type prefix for schemas (default: schema)'
        )

def run():
    """Entry point for objects-write-cassandra command"""
    Processor.launch(default_ident, __doc__)

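To illustrate what the writer above does end to end, here is a rough sketch of the statements it would issue for the hypothetical `customer` schema used earlier (string primary key `id`, indexed `email`, integer `age`). The keyspace comes from `metadata.user`, here assumed to be `alice`, and the table name picks up the `o_` prefix from `sanitize_table`; all of these names are assumptions, not values from the commit.

# Hypothetical illustration only; "alice", "customer", "id", "email", "age" are assumptions.
# Shape of the CQL built by ensure_table():
create_table_cql = """
    CREATE TABLE IF NOT EXISTS alice.o_customer (
        collection text, id text, email text, age int,
        PRIMARY KEY ((collection, id))
    )
"""

# Secondary index created for the indexed, non-primary field:
create_index_cql = """
    CREATE INDEX IF NOT EXISTS o_customer_email_idx
    ON alice.o_customer (email)
"""

# Insert issued by on_object(), with the values passed as a tuple of parameters:
insert_cql = """
    INSERT INTO alice.o_customer (collection, id, email, age)
    VALUES (%s, %s, %s, %s)
"""

The collection value is always the first column, and objects with no primary key field fall back to a synthetic_id uuid column, as shown in the code above.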