mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 08:26:21 +02:00
Remove type suffixes from explainability dataclass fields + fix show_explain_trace Rename dataclass fields to match KG property naming conventions: - Analysis: thought_uri/observation_uri → thought/observation - Synthesis/Conclusion/Reflection: document_uri → document Fix show_explain_trace for current API: - Resolve document content via librarian fetch instead of removed inline content fields (synthesis.content, conclusion.answer) - Add Grounding display for DocRAG traces - Update fetch_docrag_trace chain: Question → Grounding → Exploration → Synthesis - Pass api/explain_client to all print functions for content resolution Update all CLI tools and tests for renamed fields.
582 lines
18 KiB
Python
582 lines
18 KiB
Python
"""
|
|
Show full explainability trace for a GraphRAG, DocRAG or Agent session.
|
|
|
|
Given a question/session URI, displays the complete trace:
|
|
- GraphRAG: Question -> Exploration -> Focus (edge selection) -> Synthesis (answer)
|
|
- Agent: Session -> Iteration(s) (thought/action/observation) -> Final Answer
|
|
|
|
The tool auto-detects the trace type based on rdf:type.
|
|
|
|
Examples:
|
|
tg-show-explain-trace -U trustgraph -C default "urn:trustgraph:question:abc123"
|
|
tg-show-explain-trace -U trustgraph -C default "urn:trustgraph:agent:abc123"
|
|
tg-show-explain-trace --max-answer 1000 "urn:trustgraph:question:abc123"
|
|
tg-show-explain-trace --show-provenance "urn:trustgraph:question:abc123"
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
from trustgraph.api import (
|
|
Api,
|
|
ExplainabilityClient,
|
|
Question,
|
|
Exploration,
|
|
Focus,
|
|
Synthesis,
|
|
Analysis,
|
|
Conclusion,
|
|
)
|
|
|
|
# Connection defaults. The API URL and auth token can be supplied through
# the environment; user and collection fall back to fixed names.
default_url = os.environ.get("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_token = os.environ.get("TRUSTGRAPH_TOKEN")
default_user = 'trustgraph'
default_collection = 'default'

# Named graphs queried by this tool
RETRIEVAL_GRAPH = "urn:graph:retrieval"
SOURCE_GRAPH = "urn:graph:source"

# Namespaces, and the predicates built on them, used for edge tracing
TG = "https://trustgraph.ai/ns/"
PROV = "http://www.w3.org/ns/prov#"
TG_CONTAINS = f"{TG}contains"
PROV_WAS_DERIVED_FROM = f"{PROV}wasDerivedFrom"
|
|
|
|
|
|
def trace_edge_provenance(flow, user, collection, edge, label_cache, explain_client):
    """
    Follow a knowledge-graph edge back to the source it was derived from.

    The edge is wrapped as a quoted triple and the source graph is searched
    for subjects that tg:contains it.  Every subject found is then walked
    along its prov:wasDerivedFrom links.

    Args:
        flow: SocketFlowInstance
        user: User identifier
        collection: Collection identifier
        edge: Dict holding the edge under the keys s, p and o
        label_cache: Dict shared between calls to cache resolved labels
        explain_client: ExplainabilityClient for label resolution

    Returns:
        List of provenance chains, each a list of {uri, label} dicts.
        Empty when the lookup query raises or nothing matches.
    """

    def as_term(value):
        # Strings starting with "http" or "urn:" are sent as IRI terms;
        # anything else is stringified and sent as a literal.
        looks_like_iri = isinstance(value, str) and value.startswith(("http", "urn:"))
        if looks_like_iri:
            return {"t": "i", "i": value}
        return {"t": "l", "v": str(value)}

    subject, predicate, obj = (edge.get(key, "") for key in ("s", "p", "o"))

    reified_edge = {
        "t": "t",
        "tr": {
            "s": as_term(subject),
            "p": as_term(predicate),
            "o": as_term(obj),
        },
    }

    # Query: ?subgraph tg:contains <<edge>>
    try:
        matches = flow.triples_query(
            p=TG_CONTAINS,
            o=reified_edge,
            g=SOURCE_GRAPH,
            user=user,
            collection=collection,
            limit=10,
        )
    except Exception:
        # Provenance is best-effort: a failed lookup yields no chains
        return []

    # The subject of every match is a starting point for a chain
    subject_terms = [triple.get("s", {}) for triple in matches]
    candidates = [term.get("i") or term.get("v", "") for term in subject_terms]
    start_uris = [uri for uri in candidates if uri]

    # Walk each starting point and keep only the non-empty chains
    traced = (
        trace_provenance_chain(flow, user, collection, uri, label_cache, explain_client)
        for uri in start_uris
    )
    return [chain for chain in traced if chain]
|
|
|
|
|
|
def trace_provenance_chain(flow, user, collection, start_uri, label_cache, explain_client, max_depth=10):
    """
    Trace the prov:wasDerivedFrom chain from start_uri towards its root.

    Args:
        flow: Flow handle exposing triples_query()
        user: User identifier
        collection: Collection identifier
        start_uri: URI the walk starts from
        label_cache: Dict mapping URI to label; read before resolving and
            updated with every newly resolved label
        explain_client: Client whose resolve_label() turns a URI into a label
        max_depth: Upper bound on the number of nodes visited

    Returns:
        List of {"uri", "label"} dicts ordered from start_uri upwards.
        The walk stops at the first node without a parent, at a node that
        was already visited, when a query raises, or after max_depth nodes.
    """
    chain = []
    visited = set()
    current = start_uri

    for _ in range(max_depth):
        # Stop on a missing node or on one already emitted.  Checking the
        # full visited set (not just the immediate parent) also ends
        # multi-node cycles such as A -> B -> A instead of repeating them
        # until max_depth is exhausted.
        if not current or current in visited:
            break
        visited.add(current)

        # Resolve the label once per URI and remember it for later calls
        if current not in label_cache:
            label_cache[current] = explain_client.resolve_label(current, user, collection)
        chain.append({"uri": current, "label": label_cache[current]})

        # Get parent via wasDerivedFrom
        try:
            results = flow.triples_query(
                s=current,
                p=PROV_WAS_DERIVED_FROM,
                g=SOURCE_GRAPH,
                user=user,
                collection=collection,
                limit=1,
            )
        except Exception:
            # Best-effort: keep what has been traced so far
            break

        # Only the first result is considered; a None result means no parent
        parent = None
        for triple in results or ():
            o_term = triple.get("o", {})
            parent = o_term.get("i") or o_term.get("v", "")
            break

        current = parent

    return chain
|
|
|
|
|
|
def format_provenance_chain(chain):
    """
    Format a provenance chain for display.

    Args:
        chain: List of dicts with "uri" and/or "label" keys (may be empty
            or None)

    Returns:
        The entries joined with " -> ", or "" for an empty chain.  Each
        entry shows its label; when the label is missing, None or empty
        the URI is shown instead, and "?" when neither is available.
    """
    if not chain:
        return ""

    # `or` rather than a .get() default: a label key that is present but
    # None/empty must still fall back to the URI, and str() keeps join()
    # from raising on non-string values.
    return " -> ".join(
        str(item.get("label") or item.get("uri") or "?") for item in chain
    )
|
|
|
|
|
|
def print_graphrag_text(trace, explain_client, flow, user, collection, api=None, show_provenance=False):
    """
    Write a GraphRAG trace to stdout as human-readable text.

    Sections appear in order: session header, question, exploration,
    focus (selected edges with reasoning) and synthesis (answer).

    Args:
        trace: Dict that may hold "question", "exploration", "focus" and
            "synthesis" entries; absent entries get a placeholder line
        explain_client: Client used to resolve edge labels and to fetch
            the answer document
        flow: Flow handle, passed on for provenance lookups
        user: User identifier
        collection: Collection identifier
        api: Api handle needed to fetch the answer text; without it only
            the document reference is printed
        show_provenance: When true, also print source chains per edge
    """
    question = trace.get("question")
    session_uri = question.uri if question else 'Unknown'

    print(f"=== GraphRAG Session: {session_uri} ===")
    print()

    if question:
        print(f"Question: {question.query}")
        if question.timestamp:
            print(f"Time: {question.timestamp}")
        print()

    # Exploration
    print("--- Exploration ---")
    exploration = trace.get("exploration")
    print(
        f"Retrieved {exploration.edge_count} edges from knowledge graph"
        if exploration
        else "No exploration data found"
    )
    print()

    # Focus
    print("--- Focus (Edge Selection) ---")
    focus = trace.get("focus")
    if not focus:
        print("No focus data found")
    else:
        selections = focus.edge_selections
        print(f"Selected {len(selections)} edges:")
        print()

        # One cache for the whole listing, handed to the provenance tracer
        # so a URI seen in several chains is resolved only once
        labels_seen = {}

        for number, selection in enumerate(selections, 1):
            edge = selection.edge
            if edge:
                subj, pred, obj = explain_client.resolve_edge_labels(
                    edge, user, collection
                )
                print(f" {number}. ({subj}, {pred}, {obj})")

            why = selection.reasoning
            if why:
                # Keep long reasoning to its first 100 characters
                if len(why) > 100:
                    why = why[:100] + "..."
                print(f" Reasoning: {why}")

            if show_provenance and edge:
                chains = trace_edge_provenance(
                    flow, user, collection, edge,
                    labels_seen, explain_client
                )
                for chain in chains:
                    rendered = format_provenance_chain(chain)
                    if rendered:
                        print(f" Source: {rendered}")

            print()
    print()

    # Synthesis
    print("--- Synthesis ---")
    synthesis = trace.get("synthesis")
    if not synthesis:
        print("No synthesis data found")
        return

    document = synthesis.document
    answer = ""
    if document and api:
        answer = explain_client.fetch_document_content(document, api, user)

    if answer:
        print("Answer:")
        for line in answer.split("\n"):
            print(f" {line}")
    elif document:
        # Content unavailable: show the reference instead
        print(f"Document: {document}")
    else:
        print("No answer content found")
|
|
|
|
|
|
def print_docrag_text(trace, explain_client, api, user):
    """
    Print DocRAG trace in text format.

    Sections appear in order: session header, question, grounding (only
    when present), exploration and synthesis.  DocRAG has no focus step.

    Args:
        trace: Dict that may hold "question", "grounding", "exploration"
            and "synthesis" entries
        explain_client: Client used to fetch the answer document
        api: Api handle needed to fetch the answer text; when falsy only
            the document reference is printed
        user: User identifier
    """
    question = trace.get("question")

    print(f"=== DocRAG Session: {question.uri if question else 'Unknown'} ===")
    print()

    if question:
        print(f"Question: {question.query}")
        if question.timestamp:
            print(f"Time: {question.timestamp}")
        print()

    # Grounding
    grounding = trace.get("grounding")
    if grounding:
        print("--- Grounding ---")
        # A missing concept list or a non-string concept must not abort
        # the rest of the trace, so default to empty and stringify.
        concepts = grounding.concepts or []
        print(f"Concepts: {', '.join(str(c) for c in concepts)}")
        print()

    # Exploration
    print("--- Exploration ---")
    exploration = trace.get("exploration")
    if exploration:
        print(f"Retrieved {exploration.chunk_count} chunks from document store")
    else:
        print("No exploration data found")
    print()

    # Synthesis (no Focus step for DocRAG)
    print("--- Synthesis ---")
    synthesis = trace.get("synthesis")
    if synthesis:
        content = ""
        if synthesis.document and api:
            content = explain_client.fetch_document_content(
                synthesis.document, api, user
            )
        if content:
            print("Answer:")
            for line in content.split("\n"):
                print(f" {line}")
        elif synthesis.document:
            # Content unavailable: show the reference instead
            print(f"Document: {synthesis.document}")
        else:
            print("No answer content found")
    else:
        print("No synthesis data found")
|
|
|
|
|
|
def print_agent_text(trace, explain_client, api, user):
    """
    Print Agent trace in text format.

    Sections appear in order: session header, question, one entry per
    analysis step (thought, action, arguments, observation) and the
    conclusion.

    Args:
        trace: Dict that may hold "question", "iterations" and
            "conclusion" entries
        explain_client: Client used to fetch the conclusion document
        api: Api handle needed to fetch the answer text; when falsy only
            the document reference is printed
        user: User identifier
    """
    question = trace.get("question")

    print(f"=== Agent Session: {question.uri if question else 'Unknown'} ===")
    print()

    if question:
        print(f"Question: {question.query}")
        if question.timestamp:
            print(f"Time: {question.timestamp}")
        print()

    # Analysis steps
    print("--- Analysis ---")
    iterations = trace.get("iterations", [])
    if iterations:
        for i, analysis in enumerate(iterations, 1):
            print(f"Analysis {i}:")
            print(f" Thought: {analysis.thought or 'N/A'}")
            print(f" Action: {analysis.action or 'N/A'}")

            raw_args = analysis.arguments
            if raw_args:
                # Try to pretty-print JSON arguments.  Text is decoded
                # first; an already-decoded structure is dumped directly.
                # Only the conversion sits in the try, so a failure while
                # printing is not mistaken for a parse error.
                try:
                    if isinstance(raw_args, (str, bytes, bytearray)):
                        args_obj = json.loads(raw_args)
                    else:
                        args_obj = raw_args
                    args_str = json.dumps(args_obj, indent=4)
                except (TypeError, ValueError):
                    # Not JSON (or not serializable): show it verbatim
                    print(f" Arguments: {raw_args}")
                else:
                    print(" Arguments:")
                    for line in args_str.split('\n'):
                        print(f" {line}")
            else:
                print(" Arguments: N/A")

            # str() so that a non-text observation can still be truncated
            obs = str(analysis.observation or 'N/A')
            if len(obs) > 200:
                obs = obs[:200] + "... [truncated]"
            print(f" Observation: {obs}")
            print()
    else:
        print("No analysis steps recorded")
    print()

    # Conclusion
    print("--- Conclusion ---")
    conclusion = trace.get("conclusion")
    if conclusion:
        content = ""
        if conclusion.document and api:
            content = explain_client.fetch_document_content(
                conclusion.document, api, user
            )
        if content:
            print("Answer:")
            for line in content.split("\n"):
                print(f" {line}")
        elif conclusion.document:
            # Content unavailable: show the reference instead
            print(f"Document: {conclusion.document}")
        else:
            print("No conclusion recorded")
    else:
        print("No conclusion recorded")
|
|
|
|
|
|
def trace_to_dict(trace, trace_type):
    """
    Turn the entities of a trace into a JSON-serializable dict.

    Args:
        trace: Dict of trace entities as used by the print functions
        trace_type: "agent" or "docrag"; any other value is rendered
            with the GraphRAG layout

    Returns:
        Dict whose "type" key names the layout.  Entities missing from
        the trace appear as None (or an empty list for agent iterations).
    """
    question = trace.get("question")

    def question_fields(id_key):
        # Header shared by all layouts; only the name of the id key differs
        if question:
            return {
                id_key: question.uri,
                "question": question.query,
                "time": question.timestamp,
            }
        return {id_key: None, "question": None, "time": None}

    def document_ref(entity):
        # Synthesis and conclusion entries are an id plus a document
        if not entity:
            return None
        return {"id": entity.uri, "document": entity.document}

    if trace_type == "agent":
        steps = []
        for step in trace.get("iterations", []):
            steps.append({
                "id": step.uri,
                "thought": step.thought,
                "action": step.action,
                "arguments": step.arguments,
                "observation": step.observation,
            })
        return {
            "type": "agent",
            **question_fields("session_id"),
            "iterations": steps,
            "conclusion": document_ref(trace.get("conclusion")),
        }

    exploration = trace.get("exploration")
    synthesis = trace.get("synthesis")

    if trace_type == "docrag":
        grounding = trace.get("grounding")

        grounding_part = None
        if grounding:
            grounding_part = {
                "id": grounding.uri,
                "concepts": grounding.concepts,
            }

        exploration_part = None
        if exploration:
            exploration_part = {
                "id": exploration.uri,
                "chunk_count": exploration.chunk_count,
            }

        return {
            "type": "docrag",
            **question_fields("question_id"),
            "grounding": grounding_part,
            "exploration": exploration_part,
            "synthesis": document_ref(synthesis),
        }

    # graphrag
    focus = trace.get("focus")

    exploration_part = None
    if exploration:
        exploration_part = {
            "id": exploration.uri,
            "edge_count": exploration.edge_count,
        }

    focus_part = None
    if focus:
        chosen = []
        for selection in focus.edge_selections:
            chosen.append({
                "edge": selection.edge,
                "reasoning": selection.reasoning,
            })
        focus_part = {"id": focus.uri, "selected_edges": chosen}

    return {
        "type": "graphrag",
        **question_fields("question_id"),
        "exploration": exploration_part,
        "focus": focus_part,
        "synthesis": document_ref(synthesis),
    }
|
|
|
|
|
|
def main():
    """
    Command-line entry point for tg-show-explain-trace.

    Parses the arguments, detects the session type of the given URI,
    fetches the matching trace and prints it as text or JSON.  Any
    exception is reported on stderr and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(
        prog='tg-show-explain-trace',
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Arguments declared as data; the order here is the order --help shows
    argument_specs = [
        (('question_id',), {
            'help': 'Question/session URI to show trace for',
        }),
        (('-u', '--api-url'), {
            'default': default_url,
            'help': f'API URL (default: {default_url})',
        }),
        (('-t', '--token'), {
            'default': default_token,
            'help': 'Auth token (default: $TRUSTGRAPH_TOKEN)',
        }),
        (('-U', '--user'), {
            'default': default_user,
            'help': f'User ID (default: {default_user})',
        }),
        (('-C', '--collection'), {
            'default': default_collection,
            'help': f'Collection (default: {default_collection})',
        }),
        (('-f', '--flow-id'), {
            'default': 'default',
            'help': 'Flow ID (default: default)',
        }),
        (('--max-answer',), {
            'type': int,
            'default': 500,
            'help': 'Max chars for answer display (default: 500)',
        }),
        (('--show-provenance',), {
            'action': 'store_true',
            'help': 'Also trace edges back to source documents',
        }),
        (('--format',), {
            'choices': ['text', 'json'],
            'default': 'text',
            'help': 'Output format: text (default), json',
        }),
    ]
    for flags, settings in argument_specs:
        parser.add_argument(*flags, **settings)

    args = parser.parse_args()

    # Name of the client method that fetches each kind of trace
    fetch_method = {
        "agent": "fetch_agent_trace",
        "docrag": "fetch_docrag_trace",
        "graphrag": "fetch_graphrag_trace",
    }

    try:
        api = Api(args.api_url, token=args.token)
        connection = api.socket()
        flow = connection.flow(args.flow_id)
        explain_client = ExplainabilityClient(flow)

        try:
            # Detect trace type
            kind = explain_client.detect_session_type(
                args.question_id,
                graph=RETRIEVAL_GRAPH,
                user=args.user,
                collection=args.collection,
            )
            # Anything that is not an agent or DocRAG session is shown
            # as a GraphRAG trace
            if kind != "agent" and kind != "docrag":
                kind = "graphrag"

            fetch = getattr(explain_client, fetch_method[kind])
            trace = fetch(
                args.question_id,
                graph=RETRIEVAL_GRAPH,
                user=args.user,
                collection=args.collection,
                api=api,
                max_content=args.max_answer,
            )

            if args.format == 'json':
                print(json.dumps(trace_to_dict(trace, kind), indent=2))
            elif kind == "agent":
                print_agent_text(trace, explain_client, api, args.user)
            elif kind == "docrag":
                print_docrag_text(trace, explain_client, api, args.user)
            else:
                print_graphrag_text(
                    trace, explain_client, flow,
                    args.user, args.collection,
                    api=api,
                    show_provenance=args.show_provenance
                )

        finally:
            # Close the socket whether or not the trace could be shown
            connection.close()

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|