From 286f762369f26a5675c01762c5da8a641e140f84 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Wed, 11 Mar 2026 12:16:39 +0000 Subject: [PATCH] The id field in pipeline Metadata was being overwritten at each processing stage (#686) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The id field in pipeline Metadata was being overwritten at each processing stage (document → page → chunk), causing knowledge storage to create separate cores per chunk instead of grouping by document. Add a root field that: - Is set by librarian to the original document ID - Is copied unchanged through PDF decoder, chunkers, and extractors - Is used by knowledge storage for document_id grouping (with fallback to id) Changes: - Add root field to Metadata schema with empty string default - Set root=document.id in librarian when initiating document processing - Copy root through PDF decoder, recursive chunker, and all extractors - Update knowledge storage to use root (or id as fallback) for grouping - Add root handling to translators and gateway serialization - Update test mock Metadata class to include root parameter --- tests/unit/test_knowledge_graph/conftest.py | 3 ++- .../messaging/translators/document_loading.py | 12 ++++++++++++ .../trustgraph/messaging/translators/knowledge.py | 6 ++++++ trustgraph-base/trustgraph/schema/core/metadata.py | 3 +++ .../trustgraph/chunking/recursive/chunker.py | 2 ++ .../trustgraph/decoding/pdf/pdf_decoder.py | 2 ++ .../trustgraph/extract/kg/agent/extract.py | 2 ++ .../trustgraph/extract/kg/definitions/extract.py | 2 ++ .../trustgraph/extract/kg/ontology/extract.py | 2 ++ .../trustgraph/extract/kg/relationships/extract.py | 1 + .../trustgraph/extract/kg/rows/processor.py | 1 + .../trustgraph/gateway/dispatch/serialize.py | 4 ++++ .../trustgraph/gateway/dispatch/triples_import.py | 1 + trustgraph-flow/trustgraph/librarian/service.py | 5 +++++ trustgraph-flow/trustgraph/tables/knowledge.py | 6 +++--- 15 files changed, 
48 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_knowledge_graph/conftest.py b/tests/unit/test_knowledge_graph/conftest.py index 0e18b2e1..8e8d9e43 100644 --- a/tests/unit/test_knowledge_graph/conftest.py +++ b/tests/unit/test_knowledge_graph/conftest.py @@ -29,8 +29,9 @@ class Triple: self.o = o class Metadata: - def __init__(self, id, user, collection): + def __init__(self, id, user, collection, root=""): self.id = id + self.root = root self.user = user self.collection = collection diff --git a/trustgraph-base/trustgraph/messaging/translators/document_loading.py b/trustgraph-base/trustgraph/messaging/translators/document_loading.py index ae9c4c91..7c2a013f 100644 --- a/trustgraph-base/trustgraph/messaging/translators/document_loading.py +++ b/trustgraph-base/trustgraph/messaging/translators/document_loading.py @@ -15,6 +15,7 @@ class DocumentTranslator(SendTranslator): return Document( metadata=Metadata( id=data.get("id"), + root=data.get("root", ""), user=data.get("user", "trustgraph"), collection=data.get("collection", "default"), ), @@ -30,6 +31,8 @@ class DocumentTranslator(SendTranslator): metadata_dict = {} if obj.metadata.id: metadata_dict["id"] = obj.metadata.id + if obj.metadata.root: + metadata_dict["root"] = obj.metadata.root if obj.metadata.user: metadata_dict["user"] = obj.metadata.user if obj.metadata.collection: @@ -53,6 +56,7 @@ class TextDocumentTranslator(SendTranslator): return TextDocument( metadata=Metadata( id=data.get("id"), + root=data.get("root", ""), user=data.get("user", "trustgraph"), collection=data.get("collection", "default"), ), @@ -68,6 +72,8 @@ class TextDocumentTranslator(SendTranslator): metadata_dict = {} if obj.metadata.id: metadata_dict["id"] = obj.metadata.id + if obj.metadata.root: + metadata_dict["root"] = obj.metadata.root if obj.metadata.user: metadata_dict["user"] = obj.metadata.user if obj.metadata.collection: @@ -86,6 +92,7 @@ class ChunkTranslator(SendTranslator): return Chunk( metadata=Metadata( 
id=data.get("id"), + root=data.get("root", ""), user=data.get("user", "trustgraph"), collection=data.get("collection", "default"), ), @@ -101,6 +108,8 @@ class ChunkTranslator(SendTranslator): metadata_dict = {} if obj.metadata.id: metadata_dict["id"] = obj.metadata.id + if obj.metadata.root: + metadata_dict["root"] = obj.metadata.root if obj.metadata.user: metadata_dict["user"] = obj.metadata.user if obj.metadata.collection: @@ -129,6 +138,7 @@ class DocumentEmbeddingsTranslator(SendTranslator): return DocumentEmbeddings( metadata=Metadata( id=metadata.get("id"), + root=metadata.get("root", ""), user=metadata.get("user", "trustgraph"), collection=metadata.get("collection", "default"), ), @@ -150,6 +160,8 @@ class DocumentEmbeddingsTranslator(SendTranslator): metadata_dict = {} if obj.metadata.id: metadata_dict["id"] = obj.metadata.id + if obj.metadata.root: + metadata_dict["root"] = obj.metadata.root if obj.metadata.user: metadata_dict["user"] = obj.metadata.user if obj.metadata.collection: diff --git a/trustgraph-base/trustgraph/messaging/translators/knowledge.py b/trustgraph-base/trustgraph/messaging/translators/knowledge.py index 99243c6b..0043d1e4 100644 --- a/trustgraph-base/trustgraph/messaging/translators/knowledge.py +++ b/trustgraph-base/trustgraph/messaging/translators/knowledge.py @@ -20,6 +20,7 @@ class KnowledgeRequestTranslator(MessageTranslator): triples = Triples( metadata=Metadata( id=data["triples"]["metadata"]["id"], + root=data["triples"]["metadata"].get("root", ""), user=data["triples"]["metadata"]["user"], collection=data["triples"]["metadata"]["collection"] ), @@ -31,6 +32,7 @@ class KnowledgeRequestTranslator(MessageTranslator): graph_embeddings = GraphEmbeddings( metadata=Metadata( id=data["graph-embeddings"]["metadata"]["id"], + root=data["graph-embeddings"]["metadata"].get("root", ""), user=data["graph-embeddings"]["metadata"]["user"], collection=data["graph-embeddings"]["metadata"]["collection"] ), @@ -71,6 +73,7 @@ class 
KnowledgeRequestTranslator(MessageTranslator): result["triples"] = { "metadata": { "id": obj.triples.metadata.id, + "root": obj.triples.metadata.root, "user": obj.triples.metadata.user, "collection": obj.triples.metadata.collection, }, @@ -81,6 +84,7 @@ class KnowledgeRequestTranslator(MessageTranslator): result["graph-embeddings"] = { "metadata": { "id": obj.graph_embeddings.metadata.id, + "root": obj.graph_embeddings.metadata.root, "user": obj.graph_embeddings.metadata.user, "collection": obj.graph_embeddings.metadata.collection, }, @@ -117,6 +121,7 @@ class KnowledgeResponseTranslator(MessageTranslator): "triples": { "metadata": { "id": obj.triples.metadata.id, + "root": obj.triples.metadata.root, "user": obj.triples.metadata.user, "collection": obj.triples.metadata.collection, }, @@ -130,6 +135,7 @@ class KnowledgeResponseTranslator(MessageTranslator): "graph-embeddings": { "metadata": { "id": obj.graph_embeddings.metadata.id, + "root": obj.graph_embeddings.metadata.root, "user": obj.graph_embeddings.metadata.user, "collection": obj.graph_embeddings.metadata.collection, }, diff --git a/trustgraph-base/trustgraph/schema/core/metadata.py b/trustgraph-base/trustgraph/schema/core/metadata.py index edfc30a3..a37a8d62 100644 --- a/trustgraph-base/trustgraph/schema/core/metadata.py +++ b/trustgraph-base/trustgraph/schema/core/metadata.py @@ -5,6 +5,9 @@ class Metadata: # Source identifier id: str = "" + # Root document identifier (set by librarian, preserved through pipeline) + root: str = "" + # Collection management user: str = "" collection: str = "" diff --git a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py index 76d698bc..3438093c 100755 --- a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py @@ -178,6 +178,7 @@ class Processor(ChunkingService): await flow("triples").send(Triples( metadata=Metadata( id=chunk_uri, + 
root=v.metadata.root, user=v.metadata.user, collection=v.metadata.collection, ), @@ -188,6 +189,7 @@ class Processor(ChunkingService): r = Chunk( metadata=Metadata( id=chunk_uri, + root=v.metadata.root, user=v.metadata.user, collection=v.metadata.collection, ), diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py index cbf2acb4..550948fe 100755 --- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py +++ b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py @@ -302,6 +302,7 @@ class Processor(FlowProcessor): await flow("triples").send(Triples( metadata=Metadata( id=pg_uri, + root=v.metadata.root, user=v.metadata.user, collection=v.metadata.collection, ), @@ -313,6 +314,7 @@ class Processor(FlowProcessor): r = TextDocument( metadata=Metadata( id=pg_uri, + root=v.metadata.root, user=v.metadata.user, collection=v.metadata.collection, ), diff --git a/trustgraph-flow/trustgraph/extract/kg/agent/extract.py b/trustgraph-flow/trustgraph/extract/kg/agent/extract.py index 0e844d17..79d5123d 100644 --- a/trustgraph-flow/trustgraph/extract/kg/agent/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/agent/extract.py @@ -104,6 +104,7 @@ class Processor(FlowProcessor): tpls = Triples( metadata = Metadata( id = metadata.id, + root = metadata.root, user = metadata.user, collection = metadata.collection, ), @@ -116,6 +117,7 @@ class Processor(FlowProcessor): ecs = EntityContexts( metadata = Metadata( id = metadata.id, + root = metadata.root, user = metadata.user, collection = metadata.collection, ), diff --git a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py index c1838071..277157b9 100755 --- a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py @@ -218,6 +218,7 @@ class Processor(FlowProcessor): flow("triples"), Metadata( id=v.metadata.id, + 
root=v.metadata.root, user=v.metadata.user, collection=v.metadata.collection, ), @@ -231,6 +232,7 @@ class Processor(FlowProcessor): flow("entity-contexts"), Metadata( id=v.metadata.id, + root=v.metadata.root, user=v.metadata.user, collection=v.metadata.collection, ), diff --git a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py index 6971f92f..eec6face 100644 --- a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py @@ -554,6 +554,7 @@ class Processor(FlowProcessor): t = Triples( metadata=Metadata( id=metadata.id, + root=metadata.root, user=metadata.user, collection=metadata.collection, ), @@ -566,6 +567,7 @@ class Processor(FlowProcessor): ec = EntityContexts( metadata=Metadata( id=metadata.id, + root=metadata.root, user=metadata.user, collection=metadata.collection, ), diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index 7bc69528..a136acc6 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -219,6 +219,7 @@ class Processor(FlowProcessor): flow("triples"), Metadata( id=v.metadata.id, + root=v.metadata.root, user=v.metadata.user, collection=v.metadata.collection, ), diff --git a/trustgraph-flow/trustgraph/extract/kg/rows/processor.py b/trustgraph-flow/trustgraph/extract/kg/rows/processor.py index 8d32963a..88e29116 100644 --- a/trustgraph-flow/trustgraph/extract/kg/rows/processor.py +++ b/trustgraph-flow/trustgraph/extract/kg/rows/processor.py @@ -272,6 +272,7 @@ class Processor(FlowProcessor): extracted = ExtractedObject( metadata=Metadata( id=f"{v.metadata.id}:{schema_name}", + root=v.metadata.root, user=v.metadata.user, collection=v.metadata.collection, ), diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py 
b/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py index a7c19723..f42eee02 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py @@ -37,6 +37,7 @@ def serialize_triples(message): return { "metadata": { "id": message.metadata.id, + "root": message.metadata.root, "user": message.metadata.user, "collection": message.metadata.collection, }, @@ -48,6 +49,7 @@ def serialize_graph_embeddings(message): return { "metadata": { "id": message.metadata.id, + "root": message.metadata.root, "user": message.metadata.user, "collection": message.metadata.collection, }, @@ -65,6 +67,7 @@ def serialize_entity_contexts(message): return { "metadata": { "id": message.metadata.id, + "root": message.metadata.root, "user": message.metadata.user, "collection": message.metadata.collection, }, @@ -82,6 +85,7 @@ def serialize_document_embeddings(message): return { "metadata": { "id": message.metadata.id, + "root": message.metadata.root, "user": message.metadata.user, "collection": message.metadata.collection, }, diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py index e85d8821..37f123fa 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py @@ -48,6 +48,7 @@ class TriplesImport: elt = Triples( metadata=Metadata( id=data["metadata"]["id"], + root=data["metadata"].get("root", ""), user=data["metadata"]["user"], collection=data["metadata"]["collection"], ), diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 7361d74e..e017a99d 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -334,6 +334,7 @@ class Processor(AsyncProcessor): triples_msg = Triples( metadata=Metadata( id=doc_uri, + root=document.id, 
user=processing.user, collection=processing.collection, ), @@ -380,6 +381,7 @@ class Processor(AsyncProcessor): doc = TextDocument( metadata = Metadata( id = document.id, + root = document.id, user = processing.user, collection = processing.collection ), @@ -390,6 +392,7 @@ class Processor(AsyncProcessor): doc = TextDocument( metadata = Metadata( id = document.id, + root = document.id, user = processing.user, collection = processing.collection ), @@ -405,6 +408,7 @@ class Processor(AsyncProcessor): doc = Document( metadata = Metadata( id = document.id, + root = document.id, user = processing.user, collection = processing.collection ), @@ -415,6 +419,7 @@ class Processor(AsyncProcessor): doc = Document( metadata = Metadata( id = document.id, + root = document.id, user = processing.user, collection = processing.collection ), diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py index 7a7b5e40..430dc3c9 100644 --- a/trustgraph-flow/trustgraph/tables/knowledge.py +++ b/trustgraph-flow/trustgraph/tables/knowledge.py @@ -233,7 +233,7 @@ class KnowledgeTableStore: self.insert_triples_stmt, ( uuid.uuid4(), m.metadata.user, - m.metadata.id, when, + m.metadata.root or m.metadata.id, when, [], triples, ) ) @@ -265,7 +265,7 @@ class KnowledgeTableStore: self.insert_graph_embeddings_stmt, ( uuid.uuid4(), m.metadata.user, - m.metadata.id, when, + m.metadata.root or m.metadata.id, when, [], entities, ) ) @@ -297,7 +297,7 @@ class KnowledgeTableStore: self.insert_document_embeddings_stmt, ( uuid.uuid4(), m.metadata.user, - m.metadata.id, when, + m.metadata.root or m.metadata.id, when, [], chunks, ) )