Subgraph provenance (#694)

Replace per-triple provenance reification with subgraph model

Extraction provenance previously created a full reification (statement
URI, activity, agent) for every single extracted triple, producing ~13
provenance triples per knowledge triple.  Since each chunk is processed
by a single LLM call, this was both redundant and semantically
inaccurate.

Now one subgraph object is created per chunk extraction, with
tg:contains linking to each extracted triple.  For 20 extractions from
a chunk this reduces provenance from ~260 triples to ~33.

- Rename tg:reifies -> tg:contains, stmt_uri -> subgraph_uri
- Replace triple_provenance_triples() with subgraph_provenance_triples()
- Refactor kg-extract-definitions and kg-extract-relationships to
  generate provenance once per chunk instead of per triple
- Add subgraph provenance to kg-extract-ontology and kg-extract-agent
  (previously had none)
- Update CLI tools and tech specs to match

Also rename tg-show-document-hierarchy to tg-show-extraction-provenance.

Added extra type annotations for extraction provenance; fixed the extraction provenance CLI.
This commit is contained in:
cybermaggedon 2026-03-13 11:37:59 +00:00 committed by GitHub
parent 35128ff019
commit 64e3f6bd0d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 463 additions and 193 deletions

View file

@ -36,7 +36,7 @@ TG_SELECTED_EDGE = TG + "selectedEdge"
TG_EDGE = TG + "edge"
TG_REASONING = TG + "reasoning"
TG_CONTENT = TG + "content"
TG_REIFIES = TG + "reifies"
TG_CONTAINS = TG + "contains"
PROV = "http://www.w3.org/ns/prov#"
PROV_STARTED_AT_TIME = PROV + "startedAtTime"
PROV_WAS_DERIVED_FROM = PROV + "wasDerivedFrom"
@ -185,18 +185,18 @@ async def _query_edge_provenance(ws_url, flow_id, edge_s, edge_p, edge_o, user,
"""
Query for provenance of an edge (s, p, o) in the knowledge graph.
Finds statements that reify the edge via tg:reifies, then follows
Finds subgraphs that contain the edge via tg:contains, then follows
prov:wasDerivedFrom to find source documents.
Returns list of source URIs (chunks, pages, documents).
"""
# Query for statements that reify this edge: ?stmt tg:reifies <<s p o>>
# Query for subgraphs that contain this edge: ?subgraph tg:contains <<s p o>>
request = {
"id": "edge-prov-request",
"service": "triples",
"flow": flow_id,
"request": {
"p": {"t": "i", "i": TG_REIFIES},
"p": {"t": "i", "i": TG_CONTAINS},
"o": {
"t": "t", # Quoted triple type
"tr": {

View file

@ -40,7 +40,7 @@ SOURCE_GRAPH = "urn:graph:source"
# Provenance predicates for edge tracing
TG = "https://trustgraph.ai/ns/"
TG_REIFIES = TG + "reifies"
TG_CONTAINS = TG + "contains"
PROV = "http://www.w3.org/ns/prov#"
PROV_WAS_DERIVED_FROM = PROV + "wasDerivedFrom"
@ -79,10 +79,10 @@ def trace_edge_provenance(flow, user, collection, edge, label_cache, explain_cli
}
}
# Query: ?stmt tg:reifies <<edge>>
# Query: ?subgraph tg:contains <<edge>>
try:
results = flow.triples_query(
p=TG_REIFIES,
p=TG_CONTAINS,
o=quoted_triple,
g=SOURCE_GRAPH,
user=user,

View file

@ -1,12 +1,12 @@
"""
Show document hierarchy: Document -> Pages -> Chunks -> Edges.
Show extraction provenance: Document -> Pages -> Chunks -> Edges.
Given a document ID, traverses and displays all derived entities
(pages, chunks, extracted edges) using prov:wasDerivedFrom relationships.
Examples:
tg-show-document-hierarchy -U trustgraph -C default "urn:trustgraph:doc:abc123"
tg-show-document-hierarchy --show-content --max-content 500 "urn:trustgraph:doc:abc123"
tg-show-extraction-provenance -U trustgraph -C default "urn:trustgraph:doc:abc123"
tg-show-extraction-provenance --show-content --max-content 500 "urn:trustgraph:doc:abc123"
"""
import argparse
@ -25,10 +25,22 @@ PROV_WAS_DERIVED_FROM = "http://www.w3.org/ns/prov#wasDerivedFrom"
RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
TG = "https://trustgraph.ai/ns/"
TG_REIFIES = TG + "reifies"
TG_CONTAINS = TG + "contains"
TG_DOCUMENT_TYPE = TG + "Document"
TG_PAGE_TYPE = TG + "Page"
TG_CHUNK_TYPE = TG + "Chunk"
TG_SUBGRAPH_TYPE = TG + "Subgraph"
DC_TITLE = "http://purl.org/dc/terms/title"
DC_FORMAT = "http://purl.org/dc/terms/format"
# Map TrustGraph type URIs to display names
TYPE_MAP = {
TG_DOCUMENT_TYPE: "document",
TG_PAGE_TYPE: "page",
TG_CHUNK_TYPE: "chunk",
TG_SUBGRAPH_TYPE: "subgraph",
}
# Source graph
SOURCE_GRAPH = "urn:graph:source"
@ -109,15 +121,15 @@ def extract_value(term):
def get_node_metadata(socket, flow_id, user, collection, node_uri):
"""Get metadata for a node (label, type, title, format)."""
"""Get metadata for a node (label, types, title, format)."""
triples = query_triples(socket, flow_id, user, collection, s=node_uri, g=SOURCE_GRAPH)
metadata = {"uri": node_uri}
metadata = {"uri": node_uri, "types": []}
for s, p, o in triples:
if p == RDFS_LABEL:
metadata["label"] = o
elif p == RDF_TYPE:
metadata["type"] = o
metadata["types"].append(o)
elif p == DC_TITLE:
metadata["title"] = o
elif p == DC_FORMAT:
@ -126,6 +138,14 @@ def get_node_metadata(socket, flow_id, user, collection, node_uri):
return metadata
def classify_node(metadata):
    """Return the display name for a node, derived from its rdf:type values.

    Scans the node's collected type URIs in order and returns the mapped
    display name for the first URI present in TYPE_MAP; returns "unknown"
    when none of the types are recognised (or no types were collected).
    """
    matches = (TYPE_MAP[uri] for uri in metadata.get("types", []) if uri in TYPE_MAP)
    return next(matches, "unknown")
def get_children(socket, flow_id, user, collection, parent_uri):
"""Get children of a node via prov:wasDerivedFrom."""
triples = query_triples(
@ -135,29 +155,6 @@ def get_children(socket, flow_id, user, collection, parent_uri):
return [s for s, p, o in triples]
def get_edges_from_chunk(socket, flow_id, user, collection, chunk_uri):
    """Get edges that were derived from a chunk (via tg:reifies).

    Two-step lookup in the source graph: first find statement nodes
    asserted as prov:wasDerivedFrom the chunk, then resolve each
    statement's tg:reifies object.  Only quoted-triple objects (dicts)
    are returned; literal/URI objects are ignored.
    """
    derived = query_triples(
        socket, flow_id, user, collection,
        p=PROV_WAS_DERIVED_FROM, o=chunk_uri, g=SOURCE_GRAPH,
    )
    collected = []
    for stmt_uri, _, _ in derived:
        # Resolve what each statement reifies; quoted triples arrive as dicts.
        reified = query_triples(
            socket, flow_id, user, collection,
            s=stmt_uri, p=TG_REIFIES, g=SOURCE_GRAPH,
        )
        collected.extend(obj for _, _, obj in reified if isinstance(obj, dict))
    return collected
def get_document_content(api, user, doc_id, max_content):
"""Fetch document content from librarian API."""
try:
@ -176,32 +173,6 @@ def get_document_content(api, user, doc_id, max_content):
return f"[Error fetching content: {e}]"
def classify_uri(uri):
    """Classify a URI as "document", "page", or "chunk" based on patterns.

    Heuristic classification for TrustGraph-style URIs.  Structural
    patterns (a trailing ``/c<digits>`` chunk suffix, or a ``/p<digits>``
    page segment) are checked first; otherwise the substrings "chunk",
    "page", and "doc" are matched case-insensitively, in that order.
    Returns "unknown" for non-strings and unrecognised URIs.
    """
    if not isinstance(uri, str):
        return "unknown"
    # Common patterns in trustgraph URIs
    if "/c" in uri and uri.split("/c")[-1].isdigit():
        return "chunk"
    # Page pattern like /p1 or /p1/2: everything after the last "/p" must
    # be digits once slashes are removed, AND the first path segment after
    # "/p" must itself be digits (rejects e.g. "x/p/1").
    if "/p" in uri and uri.split("/p")[-1].replace("/", "").isdigit():
        remainder = uri.split("/p")[-1].split("/")[0]
        if remainder.isdigit():
            return "page"
    if "chunk" in uri.lower():
        return "chunk"
    if "page" in uri.lower():
        return "page"
    if "doc" in uri.lower():
        return "document"
    return "unknown"
def build_hierarchy(socket, flow_id, user, collection, root_uri, api=None, show_content=False, max_content=200, visited=None):
"""Build document hierarchy tree recursively."""
if visited is None:
@ -212,7 +183,7 @@ def build_hierarchy(socket, flow_id, user, collection, root_uri, api=None, show_
visited.add(root_uri)
metadata = get_node_metadata(socket, flow_id, user, collection, root_uri)
node_type = classify_uri(root_uri)
node_type = classify_node(metadata)
node = {
"uri": root_uri,
@ -232,10 +203,20 @@ def build_hierarchy(socket, flow_id, user, collection, root_uri, api=None, show_
children_uris = get_children(socket, flow_id, user, collection, root_uri)
for child_uri in children_uris:
child_type = classify_uri(child_uri)
child_metadata = get_node_metadata(socket, flow_id, user, collection, child_uri)
child_type = classify_node(child_metadata)
# Recursively build hierarchy for pages and chunks
if child_type in ("page", "chunk", "unknown"):
if child_type == "subgraph":
# Subgraphs contain extracted edges — inline them
contains_triples = query_triples(
socket, flow_id, user, collection,
s=child_uri, p=TG_CONTAINS, g=SOURCE_GRAPH
)
for _, _, edge in contains_triples:
if isinstance(edge, dict):
node["edges"].append(edge)
else:
# Recurse into pages, chunks, etc.
child_node = build_hierarchy(
socket, flow_id, user, collection, child_uri,
api=api, show_content=show_content, max_content=max_content,
@ -244,11 +225,6 @@ def build_hierarchy(socket, flow_id, user, collection, root_uri, api=None, show_
if child_node:
node["children"].append(child_node)
# Get edges for chunks
if node_type == "chunk":
edges = get_edges_from_chunk(socket, flow_id, user, collection, root_uri)
node["edges"] = edges
# Sort children by URI for consistent output
node["children"].sort(key=lambda x: x.get("uri", ""))
@ -332,7 +308,7 @@ def print_json(node):
def main():
parser = argparse.ArgumentParser(
prog='tg-show-document-hierarchy',
prog='tg-show-extraction-provenance',
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)