Remove redundant metadata (#685)

The metadata field (list of triples) in the pipeline Metadata class
was redundant. Document metadata triples already flow directly from
librarian to triple-store via emit_document_provenance() - they don't
need to pass through the extraction pipeline.

Additionally, chunker and PDF decoder were overwriting metadata to []
anyway, so any metadata passed through the pipeline was being
discarded.

Changes:
- Remove metadata field from Metadata dataclass
  (schema/core/metadata.py)
- Update all Metadata instantiations to remove metadata=[]
  parameter
- Remove metadata handling from translators (document_loading,
  knowledge)
- Remove metadata consumption from extractors (ontology, agent)
- Update gateway serializers and import handlers
- Update all unit, integration, and contract tests
This commit is contained in:
cybermaggedon 2026-03-11 10:51:39 +00:00 committed by GitHub
parent 1837d73f34
commit aa4f5c6c00
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 106 additions and 343 deletions

View file

@ -314,11 +314,19 @@ Converts row index fields into vector embeddings.
| `document_id` | Librarian reference, provenance linking |
| `chunk_id` | Provenance tracking through pipeline |
<<<<<<< HEAD
### Potentially Redundant Fields
| Field | Status |
|-------|--------|
| `metadata.metadata` | Set to `[]` by all extractors; document-level metadata now handled by librarian at submission time |
=======
### Removed Fields
| Field | Status |
|-------|--------|
| `metadata.metadata` | Removed from `Metadata` class. Document-level metadata triples are now emitted directly by librarian to triple store at submission time, not carried through the extraction pipeline. |
>>>>>>> e3bcbf73 (The metadata field (list of triples) in the pipeline Metadata class)
### Bytes Fields Pattern

View file

@ -95,8 +95,7 @@ def sample_message_data():
"Metadata": {
"id": "test-doc-123",
"user": "test_user",
"collection": "test_collection",
"metadata": []
"collection": "test_collection"
},
"Term": {
"type": IRI,

View file

@ -401,25 +401,6 @@ class TestMetadataMessageContracts:
assert metadata.id == "test-doc-123"
assert metadata.user == "test_user"
assert metadata.collection == "test_collection"
assert isinstance(metadata.metadata, list)
def test_metadata_with_triples_contract(self, sample_message_data):
"""Test Metadata with embedded triples contract"""
# Arrange
triple = Triple(**sample_message_data["Triple"])
metadata_data = {
"id": "doc-with-triples",
"user": "test_user",
"collection": "test_collection",
"metadata": [triple]
}
# Act & Assert
assert validate_schema_contract(Metadata, metadata_data)
metadata = Metadata(**metadata_data)
assert len(metadata.metadata) == 1
assert metadata.metadata[0].s.iri == "http://example.com/subject"
def test_error_schema_contract(self):
"""Test Error schema contract"""

View file

@ -24,7 +24,6 @@ class TestRowsCassandraContracts:
id="test-doc-001",
user="test_user",
collection="test_collection",
metadata=[]
)
test_object = ExtractedObject(
@ -50,7 +49,6 @@ class TestRowsCassandraContracts:
assert hasattr(test_object.metadata, 'id')
assert hasattr(test_object.metadata, 'user')
assert hasattr(test_object.metadata, 'collection')
assert hasattr(test_object.metadata, 'metadata')
# Verify types
assert isinstance(test_object.schema_name, str)
@ -154,7 +152,6 @@ class TestRowsCassandraContracts:
id="serial-001",
user="test_user",
collection="test_coll",
metadata=[]
),
schema_name="test_schema",
values=[{"field1": "value1", "field2": "123"}],
@ -234,7 +231,6 @@ class TestRowsCassandraContracts:
id="meta-001",
user="user123", # -> keyspace
collection="coll456", # -> partition key
metadata=[{"key": "value"}]
),
schema_name="table789", # -> table name
values=[{"field": "value"}],
@ -262,7 +258,6 @@ class TestRowsCassandraContractsBatch:
id="batch-doc-001",
user="test_user",
collection="test_collection",
metadata=[]
)
batch_object = ExtractedObject(
@ -308,10 +303,9 @@ class TestRowsCassandraContractsBatch:
test_metadata = Metadata(
id="empty-batch-001",
user="test_user",
collection="test_collection",
metadata=[]
collection="test_collection",
)
empty_batch_object = ExtractedObject(
metadata=test_metadata,
schema_name="empty_schema",
@ -332,9 +326,8 @@ class TestRowsCassandraContractsBatch:
id="single-batch-001",
user="test_user",
collection="test_collection",
metadata=[]
)
single_batch_object = ExtractedObject(
metadata=test_metadata,
schema_name="customer_records",
@ -362,12 +355,11 @@ class TestRowsCassandraContractsBatch:
id="batch-serial-001",
user="test_user",
collection="test_coll",
metadata=[]
),
schema_name="test_schema",
values=[
{"field1": "value1", "field2": "123"},
{"field1": "value2", "field2": "456"},
{"field1": "value2", "field2": "456"},
{"field1": "value3", "field2": "789"}
],
confidence=0.92,
@ -436,9 +428,8 @@ class TestRowsCassandraContractsBatch:
id="partition-test-001",
user="consistent_user", # Same keyspace
collection="consistent_collection", # Same partition
metadata=[]
)
batch_object = ExtractedObject(
metadata=test_metadata,
schema_name="partition_test",

View file

@ -95,9 +95,8 @@ class TestStructuredDataSchemaContracts:
id="structured-data-001",
user="test_user",
collection="test_collection",
metadata=[]
)
# Act
submission = StructuredDataSubmission(
metadata=metadata,
@ -121,9 +120,8 @@ class TestStructuredDataSchemaContracts:
id="extracted-obj-001",
user="test_user",
collection="test_collection",
metadata=[]
)
# Act
obj = ExtractedObject(
metadata=metadata,
@ -147,9 +145,8 @@ class TestStructuredDataSchemaContracts:
id="extracted-batch-001",
user="test_user",
collection="test_collection",
metadata=[]
)
# Act - create object with multiple values
obj = ExtractedObject(
metadata=metadata,
@ -180,11 +177,10 @@ class TestStructuredDataSchemaContracts:
# Arrange
metadata = Metadata(
id="extracted-empty-001",
user="test_user",
user="test_user",
collection="test_collection",
metadata=[]
)
# Act - create object with empty values array
obj = ExtractedObject(
metadata=metadata,
@ -283,7 +279,6 @@ class TestStructuredEmbeddingsContracts:
id="struct-embed-001",
user="test_user",
collection="test_collection",
metadata=[]
)
# Act
@ -313,7 +308,7 @@ class TestStructuredDataSerializationContracts:
def test_structured_data_submission_serialization(self):
"""Test StructuredDataSubmission serialization contract"""
# Arrange
metadata = Metadata(id="test", user="user", collection="col", metadata=[])
metadata = Metadata(id="test", user="user", collection="col")
submission_data = {
"metadata": metadata,
"format": "json",
@ -328,7 +323,7 @@ class TestStructuredDataSerializationContracts:
def test_extracted_object_serialization(self):
"""Test ExtractedObject serialization contract"""
# Arrange
metadata = Metadata(id="test", user="user", collection="col", metadata=[])
metadata = Metadata(id="test", user="user", collection="col")
object_data = {
"metadata": metadata,
"schema_name": "test_schema",
@ -378,7 +373,7 @@ class TestStructuredDataSerializationContracts:
def test_extracted_object_batch_serialization(self):
"""Test ExtractedObject batch serialization contract"""
# Arrange
metadata = Metadata(id="test", user="user", collection="col", metadata=[])
metadata = Metadata(id="test", user="user", collection="col")
batch_object_data = {
"metadata": metadata,
"schema_name": "test_schema",
@ -397,7 +392,7 @@ class TestStructuredDataSerializationContracts:
def test_extracted_object_empty_batch_serialization(self):
"""Test ExtractedObject empty batch serialization contract"""
# Arrange
metadata = Metadata(id="test", user="user", collection="col", metadata=[])
metadata = Metadata(id="test", user="user", collection="col")
empty_batch_data = {
"metadata": metadata,
"schema_name": "test_schema",

View file

@ -76,13 +76,6 @@ class TestAgentKgExtractionIntegration:
chunk=text.encode('utf-8'),
metadata=Metadata(
id="doc123",
metadata=[
Triple(
s=Term(type=IRI, iri="doc123"),
p=Term(type=IRI, iri="http://example.org/type"),
o=Term(type=LITERAL, value="document")
)
]
)
)
@ -136,11 +129,7 @@ class TestAgentKgExtractionIntegration:
# Parse and process
extraction_data = extractor.parse_jsonl(agent_response)
triples, entity_contexts = extractor.process_extraction_data(extraction_data, v.metadata)
# Add metadata triples
for t in v.metadata.metadata:
triples.append(t)
# Emit outputs
if triples:
await extractor.emit_triples(flow("triples"), v.metadata, triples)
@ -242,9 +231,9 @@ class TestAgentKgExtractionIntegration:
# Act - JSONL parsing is lenient, invalid lines are skipped
await configured_agent_extractor.on_message(mock_message, mock_consumer, mock_flow_context)
# Assert - should emit triples (with just metadata) but no entity contexts
# Assert - with no valid extraction data, nothing is emitted
triples_publisher = mock_flow_context("triples")
triples_publisher.send.assert_called_once()
triples_publisher.send.assert_not_called()
entity_contexts_publisher = mock_flow_context("entity-contexts")
entity_contexts_publisher.send.assert_not_called()
@ -268,17 +257,12 @@ class TestAgentKgExtractionIntegration:
# Act
await configured_agent_extractor.on_message(mock_message, mock_consumer, mock_flow_context)
# Assert
# Should still emit outputs (even if empty) to maintain flow consistency
# Assert - with empty extraction results, nothing is emitted
triples_publisher = mock_flow_context("triples")
entity_contexts_publisher = mock_flow_context("entity-contexts")
# Triples should include metadata triples at minimum
triples_publisher.send.assert_called_once()
sent_triples = triples_publisher.send.call_args[0][0]
assert isinstance(sent_triples, Triples)
# Entity contexts should not be sent if empty
# No triples or entity contexts emitted for empty results
triples_publisher.send.assert_not_called()
entity_contexts_publisher.send.assert_not_called()
@pytest.mark.asyncio
@ -308,7 +292,7 @@ class TestAgentKgExtractionIntegration:
test_text = "Test text for prompt rendering"
chunk = Chunk(
chunk=test_text.encode('utf-8'),
metadata=Metadata(id="test-doc", metadata=[])
metadata=Metadata(id="test-doc")
)
agent_client = mock_flow_context("agent-request")
@ -340,7 +324,7 @@ class TestAgentKgExtractionIntegration:
text = f"Test document {i} content"
chunks.append(Chunk(
chunk=text.encode('utf-8'),
metadata=Metadata(id=f"doc{i}", metadata=[])
metadata=Metadata(id=f"doc{i}")
))
agent_client = mock_flow_context("agent-request")
@ -375,7 +359,7 @@ class TestAgentKgExtractionIntegration:
unicode_text = "Machine Learning (学习机器) は人工知能の一分野です。"
chunk = Chunk(
chunk=unicode_text.encode('utf-8'),
metadata=Metadata(id="unicode-doc", metadata=[])
metadata=Metadata(id="unicode-doc")
)
agent_client = mock_flow_context("agent-request")
@ -411,7 +395,7 @@ class TestAgentKgExtractionIntegration:
large_text = "Machine Learning is important. " * 1000 # Repeat to create large text
chunk = Chunk(
chunk=large_text.encode('utf-8'),
metadata=Metadata(id="large-doc", metadata=[])
metadata=Metadata(id="large-doc")
)
agent_client = mock_flow_context("agent-request")

View file

@ -171,7 +171,6 @@ async def test_export_no_message_loss_integration(mock_backend):
triples_obj = Triples(
metadata=Metadata(
id=f"export-msg-{i}",
metadata=to_subgraph(msg_data["metadata"]["metadata"]),
user=msg_data["metadata"]["user"],
collection=msg_data["metadata"]["collection"],
),

View file

@ -92,7 +92,6 @@ class TestKnowledgeGraphPipelineIntegration:
id="doc-123",
user="test_user",
collection="test_collection",
metadata=[]
),
chunk=b"Machine Learning is a subset of Artificial Intelligence. Neural Networks are used in Machine Learning to process complex patterns."
)
@ -243,13 +242,12 @@ class TestKnowledgeGraphPipelineIntegration:
id="test-doc",
user="test_user",
collection="test_collection",
metadata=[]
)
# Act
triples = []
entities = []
for defn in sample_definitions_response:
s = defn["entity"]
o = defn["definition"]
@ -302,12 +300,11 @@ class TestKnowledgeGraphPipelineIntegration:
id="test-doc",
user="test_user",
collection="test_collection",
metadata=[]
)
# Act
triples = []
for rel in sample_relationships_response:
s = rel["subject"]
p = rel["predicate"]
@ -373,7 +370,6 @@ class TestKnowledgeGraphPipelineIntegration:
id="test-doc",
user="test_user",
collection="test_collection",
metadata=[]
),
triples=[
Triple(
@ -406,7 +402,6 @@ class TestKnowledgeGraphPipelineIntegration:
id="test-doc",
user="test_user",
collection="test_collection",
metadata=[]
),
entities=[
EntityEmbeddings(
@ -542,7 +537,7 @@ class TestKnowledgeGraphPipelineIntegration:
]
sample_chunk = Chunk(
metadata=Metadata(id="test", user="user", collection="collection", metadata=[]),
metadata=Metadata(id="test", user="user", collection="collection"),
chunk=b"Test chunk"
)
@ -569,7 +564,7 @@ class TestKnowledgeGraphPipelineIntegration:
# Arrange
large_chunk_batch = [
Chunk(
metadata=Metadata(id=f"doc-{i}", user="user", collection="collection", metadata=[]),
metadata=Metadata(id=f"doc-{i}", user="user", collection="collection"),
chunk=f"Document {i} contains machine learning and AI content.".encode("utf-8")
)
for i in range(100) # Large batch
@ -608,15 +603,8 @@ class TestKnowledgeGraphPipelineIntegration:
id="test-doc-123",
user="test_user",
collection="test_collection",
metadata=[
Triple(
s=Term(type=IRI, iri="doc:test"),
p=Term(type=IRI, iri="dc:title"),
o=Term(type=LITERAL, value="Test Document")
)
]
)
sample_chunk = Chunk(
metadata=original_metadata,
chunk=b"Test content for metadata propagation"

View file

@ -231,7 +231,6 @@ class TestObjectExtractionServiceIntegration:
id="customer-doc-001",
user="integration_test",
collection="test_documents",
metadata=[]
)
chunk_text = """
@ -299,7 +298,6 @@ class TestObjectExtractionServiceIntegration:
id="product-doc-001",
user="integration_test",
collection="test_documents",
metadata=[]
)
chunk_text = """
@ -373,7 +371,6 @@ class TestObjectExtractionServiceIntegration:
id=chunk_id,
user="concurrent_test",
collection="test_collection",
metadata=[]
)
chunk = Chunk(metadata=metadata, chunk=text.encode('utf-8'))
chunks.append(chunk)
@ -470,7 +467,7 @@ class TestObjectExtractionServiceIntegration:
await processor.on_schema_config(integration_config, version=1)
# Create test chunk
metadata = Metadata(id="error-test", user="test", collection="test", metadata=[])
metadata = Metadata(id="error-test", user="test", collection="test")
chunk = Chunk(metadata=metadata, chunk=b"Some text that will fail to process")
mock_msg = MagicMock()
@ -507,7 +504,6 @@ class TestObjectExtractionServiceIntegration:
id="metadata-test-chunk",
user="test_user",
collection="test_collection",
metadata=[] # Could include source document metadata
)
chunk = Chunk(

View file

@ -120,7 +120,6 @@ class TestRowsCassandraIntegration:
id="doc-001",
user="test_user",
collection="import_2024",
metadata=[]
),
schema_name="customer_records",
values=[{
@ -201,7 +200,7 @@ class TestRowsCassandraIntegration:
# Process objects for different schemas
product_obj = ExtractedObject(
metadata=Metadata(id="p1", user="shop", collection="catalog", metadata=[]),
metadata=Metadata(id="p1", user="shop", collection="catalog"),
schema_name="products",
values=[{"product_id": "P001", "name": "Widget", "price": "19.99"}],
confidence=0.9,
@ -209,7 +208,7 @@ class TestRowsCassandraIntegration:
)
order_obj = ExtractedObject(
metadata=Metadata(id="o1", user="shop", collection="sales", metadata=[]),
metadata=Metadata(id="o1", user="shop", collection="sales"),
schema_name="orders",
values=[{"order_id": "O001", "customer_id": "C001", "total": "59.97"}],
confidence=0.85,
@ -254,7 +253,7 @@ class TestRowsCassandraIntegration:
)
test_obj = ExtractedObject(
metadata=Metadata(id="t1", user="test", collection="test", metadata=[]),
metadata=Metadata(id="t1", user="test", collection="test"),
schema_name="indexed_data",
values=[{
"id": "123",
@ -337,7 +336,6 @@ class TestRowsCassandraIntegration:
id="batch-001",
user="test_user",
collection="batch_import",
metadata=[]
),
schema_name="batch_customers",
values=[
@ -391,7 +389,7 @@ class TestRowsCassandraIntegration:
# Process empty batch object
empty_obj = ExtractedObject(
metadata=Metadata(id="empty-1", user="test", collection="empty", metadata=[]),
metadata=Metadata(id="empty-1", user="test", collection="empty"),
schema_name="empty_test",
values=[], # Empty batch
confidence=1.0,
@ -426,7 +424,7 @@ class TestRowsCassandraIntegration:
)
test_obj = ExtractedObject(
metadata=Metadata(id="t1", user="test", collection="test", metadata=[]),
metadata=Metadata(id="t1", user="test", collection="test"),
schema_name="map_test",
values=[{"id": "123", "name": "Test Item", "count": "42"}],
confidence=0.9,
@ -470,7 +468,7 @@ class TestRowsCassandraIntegration:
)
test_obj = ExtractedObject(
metadata=Metadata(id="t1", user="test", collection="my_collection", metadata=[]),
metadata=Metadata(id="t1", user="test", collection="my_collection"),
schema_name="partition_test",
values=[{"id": "123", "category": "test"}],
confidence=0.9,

View file

@ -28,7 +28,6 @@ def sample_text_document():
"""Sample document with moderate length text."""
metadata = Metadata(
id="test-doc-1",
metadata=[],
user="test-user",
collection="test-collection"
)
@ -44,7 +43,6 @@ def long_text_document():
"""Long document for testing multiple chunks."""
metadata = Metadata(
id="test-doc-long",
metadata=[],
user="test-user",
collection="test-collection"
)
@ -61,7 +59,6 @@ def unicode_text_document():
"""Document with various unicode characters."""
metadata = Metadata(
id="test-doc-unicode",
metadata=[],
user="test-user",
collection="test-collection"
)
@ -87,7 +84,6 @@ def empty_text_document():
"""Empty document for edge case testing."""
metadata = Metadata(
id="test-doc-empty",
metadata=[],
user="test-user",
collection="test-collection"
)

View file

@ -184,7 +184,6 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase):
mock_text_doc = MagicMock()
mock_text_doc.metadata = Metadata(
id="test-doc-123",
metadata=[],
user="test-user",
collection="test-collection"
)

View file

@ -181,7 +181,6 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase):
mock_text_doc = MagicMock()
mock_text_doc.metadata = Metadata(
id="test-doc-456",
metadata=[],
user="test-user",
collection="test-collection"
)

View file

@ -73,7 +73,6 @@ def sample_triples():
id="test-doc-id",
user="test-user",
collection="default", # This should be overridden
metadata=[]
),
triples=[
Triple(
@ -93,7 +92,6 @@ def sample_graph_embeddings():
id="test-doc-id",
user="test-user",
collection="default", # This should be overridden
metadata=[]
),
entities=[
EntityEmbeddings(

View file

@ -55,13 +55,6 @@ def sample_objects_message():
return {
"metadata": {
"id": "obj-123",
"metadata": [
{
"s": {"v": "obj-123", "e": False},
"p": {"v": "source", "e": False},
"o": {"v": "test", "e": False}
}
],
"user": "testuser",
"collection": "testcollection"
},
@ -244,7 +237,6 @@ class TestRowsImportMessageProcessing:
assert sent_object.metadata.id == "obj-123"
assert sent_object.metadata.user == "testuser"
assert sent_object.metadata.collection == "testcollection"
assert len(sent_object.metadata.metadata) == 1 # One triple in metadata
@patch('trustgraph.gateway.dispatch.rows_import.Publisher')
@pytest.mark.asyncio
@ -277,7 +269,6 @@ class TestRowsImportMessageProcessing:
assert sent_object.values[0]["field1"] == "value1"
assert sent_object.confidence == 1.0 # Default value
assert sent_object.source_span == "" # Default value
assert len(sent_object.metadata.metadata) == 0 # Default empty list
@patch('trustgraph.gateway.dispatch.rows_import.Publisher')
@pytest.mark.asyncio

View file

@ -29,11 +29,10 @@ class Triple:
self.o = o
class Metadata:
def __init__(self, id, user, collection, metadata):
def __init__(self, id, user, collection):
self.id = id
self.user = user
self.collection = collection
self.metadata = metadata
class Triples:
def __init__(self, metadata, triples):
@ -110,7 +109,6 @@ def sample_triples(sample_triple):
id="test-doc-123",
user="test_user",
collection="test_collection",
metadata=[]
)
return Triples(
@ -126,7 +124,6 @@ def sample_chunk():
id="test-chunk-456",
user="test_user",
collection="test_collection",
metadata=[]
)
return Chunk(

View file

@ -51,13 +51,6 @@ class TestAgentKgExtractor:
"""Sample metadata for testing"""
return Metadata(
id="doc123",
metadata=[
Triple(
s=Term(type=IRI, iri="doc123"),
p=Term(type=IRI, iri="http://example.org/type"),
o=Term(type=LITERAL, value="document")
)
]
)
@pytest.fixture
@ -274,7 +267,7 @@ This is not JSON at all
def test_process_extraction_data_no_metadata_id(self, agent_extractor):
"""Test processing when metadata has no ID"""
metadata = Metadata(id=None, metadata=[])
metadata = Metadata(id=None)
data = [
{"type": "definition", "entity": "Test Entity", "definition": "Test definition"}
]
@ -345,8 +338,6 @@ This is not JSON at all
assert sent_triples.metadata.id == sample_metadata.id
assert sent_triples.metadata.user == sample_metadata.user
assert sent_triples.metadata.collection == sample_metadata.collection
# Note: metadata.metadata is now empty array in the new implementation
assert sent_triples.metadata.metadata == []
assert len(sent_triples.triples) == 1
assert sent_triples.triples[0].s.iri == "test:subject"
@ -371,8 +362,6 @@ This is not JSON at all
assert sent_contexts.metadata.id == sample_metadata.id
assert sent_contexts.metadata.user == sample_metadata.user
assert sent_contexts.metadata.collection == sample_metadata.collection
# Note: metadata.metadata is now empty array in the new implementation
assert sent_contexts.metadata.metadata == []
assert len(sent_contexts.entities) == 1
assert sent_contexts.entities[0].entity.iri == "test:entity"

View file

@ -177,13 +177,13 @@ class TestAgentKgExtractionEdgeCases:
pass
# Test with metadata without ID
metadata = Metadata(id=None, metadata=[])
metadata = Metadata(id=None)
triples, contexts = agent_extractor.process_extraction_data([], metadata)
assert len(triples) == 0
assert len(contexts) == 0
# Test with metadata with empty string ID
metadata = Metadata(id="", metadata=[])
metadata = Metadata(id="")
data = [{"type": "definition", "entity": "Test", "definition": "Test def"}]
triples, contexts = agent_extractor.process_extraction_data(data, metadata)
@ -193,7 +193,7 @@ class TestAgentKgExtractionEdgeCases:
def test_process_extraction_data_special_entity_names(self, agent_extractor):
"""Test processing with special characters in entity names"""
metadata = Metadata(id="doc123", metadata=[])
metadata = Metadata(id="doc123")
special_entities = [
"Entity with spaces",
@ -225,7 +225,7 @@ class TestAgentKgExtractionEdgeCases:
def test_process_extraction_data_very_long_definitions(self, agent_extractor):
"""Test processing with very long entity definitions"""
metadata = Metadata(id="doc123", metadata=[])
metadata = Metadata(id="doc123")
# Create very long definition
long_definition = "This is a very long definition. " * 1000
@ -247,7 +247,7 @@ class TestAgentKgExtractionEdgeCases:
def test_process_extraction_data_duplicate_entities(self, agent_extractor):
"""Test processing with duplicate entity names"""
metadata = Metadata(id="doc123", metadata=[])
metadata = Metadata(id="doc123")
data = [
{"type": "definition", "entity": "Machine Learning", "definition": "First definition"},
@ -269,7 +269,7 @@ class TestAgentKgExtractionEdgeCases:
def test_process_extraction_data_empty_strings(self, agent_extractor):
"""Test processing with empty strings in data"""
metadata = Metadata(id="doc123", metadata=[])
metadata = Metadata(id="doc123")
data = [
{"type": "definition", "entity": "", "definition": "Definition for empty entity"},
@ -291,7 +291,7 @@ class TestAgentKgExtractionEdgeCases:
def test_process_extraction_data_nested_json_in_strings(self, agent_extractor):
"""Test processing when definitions contain JSON-like strings"""
metadata = Metadata(id="doc123", metadata=[])
metadata = Metadata(id="doc123")
data = [
{
@ -315,7 +315,7 @@ class TestAgentKgExtractionEdgeCases:
def test_process_extraction_data_boolean_object_entity_variations(self, agent_extractor):
"""Test processing with various boolean values for object-entity"""
metadata = Metadata(id="doc123", metadata=[])
metadata = Metadata(id="doc123")
data = [
# Explicit True
@ -343,7 +343,7 @@ class TestAgentKgExtractionEdgeCases:
@pytest.mark.asyncio
async def test_emit_empty_collections(self, agent_extractor):
"""Test emitting empty triples and entity contexts"""
metadata = Metadata(id="test", metadata=[])
metadata = Metadata(id="test")
# Test emitting empty triples
mock_publisher = AsyncMock()
@ -389,7 +389,7 @@ class TestAgentKgExtractionEdgeCases:
def test_process_extraction_data_performance_large_dataset(self, agent_extractor):
"""Test performance with large extraction datasets"""
metadata = Metadata(id="large-doc", metadata=[])
metadata = Metadata(id="large-doc")
# Create large dataset in JSONL format
num_definitions = 1000

View file

@ -314,7 +314,6 @@ class TestObjectExtractionBusinessLogic:
id="test-extraction-001",
user="test_user",
collection="test_collection",
metadata=[]
)
values = [{

View file

@ -373,7 +373,6 @@ class TestTripleConstructionLogic:
id="test-doc-123",
user="test_user",
collection="test_collection",
metadata=[]
)
# Act

View file

@ -190,7 +190,6 @@ class TestRowsCassandraStorageLogic:
id="test-001",
user="test_user",
collection="test_collection",
metadata=[]
),
schema_name="test_schema",
values=[{"id": "123", "value": "test_data"}],
@ -252,7 +251,6 @@ class TestRowsCassandraStorageLogic:
id="test-001",
user="test_user",
collection="test_collection",
metadata=[]
),
schema_name="multi_index_schema",
values=[{"id": "123", "category": "electronics", "status": "active"}],
@ -310,7 +308,6 @@ class TestRowsCassandraStorageBatchLogic:
id="batch-001",
user="test_user",
collection="batch_collection",
metadata=[]
),
schema_name="batch_schema",
values=[
@ -365,7 +362,6 @@ class TestRowsCassandraStorageBatchLogic:
id="empty-001",
user="test_user",
collection="empty_collection",
metadata=[]
),
schema_name="empty_schema",
values=[], # Empty batch

View file

@ -2,38 +2,30 @@ import base64
from typing import Dict, Any
from ...schema import Document, TextDocument, Chunk, DocumentEmbeddings, ChunkEmbeddings
from .base import SendTranslator
from .metadata import DocumentMetadataTranslator
from .primitives import SubgraphTranslator
class DocumentTranslator(SendTranslator):
"""Translator for Document schema objects (PDF docs etc.)"""
def __init__(self):
self.subgraph_translator = SubgraphTranslator()
def to_pulsar(self, data: Dict[str, Any]) -> Document:
metadata = data.get("metadata", [])
# Handle base64 content validation
doc = base64.b64decode(data["data"])
from ...schema import Metadata
return Document(
metadata=Metadata(
id=data.get("id"),
metadata=self.subgraph_translator.to_pulsar(metadata) if metadata else [],
user=data.get("user", "trustgraph"),
collection=data.get("collection", "default"),
),
data=base64.b64encode(doc).decode("utf-8")
)
def from_pulsar(self, obj: Document) -> Dict[str, Any]:
result = {
"data": obj.data
}
if obj.metadata:
metadata_dict = {}
if obj.metadata.id:
@ -42,43 +34,36 @@ class DocumentTranslator(SendTranslator):
metadata_dict["user"] = obj.metadata.user
if obj.metadata.collection:
metadata_dict["collection"] = obj.metadata.collection
if obj.metadata.metadata:
metadata_dict["metadata"] = self.subgraph_translator.from_pulsar(obj.metadata.metadata)
result["metadata"] = metadata_dict
return result
class TextDocumentTranslator(SendTranslator):
"""Translator for TextDocument schema objects"""
def __init__(self):
self.subgraph_translator = SubgraphTranslator()
def to_pulsar(self, data: Dict[str, Any]) -> TextDocument:
metadata = data.get("metadata", [])
charset = data.get("charset", "utf-8")
# Text is base64 encoded in input
text = base64.b64decode(data["text"]).decode(charset)
from ...schema import Metadata
return TextDocument(
metadata=Metadata(
id=data.get("id"),
metadata=self.subgraph_translator.to_pulsar(metadata) if metadata else [],
user=data.get("user", "trustgraph"),
collection=data.get("collection", "default"),
),
text=text.encode("utf-8")
)
def from_pulsar(self, obj: TextDocument) -> Dict[str, Any]:
result = {
"text": obj.text.decode("utf-8") if isinstance(obj.text, bytes) else obj.text
}
if obj.metadata:
metadata_dict = {}
if obj.metadata.id:
@ -87,39 +72,31 @@ class TextDocumentTranslator(SendTranslator):
metadata_dict["user"] = obj.metadata.user
if obj.metadata.collection:
metadata_dict["collection"] = obj.metadata.collection
if obj.metadata.metadata:
metadata_dict["metadata"] = self.subgraph_translator.from_pulsar(obj.metadata.metadata)
result["metadata"] = metadata_dict
return result
class ChunkTranslator(SendTranslator):
"""Translator for Chunk schema objects"""
def __init__(self):
self.subgraph_translator = SubgraphTranslator()
def to_pulsar(self, data: Dict[str, Any]) -> Chunk:
metadata = data.get("metadata", [])
from ...schema import Metadata
return Chunk(
metadata=Metadata(
id=data.get("id"),
metadata=self.subgraph_translator.to_pulsar(metadata) if metadata else [],
user=data.get("user", "trustgraph"),
collection=data.get("collection", "default"),
),
chunk=data["chunk"].encode("utf-8") if isinstance(data["chunk"], str) else data["chunk"]
)
def from_pulsar(self, obj: Chunk) -> Dict[str, Any]:
result = {
"chunk": obj.chunk.decode("utf-8") if isinstance(obj.chunk, bytes) else obj.chunk
}
if obj.metadata:
metadata_dict = {}
if obj.metadata.id:
@ -128,20 +105,15 @@ class ChunkTranslator(SendTranslator):
metadata_dict["user"] = obj.metadata.user
if obj.metadata.collection:
metadata_dict["collection"] = obj.metadata.collection
if obj.metadata.metadata:
metadata_dict["metadata"] = self.subgraph_translator.from_pulsar(obj.metadata.metadata)
result["metadata"] = metadata_dict
return result
class DocumentEmbeddingsTranslator(SendTranslator):
"""Translator for DocumentEmbeddings schema objects"""
def __init__(self):
self.subgraph_translator = SubgraphTranslator()
def to_pulsar(self, data: Dict[str, Any]) -> DocumentEmbeddings:
metadata = data.get("metadata", {})
@ -157,13 +129,12 @@ class DocumentEmbeddingsTranslator(SendTranslator):
return DocumentEmbeddings(
metadata=Metadata(
id=metadata.get("id"),
metadata=self.subgraph_translator.to_pulsar(metadata.get("metadata", [])),
user=metadata.get("user", "trustgraph"),
collection=metadata.get("collection", "default"),
),
chunks=chunks
)
def from_pulsar(self, obj: DocumentEmbeddings) -> Dict[str, Any]:
result = {
"chunks": [
@ -174,7 +145,7 @@ class DocumentEmbeddingsTranslator(SendTranslator):
for chunk in obj.chunks
]
}
if obj.metadata:
metadata_dict = {}
if obj.metadata.id:
@ -183,9 +154,7 @@ class DocumentEmbeddingsTranslator(SendTranslator):
metadata_dict["user"] = obj.metadata.user
if obj.metadata.collection:
metadata_dict["collection"] = obj.metadata.collection
if obj.metadata.metadata:
metadata_dict["metadata"] = self.subgraph_translator.from_pulsar(obj.metadata.metadata)
result["metadata"] = metadata_dict
return result

View file

@ -1,43 +1,36 @@
from typing import Dict, Any, Tuple, Optional
from ...schema import (
KnowledgeRequest, KnowledgeResponse, Triples, GraphEmbeddings,
KnowledgeRequest, KnowledgeResponse, Triples, GraphEmbeddings,
Metadata, EntityEmbeddings
)
from .base import MessageTranslator
from .primitives import ValueTranslator, SubgraphTranslator
from .metadata import DocumentMetadataTranslator
class KnowledgeRequestTranslator(MessageTranslator):
"""Translator for KnowledgeRequest schema objects"""
def __init__(self):
self.value_translator = ValueTranslator()
self.subgraph_translator = SubgraphTranslator()
def to_pulsar(self, data: Dict[str, Any]) -> KnowledgeRequest:
triples = None
if "triples" in data:
triples = Triples(
metadata=Metadata(
id=data["triples"]["metadata"]["id"],
metadata=self.subgraph_translator.to_pulsar(
data["triples"]["metadata"]["metadata"]
),
user=data["triples"]["metadata"]["user"],
collection=data["triples"]["metadata"]["collection"]
),
triples=self.subgraph_translator.to_pulsar(data["triples"]["triples"]),
)
graph_embeddings = None
if "graph-embeddings" in data:
graph_embeddings = GraphEmbeddings(
metadata=Metadata(
id=data["graph-embeddings"]["metadata"]["id"],
metadata=self.subgraph_translator.to_pulsar(
data["graph-embeddings"]["metadata"]["metadata"]
),
user=data["graph-embeddings"]["metadata"]["user"],
collection=data["graph-embeddings"]["metadata"]["collection"]
),
@ -49,7 +42,7 @@ class KnowledgeRequestTranslator(MessageTranslator):
for ent in data["graph-embeddings"]["entities"]
]
)
return KnowledgeRequest(
operation=data.get("operation"),
user=data.get("user"),
@ -59,10 +52,10 @@ class KnowledgeRequestTranslator(MessageTranslator):
triples=triples,
graph_embeddings=graph_embeddings,
)
def from_pulsar(self, obj: KnowledgeRequest) -> Dict[str, Any]:
result = {}
if obj.operation:
result["operation"] = obj.operation
if obj.user:
@ -73,27 +66,21 @@ class KnowledgeRequestTranslator(MessageTranslator):
result["flow"] = obj.flow
if obj.collection:
result["collection"] = obj.collection
if obj.triples:
result["triples"] = {
"metadata": {
"id": obj.triples.metadata.id,
"metadata": self.subgraph_translator.from_pulsar(
obj.triples.metadata.metadata
),
"user": obj.triples.metadata.user,
"collection": obj.triples.metadata.collection,
},
"triples": self.subgraph_translator.from_pulsar(obj.triples.triples),
}
if obj.graph_embeddings:
result["graph-embeddings"] = {
"metadata": {
"id": obj.graph_embeddings.metadata.id,
"metadata": self.subgraph_translator.from_pulsar(
obj.graph_embeddings.metadata.metadata
),
"user": obj.graph_embeddings.metadata.user,
"collection": obj.graph_embeddings.metadata.collection,
},
@ -105,50 +92,44 @@ class KnowledgeRequestTranslator(MessageTranslator):
for entity in obj.graph_embeddings.entities
],
}
return result
class KnowledgeResponseTranslator(MessageTranslator):
"""Translator for KnowledgeResponse schema objects"""
def __init__(self):
self.value_translator = ValueTranslator()
self.subgraph_translator = SubgraphTranslator()
def to_pulsar(self, data: Dict[str, Any]) -> KnowledgeResponse:
raise NotImplementedError("Response translation to Pulsar not typically needed")
def from_pulsar(self, obj: KnowledgeResponse) -> Dict[str, Any]:
# Response to list operation
if obj.ids is not None:
return {"ids": obj.ids}
# Streaming triples response
if obj.triples:
return {
"triples": {
"metadata": {
"id": obj.triples.metadata.id,
"metadata": self.subgraph_translator.from_pulsar(
obj.triples.metadata.metadata
),
"user": obj.triples.metadata.user,
"collection": obj.triples.metadata.collection,
},
"triples": self.subgraph_translator.from_pulsar(obj.triples.triples),
}
}
# Streaming graph embeddings response
if obj.graph_embeddings:
return {
"graph-embeddings": {
"metadata": {
"id": obj.graph_embeddings.metadata.id,
"metadata": self.subgraph_translator.from_pulsar(
obj.graph_embeddings.metadata.metadata
),
"user": obj.graph_embeddings.metadata.user,
"collection": obj.graph_embeddings.metadata.collection,
},
@ -161,11 +142,11 @@ class KnowledgeResponseTranslator(MessageTranslator):
],
}
}
# End of stream marker
if obj.eos is True:
return {"eos": True}
# Empty response (successful delete)
return {}

View file

@ -1,14 +1,10 @@
from dataclasses import dataclass, field
from .primitives import Triple
from dataclasses import dataclass
@dataclass
class Metadata:
# Source identifier
id: str = ""
# Subgraph
metadata: list[Triple] = field(default_factory=list)
# Collection management
user: str = ""
collection: str = ""

View file

@ -178,7 +178,6 @@ class Processor(ChunkingService):
await flow("triples").send(Triples(
metadata=Metadata(
id=chunk_uri,
metadata=[],
user=v.metadata.user,
collection=v.metadata.collection,
),
@ -189,7 +188,6 @@ class Processor(ChunkingService):
r = Chunk(
metadata=Metadata(
id=chunk_uri,
metadata=[],
user=v.metadata.user,
collection=v.metadata.collection,
),

View file

@ -302,7 +302,6 @@ class Processor(FlowProcessor):
await flow("triples").send(Triples(
metadata=Metadata(
id=pg_uri,
metadata=[],
user=v.metadata.user,
collection=v.metadata.collection,
),
@ -314,7 +313,6 @@ class Processor(FlowProcessor):
r = TextDocument(
metadata=Metadata(
id=pg_uri,
metadata=[],
user=v.metadata.user,
collection=v.metadata.collection,
),

View file

@ -104,7 +104,6 @@ class Processor(FlowProcessor):
tpls = Triples(
metadata = Metadata(
id = metadata.id,
metadata = [],
user = metadata.user,
collection = metadata.collection,
),
@ -117,7 +116,6 @@ class Processor(FlowProcessor):
ecs = EntityContexts(
metadata = Metadata(
id = metadata.id,
metadata = [],
user = metadata.user,
collection = metadata.collection,
),
@ -216,10 +214,6 @@ class Processor(FlowProcessor):
extraction_data, v.metadata
)
# Put document metadata into triples
for t in v.metadata.metadata:
triples.append(t)
# Emit outputs
if triples:
await self.emit_triples(flow("triples"), v.metadata, triples)

View file

@ -218,7 +218,6 @@ class Processor(FlowProcessor):
flow("triples"),
Metadata(
id=v.metadata.id,
metadata=[],
user=v.metadata.user,
collection=v.metadata.collection,
),
@ -232,7 +231,6 @@ class Processor(FlowProcessor):
flow("entity-contexts"),
Metadata(
id=v.metadata.id,
metadata=[],
user=v.metadata.user,
collection=v.metadata.collection,
),

View file

@ -306,10 +306,6 @@ class Processor(FlowProcessor):
flow, chunk, ontology_subset, prompt_variables
)
# Add metadata triples
for t in v.metadata.metadata:
triples.append(t)
# Generate ontology definition triples
ontology_triples = self.build_ontology_triples(ontology_subset)
@ -558,7 +554,6 @@ class Processor(FlowProcessor):
t = Triples(
metadata=Metadata(
id=metadata.id,
metadata=[],
user=metadata.user,
collection=metadata.collection,
),
@ -571,7 +566,6 @@ class Processor(FlowProcessor):
ec = EntityContexts(
metadata=Metadata(
id=metadata.id,
metadata=[],
user=metadata.user,
collection=metadata.collection,
),

View file

@ -219,7 +219,6 @@ class Processor(FlowProcessor):
flow("triples"),
Metadata(
id=v.metadata.id,
metadata=[],
user=v.metadata.user,
collection=v.metadata.collection,
),

View file

@ -272,7 +272,6 @@ class Processor(FlowProcessor):
extracted = ExtractedObject(
metadata=Metadata(
id=f"{v.metadata.id}:{schema_name}",
metadata=[],
user=v.metadata.user,
collection=v.metadata.collection,
),

View file

@ -53,7 +53,6 @@ class RowsImport:
elt = ExtractedObject(
metadata=Metadata(
id=data["metadata"]["id"],
metadata=to_subgraph(data["metadata"].get("metadata", [])),
user=data["metadata"]["user"],
collection=data["metadata"]["collection"],
),

View file

@ -37,18 +37,17 @@ def serialize_triples(message):
return {
"metadata": {
"id": message.metadata.id,
"metadata": serialize_subgraph(message.metadata.metadata),
"user": message.metadata.user,
"collection": message.metadata.collection,
},
"triples": serialize_subgraph(message.triples),
}
def serialize_graph_embeddings(message):
return {
"metadata": {
"id": message.metadata.id,
"metadata": serialize_subgraph(message.metadata.metadata),
"user": message.metadata.user,
"collection": message.metadata.collection,
},
@ -61,11 +60,11 @@ def serialize_graph_embeddings(message):
],
}
def serialize_entity_contexts(message):
return {
"metadata": {
"id": message.metadata.id,
"metadata": serialize_subgraph(message.metadata.metadata),
"user": message.metadata.user,
"collection": message.metadata.collection,
},
@ -78,11 +77,11 @@ def serialize_entity_contexts(message):
],
}
def serialize_document_embeddings(message):
return {
"metadata": {
"id": message.metadata.id,
"metadata": serialize_subgraph(message.metadata.metadata),
"user": message.metadata.user,
"collection": message.metadata.collection,
},

View file

@ -48,7 +48,6 @@ class TriplesImport:
elt = Triples(
metadata=Metadata(
id=data["metadata"]["id"],
metadata=to_subgraph(data["metadata"]["metadata"]),
user=data["metadata"]["user"],
collection=data["metadata"]["collection"],
),

View file

@ -334,7 +334,6 @@ class Processor(AsyncProcessor):
triples_msg = Triples(
metadata=Metadata(
id=doc_uri,
metadata=[],
user=processing.user,
collection=processing.collection,
),
@ -381,7 +380,6 @@ class Processor(AsyncProcessor):
doc = TextDocument(
metadata = Metadata(
id = document.id,
metadata = document.metadata,
user = processing.user,
collection = processing.collection
),
@ -392,7 +390,6 @@ class Processor(AsyncProcessor):
doc = TextDocument(
metadata = Metadata(
id = document.id,
metadata = document.metadata,
user = processing.user,
collection = processing.collection
),
@ -408,7 +405,6 @@ class Processor(AsyncProcessor):
doc = Document(
metadata = Metadata(
id = document.id,
metadata = document.metadata,
user = processing.user,
collection = processing.collection
),
@ -419,7 +415,6 @@ class Processor(AsyncProcessor):
doc = Document(
metadata = Metadata(
id = document.id,
metadata = document.metadata,
user = processing.user,
collection = processing.collection
),

View file

@ -243,7 +243,6 @@ class Processor(FlowProcessor):
await flow("explainability").send(Triples(
metadata=Metadata(
id=explain_id,
metadata=[],
user=v.user,
collection=v.collection, # Store in user's collection, not separate explainability collection
),

View file

@ -218,16 +218,6 @@ class KnowledgeTableStore:
when = int(time.time() * 1000)
if m.metadata.metadata:
metadata = [
(
*term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o)
)
for v in m.metadata.metadata
]
else:
metadata = []
triples = [
(
*term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o)
@ -244,7 +234,7 @@ class KnowledgeTableStore:
(
uuid.uuid4(), m.metadata.user,
m.metadata.id, when,
metadata, triples,
[], triples,
)
)
@ -259,16 +249,6 @@ class KnowledgeTableStore:
when = int(time.time() * 1000)
if m.metadata.metadata:
metadata = [
(
*term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o)
)
for v in m.metadata.metadata
]
else:
metadata = []
entities = [
(
term_to_tuple(v.entity),
@ -286,7 +266,7 @@ class KnowledgeTableStore:
(
uuid.uuid4(), m.metadata.user,
m.metadata.id, when,
metadata, entities,
[], entities,
)
)
@ -301,16 +281,6 @@ class KnowledgeTableStore:
when = int(time.time() * 1000)
if m.metadata.metadata:
metadata = [
(
*term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o)
)
for v in m.metadata.metadata
]
else:
metadata = []
chunks = [
(
v.chunk_id,
@ -328,7 +298,7 @@ class KnowledgeTableStore:
(
uuid.uuid4(), m.metadata.user,
m.metadata.id, when,
metadata, chunks,
[], chunks,
)
)
@ -423,18 +393,6 @@ class KnowledgeTableStore:
for row in resp:
if row[2]:
metadata = [
Triple(
s = tuple_to_term(elt[0], elt[1]),
p = tuple_to_term(elt[2], elt[3]),
o = tuple_to_term(elt[4], elt[5]),
)
for elt in row[2]
]
else:
metadata = []
if row[3]:
triples = [
Triple(
@ -453,7 +411,6 @@ class KnowledgeTableStore:
id = document_id,
user = user,
collection = "default", # FIXME: What to put here?
metadata = metadata,
),
triples = triples
)
@ -482,18 +439,6 @@ class KnowledgeTableStore:
for row in resp:
if row[2]:
metadata = [
Triple(
s = tuple_to_term(elt[0], elt[1]),
p = tuple_to_term(elt[2], elt[3]),
o = tuple_to_term(elt[4], elt[5]),
)
for elt in row[2]
]
else:
metadata = []
if row[3]:
entities = [
EntityEmbeddings(
@ -511,7 +456,6 @@ class KnowledgeTableStore:
id = document_id,
user = user,
collection = "default", # FIXME: What to put here?
metadata = metadata,
),
entities = entities
)