Object extraction schema validation (#465)

* Object schema validation in kg-extract-objects, prevents invalid data appearing in Pulsar messages

* Added tests for the above
This commit is contained in:
cybermaggedon 2025-08-22 12:30:05 +01:00 committed by GitHub
parent 97cfbb5ea4
commit 5e71d0cadb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 343 additions and 1 deletions

View file

@ -153,11 +153,81 @@ class Processor(FlowProcessor):
text=text
)
return objects if isinstance(objects, list) else []
if not isinstance(objects, list):
return []
# Validate each object against schema
validated_objects = []
for obj in objects:
if self.validate_object(obj, schema, schema_name):
validated_objects.append(obj)
else:
logger.warning(f"Skipping invalid object for schema {schema_name}")
return validated_objects
except Exception as e:
logger.error(f"Failed to extract objects for schema {schema_name}: {e}", exc_info=True)
return []
def validate_object(self, obj: Any, schema: RowSchema, schema_name: str) -> bool:
"""Validate object against schema definition"""
if not isinstance(obj, dict):
logger.warning(f"Object for schema {schema_name} is not a dictionary: {type(obj)}")
return False
# Check if this looks like a nested format issue
if schema_name in obj and isinstance(obj[schema_name], str):
logger.error(f"Object has nested JSON format under '{schema_name}' key - LLM returned incorrect format")
return False
# Check all required fields are present
for field in schema.fields:
if field.required and field.name not in obj:
logger.warning(f"Required field '{field.name}' missing in {schema_name} object")
return False
# Check primary key fields are not null
if field.primary and (field.name not in obj or obj[field.name] is None):
logger.error(f"Primary key field '{field.name}' is null or missing in {schema_name} object")
return False
# Validate basic type compatibility if value exists
if field.name in obj and obj[field.name] is not None:
value = obj[field.name]
# Type validation
if field.type == "integer":
try:
# Accept numeric strings that can be converted
if isinstance(value, str):
int(value)
elif not isinstance(value, (int, float)):
logger.warning(f"Field '{field.name}' in {schema_name} expected integer, got {type(value).__name__}")
return False
except ValueError:
logger.warning(f"Field '{field.name}' in {schema_name} value '{value}' cannot be converted to integer")
return False
elif field.type == "float":
try:
if isinstance(value, str):
float(value)
elif not isinstance(value, (int, float)):
logger.warning(f"Field '{field.name}' in {schema_name} expected float, got {type(value).__name__}")
return False
except ValueError:
logger.warning(f"Field '{field.name}' in {schema_name} value '{value}' cannot be converted to float")
return False
elif field.type == "boolean":
if not isinstance(value, (bool, str, int)):
logger.warning(f"Field '{field.name}' in {schema_name} expected boolean, got {type(value).__name__}")
return False
logger.debug(f"Object validated successfully for schema {schema_name}")
return True
async def on_chunk(self, msg, consumer, flow):
"""Process incoming chunk and extract objects"""