diff --git a/tests/integration/test_graph_rag_integration.py b/tests/integration/test_graph_rag_integration.py index f0de5bb5..6ff14d69 100644 --- a/tests/integration/test_graph_rag_integration.py +++ b/tests/integration/test_graph_rag_integration.py @@ -168,7 +168,7 @@ class TestGraphRagIntegration: assert isinstance(response, str) assert "machine learning" in response.lower() - # Verify provenance was emitted in real-time (4 events: session, retrieval, selection, answer) + # Verify provenance was emitted in real-time (4 events: question, exploration, focus, synthesis) assert len(provenance_events) == 4 for triples, prov_id in provenance_events: assert isinstance(triples, list) diff --git a/tests/unit/test_retrieval/test_graph_rag.py b/tests/unit/test_retrieval/test_graph_rag.py index af0cbfc2..195c8172 100644 --- a/tests/unit/test_retrieval/test_graph_rag.py +++ b/tests/unit/test_retrieval/test_graph_rag.py @@ -644,7 +644,7 @@ class TestQuery: # Verify response text assert response == expected_response - # Verify provenance was emitted incrementally (4 events: session, retrieval, selection, answer) + # Verify provenance was emitted incrementally (4 events: question, exploration, focus, synthesis) assert len(provenance_events) == 4 # Verify each event has triples and a URN @@ -653,11 +653,11 @@ class TestQuery: assert len(triples) > 0 assert prov_id.startswith("urn:trustgraph:") - # Verify order: session, retrieval, selection, answer - assert "session" in provenance_events[0][1] - assert "retrieval" in provenance_events[1][1] - assert "selection" in provenance_events[2][1] - assert "answer" in provenance_events[3][1] + # Verify order: question, exploration, focus, synthesis + assert "question" in provenance_events[0][1] + assert "exploration" in provenance_events[1][1] + assert "focus" in provenance_events[2][1] + assert "synthesis" in provenance_events[3][1] finally: # Restore original methods diff --git a/trustgraph-base/trustgraph/messaging/translators/retrieval.py b/trustgraph-base/trustgraph/messaging/translators/retrieval.py index 85900089..f84ce103 100644 --- a/trustgraph-base/trustgraph/messaging/translators/retrieval.py +++ b/trustgraph-base/trustgraph/messaging/translators/retrieval.py @@ -104,10 +104,10 @@ class GraphRagResponseTranslator(MessageTranslator): if explain_id: result["explain_id"] = explain_id - # Include explain_collection for explain messages - explain_collection = getattr(obj, "explain_collection", None) - if explain_collection: - result["explain_collection"] = explain_collection + # Include explain_graph for explain messages (named graph filter) + explain_graph = getattr(obj, "explain_graph", None) + if explain_graph is not None: + result["explain_graph"] = explain_graph # Include end_of_stream flag (LLM stream complete) result["end_of_stream"] = getattr(obj, "end_of_stream", False) diff --git a/trustgraph-base/trustgraph/provenance/__init__.py b/trustgraph-base/trustgraph/provenance/__init__.py index bca4c156..5aa6d447 100644 --- a/trustgraph-base/trustgraph/provenance/__init__.py +++ b/trustgraph-base/trustgraph/provenance/__init__.py @@ -41,10 +41,10 @@ from . uris import ( statement_uri, agent_uri, # Query-time provenance URIs - query_session_uri, - retrieval_uri, - selection_uri, - answer_uri, + question_uri, + exploration_uri, + focus_uri, + synthesis_uri, ) # Namespace constants @@ -65,6 +65,8 @@ from . namespaces import ( TG_SOURCE_TEXT, TG_SOURCE_CHAR_OFFSET, TG_SOURCE_CHAR_LENGTH, # Query-time provenance predicates TG_QUERY, TG_EDGE_COUNT, TG_SELECTED_EDGE, TG_REASONING, TG_CONTENT, + # Named graphs + GRAPH_DEFAULT, GRAPH_SOURCE, GRAPH_RETRIEVAL, ) # Triple builders @@ -73,10 +75,12 @@ from . triples import ( derived_entity_triples, triple_provenance_triples, # Query-time provenance triple builders - query_session_triples, - retrieval_triples, - selection_triples, - answer_triples, + question_triples, + exploration_triples, + focus_triples, + synthesis_triples, + # Utility + set_graph, ) # Vocabulary bootstrap @@ -99,10 +103,10 @@ __all__ = [ "statement_uri", "agent_uri", # Query-time provenance URIs - "query_session_uri", - "retrieval_uri", - "selection_uri", - "answer_uri", + "question_uri", + "exploration_uri", + "focus_uri", + "synthesis_uri", # Namespaces "PROV", "PROV_ENTITY", "PROV_ACTIVITY", "PROV_AGENT", "PROV_WAS_DERIVED_FROM", "PROV_WAS_GENERATED_BY", @@ -116,15 +120,19 @@ __all__ = [ "TG_SOURCE_TEXT", "TG_SOURCE_CHAR_OFFSET", "TG_SOURCE_CHAR_LENGTH", # Query-time provenance predicates "TG_QUERY", "TG_EDGE_COUNT", "TG_SELECTED_EDGE", "TG_REASONING", "TG_CONTENT", + # Named graphs + "GRAPH_DEFAULT", "GRAPH_SOURCE", "GRAPH_RETRIEVAL", # Triple builders "document_triples", "derived_entity_triples", "triple_provenance_triples", # Query-time provenance triple builders - "query_session_triples", - "retrieval_triples", - "selection_triples", - "answer_triples", + "question_triples", + "exploration_triples", + "focus_triples", + "synthesis_triples", + # Utility + "set_graph", # Vocabulary "get_vocabulary_triples", "PROV_CLASS_LABELS", diff --git a/trustgraph-base/trustgraph/provenance/namespaces.py b/trustgraph-base/trustgraph/provenance/namespaces.py index f348556e..aaca70db 100644 --- a/trustgraph-base/trustgraph/provenance/namespaces.py +++ b/trustgraph-base/trustgraph/provenance/namespaces.py @@ -67,3 +67,9 @@ TG_EDGE = TG + "edge" TG_REASONING = TG + "reasoning" TG_CONTENT = TG + "content" TG_DOCUMENT = TG + "document" # Reference to document in librarian + +# Named graph URIs for RDF datasets +# These separate different types of data while keeping them in the same collection +GRAPH_DEFAULT = "" # Core knowledge facts (triples extracted from documents) +GRAPH_SOURCE = "urn:graph:source" # Extraction provenance (which document/chunk a triple came from) +GRAPH_RETRIEVAL = "urn:graph:retrieval" # Query-time explainability (question, exploration, focus, synthesis) diff --git a/trustgraph-base/trustgraph/provenance/triples.py b/trustgraph-base/trustgraph/provenance/triples.py index 3e6abae8..a1b04596 100644 --- a/trustgraph-base/trustgraph/provenance/triples.py +++ b/trustgraph-base/trustgraph/provenance/triples.py @@ -25,6 +25,26 @@ from . namespaces import ( from . uris import activity_uri, agent_uri, edge_selection_uri +def set_graph(triples: List[Triple], graph: str) -> List[Triple]: + """ + Set the named graph on a list of triples. + + This creates new Triple objects with the graph field set, + leaving the original triples unchanged. + + Args: + triples: List of Triple objects + graph: Named graph URI (e.g., "urn:graph:retrieval") + + Returns: + List of Triple objects with graph field set + """ + return [ + Triple(s=t.s, p=t.p, o=t.o, g=graph) + for t in triples + ] + + def _iri(uri: str) -> Term: """Create an IRI term.""" return Term(type=IRI, iri=uri) @@ -258,21 +278,27 @@ def triple_provenance_triples( # Query-time provenance triple builders +# +# Terminology: +# Question - What was asked, the anchor for everything +# Exploration - Casting wide, what do we know about this space +# Focus - Closing down, what's actually relevant here +# Synthesis - Weaving the relevant pieces into an answer -def query_session_triples( - session_uri: str, +def question_triples( + question_uri: str, query: str, timestamp: Optional[str] = None, ) -> List[Triple]: """ - Build triples for a query session activity. + Build triples for a question activity. Creates: - - Activity declaration for the query session + - Activity declaration for the question - Query text and timestamp Args: - session_uri: URI of the session (from query_session_uri) + question_uri: URI of the question (from question_uri) query: The user's query text timestamp: ISO timestamp (defaults to now) @@ -283,39 +309,39 @@ def query_session_triples( timestamp = datetime.utcnow().isoformat() + "Z" return [ - _triple(session_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), - _triple(session_uri, RDFS_LABEL, _literal("GraphRAG query session")), - _triple(session_uri, PROV_STARTED_AT_TIME, _literal(timestamp)), - _triple(session_uri, TG_QUERY, _literal(query)), + _triple(question_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), + _triple(question_uri, RDFS_LABEL, _literal("GraphRAG question")), + _triple(question_uri, PROV_STARTED_AT_TIME, _literal(timestamp)), + _triple(question_uri, TG_QUERY, _literal(query)), ] -def retrieval_triples( - retrieval_uri: str, - session_uri: str, +def exploration_triples( + exploration_uri: str, + question_uri: str, edge_count: int, ) -> List[Triple]: """ - Build triples for a retrieval entity (all edges retrieved from subgraph). + Build triples for an exploration entity (all edges retrieved from subgraph). Creates: - - Entity declaration for retrieval - - wasGeneratedBy link to session + - Entity declaration for exploration + - wasGeneratedBy link to question - Edge count metadata Args: - retrieval_uri: URI of the retrieval entity (from retrieval_uri) - session_uri: URI of the parent session + exploration_uri: URI of the exploration entity (from exploration_uri) + question_uri: URI of the parent question edge_count: Number of edges retrieved Returns: List of Triple objects """ return [ - _triple(retrieval_uri, RDF_TYPE, _iri(PROV_ENTITY)), - _triple(retrieval_uri, RDFS_LABEL, _literal("Retrieved edges")), - _triple(retrieval_uri, PROV_WAS_GENERATED_BY, _iri(session_uri)), - _triple(retrieval_uri, TG_EDGE_COUNT, _literal(edge_count)), + _triple(exploration_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(exploration_uri, RDFS_LABEL, _literal("Exploration")), + _triple(exploration_uri, PROV_WAS_GENERATED_BY, _iri(question_uri)), + _triple(exploration_uri, TG_EDGE_COUNT, _literal(edge_count)), ] @@ -327,28 +353,28 @@ def _quoted_triple(s: str, p: str, o: str) -> Term: ) -def selection_triples( - selection_uri: str, - retrieval_uri: str, +def focus_triples( + focus_uri: str, + exploration_uri: str, selected_edges_with_reasoning: List[dict], session_id: str = "", ) -> List[Triple]: """ - Build triples for a selection entity (selected edges with reasoning). + Build triples for a focus entity (selected edges with reasoning). Creates: - - Entity declaration for selection - - wasDerivedFrom link to retrieval + - Entity declaration for focus + - wasDerivedFrom link to exploration - For each selected edge: an edge selection entity with quoted triple and reasoning Structure: - tg:selectedEdge . + tg:selectedEdge . tg:edge <<

>> . tg:reasoning "reason" . Args: - selection_uri: URI of the selection entity (from selection_uri) - retrieval_uri: URI of the parent retrieval entity + focus_uri: URI of the focus entity (from focus_uri) + exploration_uri: URI of the parent exploration entity selected_edges_with_reasoning: List of dicts with 'edge' (s,p,o tuple) and 'reasoning' session_id: Session UUID for generating edge selection URIs @@ -356,9 +382,9 @@ def selection_triples( List of Triple objects """ triples = [ - _triple(selection_uri, RDF_TYPE, _iri(PROV_ENTITY)), - _triple(selection_uri, RDFS_LABEL, _literal("Selected edges")), - _triple(selection_uri, PROV_WAS_DERIVED_FROM, _iri(retrieval_uri)), + _triple(focus_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(focus_uri, RDFS_LABEL, _literal("Focus")), + _triple(focus_uri, PROV_WAS_DERIVED_FROM, _iri(exploration_uri)), ] # Add each selected edge with its reasoning via intermediate entity @@ -372,9 +398,9 @@ def selection_triples( # Create intermediate entity for this edge selection edge_sel_uri = edge_selection_uri(session_id, idx) - # Link selection to edge selection entity + # Link focus to edge selection entity triples.append( - _triple(selection_uri, TG_SELECTED_EDGE, _iri(edge_sel_uri)) + _triple(focus_uri, TG_SELECTED_EDGE, _iri(edge_sel_uri)) ) # Attach quoted triple to edge selection entity @@ -392,23 +418,23 @@ def selection_triples( return triples -def answer_triples( - answer_uri: str, - selection_uri: str, +def synthesis_triples( + synthesis_uri: str, + focus_uri: str, answer_text: str = "", document_id: Optional[str] = None, ) -> List[Triple]: """ - Build triples for an answer entity (final synthesis text). + Build triples for a synthesis entity (final answer text). Creates: - - Entity declaration for answer - - wasDerivedFrom link to selection + - Entity declaration for synthesis + - wasDerivedFrom link to focus - Either document reference (if document_id provided) or inline content Args: - answer_uri: URI of the answer entity (from answer_uri) - selection_uri: URI of the parent selection entity + synthesis_uri: URI of the synthesis entity (from synthesis_uri) + focus_uri: URI of the parent focus entity answer_text: The synthesized answer text (used if no document_id) document_id: Optional librarian document ID (preferred over inline content) @@ -416,16 +442,16 @@ def answer_triples( List of Triple objects """ triples = [ - _triple(answer_uri, RDF_TYPE, _iri(PROV_ENTITY)), - _triple(answer_uri, RDFS_LABEL, _literal("GraphRAG answer")), - _triple(answer_uri, PROV_WAS_DERIVED_FROM, _iri(selection_uri)), + _triple(synthesis_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(synthesis_uri, RDFS_LABEL, _literal("Synthesis")), + _triple(synthesis_uri, PROV_WAS_DERIVED_FROM, _iri(focus_uri)), ] if document_id: # Store reference to document in librarian (as IRI) - triples.append(_triple(answer_uri, TG_DOCUMENT, _iri(document_id))) + triples.append(_triple(synthesis_uri, TG_DOCUMENT, _iri(document_id))) elif answer_text: # Fallback: store inline content - triples.append(_triple(answer_uri, TG_CONTENT, _literal(answer_text))) + triples.append(_triple(synthesis_uri, TG_CONTENT, _literal(answer_text))) return triples diff --git a/trustgraph-base/trustgraph/provenance/uris.py b/trustgraph-base/trustgraph/provenance/uris.py index 0cd5baa4..6c7d0a7f 100644 --- a/trustgraph-base/trustgraph/provenance/uris.py +++ b/trustgraph-base/trustgraph/provenance/uris.py @@ -65,59 +65,65 @@ def agent_uri(component_name: str) -> str: # Query-time provenance URIs # These URIs use the urn:trustgraph: namespace to distinguish query-time # provenance from extraction-time provenance (which uses https://trustgraph.ai/) +# +# Terminology: +# Question - What was asked, the anchor for everything +# Exploration - Casting wide, what do we know about this space +# Focus - Closing down, what's actually relevant here +# Synthesis - Weaving the relevant pieces into an answer -def query_session_uri(session_id: str = None) -> str: +def question_uri(session_id: str = None) -> str: """ - Generate URI for a query session activity. + Generate URI for a question activity. Args: session_id: Optional UUID string. Auto-generates if not provided. Returns: - URN in format: urn:trustgraph:session:{uuid} + URN in format: urn:trustgraph:question:{uuid} """ if session_id is None: session_id = str(uuid.uuid4()) - return f"urn:trustgraph:session:{session_id}" + return f"urn:trustgraph:question:{session_id}" -def retrieval_uri(session_id: str) -> str: +def exploration_uri(session_id: str) -> str: """ - Generate URI for a retrieval entity (edges retrieved from subgraph). + Generate URI for an exploration entity (edges retrieved from subgraph). Args: - session_id: The session UUID (same as query_session_uri). + session_id: The session UUID (same as question_uri). Returns: - URN in format: urn:trustgraph:prov:retrieval:{uuid} + URN in format: urn:trustgraph:prov:exploration:{uuid} """ - return f"urn:trustgraph:prov:retrieval:{session_id}" + return f"urn:trustgraph:prov:exploration:{session_id}" -def selection_uri(session_id: str) -> str: +def focus_uri(session_id: str) -> str: """ - Generate URI for a selection entity (selected edges with reasoning). + Generate URI for a focus entity (selected edges with reasoning). Args: - session_id: The session UUID (same as query_session_uri). + session_id: The session UUID (same as question_uri). Returns: - URN in format: urn:trustgraph:prov:selection:{uuid} + URN in format: urn:trustgraph:prov:focus:{uuid} """ - return f"urn:trustgraph:prov:selection:{session_id}" + return f"urn:trustgraph:prov:focus:{session_id}" -def answer_uri(session_id: str) -> str: +def synthesis_uri(session_id: str) -> str: """ - Generate URI for an answer entity (final synthesis text). + Generate URI for a synthesis entity (final answer text). Args: - session_id: The session UUID (same as query_session_uri). + session_id: The session UUID (same as question_uri). Returns: - URN in format: urn:trustgraph:prov:answer:{uuid} + URN in format: urn:trustgraph:prov:synthesis:{uuid} """ - return f"urn:trustgraph:prov:answer:{session_id}" + return f"urn:trustgraph:prov:synthesis:{session_id}" def edge_selection_uri(session_id: str, edge_index: int) -> str: diff --git a/trustgraph-base/trustgraph/schema/services/retrieval.py b/trustgraph-base/trustgraph/schema/services/retrieval.py index 8f222d98..dd31444e 100644 --- a/trustgraph-base/trustgraph/schema/services/retrieval.py +++ b/trustgraph-base/trustgraph/schema/services/retrieval.py @@ -22,8 +22,8 @@ class GraphRagResponse: error: Error | None = None response: str = "" end_of_stream: bool = False # LLM response stream complete - explain_id: str | None = None # Single explain URI (announced as created) - explain_collection: str | None = None # Collection where explain was stored + explain_id: str | None = None # Single explain URI (announced as created) + explain_graph: str | None = None # Named graph where explain was stored (e.g., urn:graph:retrieval) message_type: str = "" # "chunk" or "explain" end_of_session: bool = False # Entire session complete diff --git a/trustgraph-cli/trustgraph/cli/invoke_graph_rag.py b/trustgraph-cli/trustgraph/cli/invoke_graph_rag.py index 03c4dfdf..db41f631 100644 --- a/trustgraph-cli/trustgraph/cli/invoke_graph_rag.py +++ b/trustgraph-cli/trustgraph/cli/invoke_graph_rag.py @@ -36,14 +36,14 @@ RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label" def _get_event_type(prov_id): """Extract event type from provenance_id""" - if "session" in prov_id: - return "session" - elif "retrieval" in prov_id: - return "retrieval" - elif "selection" in prov_id: - return "selection" - elif "answer" in prov_id: - return "answer" + if "question" in prov_id: + return "question" + elif "exploration" in prov_id: + return "exploration" + elif "focus" in prov_id: + return "focus" + elif "synthesis" in prov_id: + return "synthesis" return "provenance" @@ -51,7 +51,7 @@ def _format_provenance_details(event_type, triples): """Format provenance details based on event type and triples""" lines = [] - if event_type == "session": + if event_type == "question": # Show query and timestamp for s, p, o in triples: if p == TG_QUERY: @@ -59,32 +59,32 @@ def _format_provenance_details(event_type, triples): elif p == PROV_STARTED_AT_TIME: lines.append(f" Time: {o}") - elif event_type == "retrieval": + elif event_type == "exploration": # Show edge count for s, p, o in triples: if p == TG_EDGE_COUNT: - lines.append(f" Edges retrieved: {o}") + lines.append(f" Edges explored: {o}") - elif event_type == "selection": - # For selection, just count edge selection URIs + elif event_type == "focus": + # For focus, just count edge selection URIs # The actual edge details are fetched separately via edge_selections parameter edge_sel_uris = [] for s, p, o in triples: if p == TG_SELECTED_EDGE: edge_sel_uris.append(o) if edge_sel_uris: - lines.append(f" Selected {len(edge_sel_uris)} edge(s)") + lines.append(f" Focused on {len(edge_sel_uris)} edge(s)") - elif event_type == "answer": + elif event_type == "synthesis": # Show content length (not full content - it's already streamed) for s, p, o in triples: if p == TG_CONTENT: - lines.append(f" Answer length: {len(o)} chars") + lines.append(f" Synthesis length: {len(o)} chars") return lines -async def _query_triples_once(ws_url, flow_id, prov_id, user, collection, debug=False): +async def _query_triples_once(ws_url, flow_id, prov_id, user, collection, graph=None, debug=False): """Query triples for a provenance node (single attempt)""" request = { "id": "triples-request", @@ -97,6 +97,9 @@ async def _query_triples_once(ws_url, flow_id, prov_id, user, collection, debug= "limit": 100 } } + # Add graph filter if specified (for named graph queries) + if graph is not None: + request["request"]["g"] = graph if debug: print(f" [debug] querying triples for s={prov_id}", file=sys.stderr) @@ -155,10 +158,10 @@ async def _query_triples_once(ws_url, flow_id, prov_id, user, collection, debug= return triples -async def _query_triples(ws_url, flow_id, prov_id, user, collection, max_retries=5, retry_delay=0.2, debug=False): +async def _query_triples(ws_url, flow_id, prov_id, user, collection, graph=None, max_retries=5, retry_delay=0.2, debug=False): """Query triples for a provenance node with retries for race condition""" for attempt in range(max_retries): - triples = await _query_triples_once(ws_url, flow_id, prov_id, user, collection, debug) + triples = await _query_triples_once(ws_url, flow_id, prov_id, user, collection, graph=graph, debug=debug) if triples: return triples # Wait before retry if empty (triples may not be stored yet) @@ -515,14 +518,14 @@ async def _question_explainable( if message_type == "explain": # Display explain event with details explain_id = resp.get("explain_id", "") - explain_collection = resp.get("explain_collection", "explainability") + explain_graph = resp.get("explain_graph") # Named graph (e.g., urn:graph:retrieval) if explain_id: event_type = _get_event_type(explain_id) print(f"\n [{event_type}] {explain_id}", file=sys.stderr) - # Query triples for this explain node (using explain collection from event) + # Query triples for this explain node (using named graph filter) triples = await _query_triples( - ws_url, flow_id, explain_id, user, explain_collection, debug=debug + ws_url, flow_id, explain_id, user, collection, graph=explain_graph, debug=debug ) # Format and display details @@ -530,17 +533,17 @@ async def _question_explainable( for line in details: print(line, file=sys.stderr) - # For selection events, query each edge selection for details - if event_type == "selection": + # For focus events, query each edge selection for details + if event_type == "focus": for s, p, o in triples: if debug: print(f" [debug] triple: p={p}, o={o}, o_type={type(o).__name__}", file=sys.stderr) if p == TG_SELECTED_EDGE and isinstance(o, str): if debug: print(f" [debug] querying edge selection: {o}", file=sys.stderr) - # Query the edge selection entity (using explain collection from event) + # Query the edge selection entity (using named graph filter) edge_triples = await _query_triples( - ws_url, flow_id, o, user, explain_collection, debug=debug + ws_url, flow_id, o, user, collection, graph=explain_graph, debug=debug ) if debug: print(f" [debug] got {len(edge_triples)} edge triples", file=sys.stderr) @@ -743,7 +746,7 @@ def main(): parser.add_argument( '-x', '--explainable', action='store_true', - help='Show provenance events for explainability (implies streaming)' + help='Show provenance events: Question, Exploration, Focus, Synthesis (implies streaming)' ) parser.add_argument( diff --git a/trustgraph-cli/trustgraph/cli/show_graph.py b/trustgraph-cli/trustgraph/cli/show_graph.py index 105fe604..8db4edf4 100644 --- a/trustgraph-cli/trustgraph/cli/show_graph.py +++ b/trustgraph-cli/trustgraph/cli/show_graph.py @@ -1,6 +1,11 @@ """ Connects to the graph query service and dumps all graph edges. Uses streaming mode for lower time-to-first-result and reduced memory overhead. + +Named graphs: + - Default graph (empty): Core knowledge facts + - urn:graph:source: Extraction provenance (document/chunk sources) + - urn:graph:retrieval: Query-time explainability (question, exploration, focus, synthesis) """ import argparse @@ -12,7 +17,13 @@ default_user = 'trustgraph' default_collection = 'default' default_token = os.getenv("TRUSTGRAPH_TOKEN", None) -def show_graph(url, flow_id, user, collection, limit, batch_size, token=None): +# Named graph constants for convenience +GRAPH_DEFAULT = "" +GRAPH_SOURCE = "urn:graph:source" +GRAPH_RETRIEVAL = "urn:graph:retrieval" + + +def show_graph(url, flow_id, user, collection, limit, batch_size, graph=None, show_graph_column=False, token=None): socket = Api(url, token=token).socket() flow = socket.flow(flow_id) @@ -22,6 +33,7 @@ def show_graph(url, flow_id, user, collection, limit, batch_size, token=None): user=user, collection=collection, s=None, p=None, o=None, + g=graph, # Filter by named graph (None = all graphs) limit=limit, batch_size=batch_size, ): @@ -29,11 +41,16 @@ def show_graph(url, flow_id, user, collection, limit, batch_size, token=None): s = triple.get("s", {}) p = triple.get("p", {}) o = triple.get("o", {}) + g = triple.get("g") # Named graph (None = default graph) # Format terms for display s_str = s.get("v", s.get("i", str(s))) p_str = p.get("v", p.get("i", str(p))) o_str = o.get("v", o.get("i", str(o))) - print(s_str, p_str, o_str) + if show_graph_column: + g_str = g if g else "(default)" + print(f"[{g_str}]", s_str, p_str, o_str) + else: + print(s_str, p_str, o_str) finally: socket.close() @@ -88,8 +105,25 @@ def main(): help='Triples per streaming batch (default: 20)', ) + parser.add_argument( + '-g', '--graph', + default=None, + help='Filter by named graph (e.g., urn:graph:source, urn:graph:retrieval). Use "" for default graph only.', + ) + + parser.add_argument( + '--show-graph', + action='store_true', + help='Show graph column in output', + ) + args = parser.parse_args() + # Handle empty string for default graph filter + graph = args.graph + if graph == '""' or graph == "''": + graph = "" # Filter to default graph only + try: show_graph( @@ -99,6 +133,8 @@ def main(): collection = args.collection, limit = args.limit, batch_size = args.batch_size, + graph = graph, + show_graph_column = args.show_graph, token = args.token, ) diff --git a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py index cb607067..ccc0a55f 100755 --- a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py @@ -14,6 +14,7 @@ from ... base import ChunkingService, ConsumerSpec, ProducerSpec from ... provenance import ( page_uri, chunk_uri_from_page, chunk_uri_from_doc, derived_entity_triples, document_uri, + set_graph, GRAPH_SOURCE, ) # Component identification for provenance @@ -160,7 +161,7 @@ class Processor(ChunkingService): title=f"Chunk {chunk_index}", ) - # Emit provenance triples + # Emit provenance triples (stored in source graph for separation from core knowledge) prov_triples = derived_entity_triples( entity_uri=chunk_uri, parent_uri=parent_uri, @@ -181,7 +182,7 @@ class Processor(ChunkingService): user=v.metadata.user, collection=v.metadata.collection, ), - triples=prov_triples, + triples=set_graph(prov_triples, GRAPH_SOURCE), )) # Forward chunk ID + content (post-chunker optimization) diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py index 1248c7a0..ef85d30d 100755 --- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py +++ b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py @@ -24,6 +24,7 @@ from ... base import Consumer, Producer, ConsumerMetrics, ProducerMetrics from ... provenance import ( document_uri, page_uri, derived_entity_triples, + set_graph, GRAPH_SOURCE, ) # Component identification for provenance @@ -285,7 +286,7 @@ class Processor(FlowProcessor): title=f"Page {page_num}", ) - # Emit provenance triples + # Emit provenance triples (stored in source graph for separation from core knowledge) doc_uri = document_uri(source_doc_id) pg_uri = page_uri(source_doc_id, page_num) @@ -305,7 +306,7 @@ class Processor(FlowProcessor): user=v.metadata.user, collection=v.metadata.collection, ), - triples=prov_triples, + triples=set_graph(prov_triples, GRAPH_SOURCE), )) # Forward page document ID to chunker diff --git a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py index b91d30ed..7e301893 100755 --- a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py @@ -20,7 +20,7 @@ from .... rdf import TRUSTGRAPH_ENTITIES, DEFINITION, RDF_LABEL, SUBJECT_OF from .... base import FlowProcessor, ConsumerSpec, ProducerSpec from .... base import PromptClientSpec, ParameterSpec -from .... provenance import statement_uri, triple_provenance_triples +from .... provenance import statement_uri, triple_provenance_triples, set_graph, GRAPH_SOURCE from .... flow_version import __version__ as COMPONENT_VERSION DEFINITION_VALUE = Term(type=IRI, iri=DEFINITION) @@ -175,6 +175,7 @@ class Processor(FlowProcessor): triples.append(definition_triple) # Generate provenance for the definition triple (reification) + # Provenance triples go in the source graph for separation from core knowledge stmt_uri = statement_uri() prov_triples = triple_provenance_triples( stmt_uri=stmt_uri, @@ -185,7 +186,7 @@ class Processor(FlowProcessor): llm_model=llm_model, ontology_uri=ontology_uri, ) - triples.extend(prov_triples) + triples.extend(set_graph(prov_triples, GRAPH_SOURCE)) # Link entity to chunk (not top-level document) triples.append(Triple( diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index 7bda91eb..523f8474 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -20,7 +20,7 @@ from .... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES, SUBJECT_OF from .... base import FlowProcessor, ConsumerSpec, ProducerSpec from .... base import PromptClientSpec, ParameterSpec -from .... provenance import statement_uri, triple_provenance_triples +from .... provenance import statement_uri, triple_provenance_triples, set_graph, GRAPH_SOURCE from .... flow_version import __version__ as COMPONENT_VERSION RDF_LABEL_VALUE = Term(type=IRI, iri=RDF_LABEL) @@ -162,6 +162,7 @@ class Processor(FlowProcessor): triples.append(relationship_triple) # Generate provenance for the relationship triple (reification) + # Provenance triples go in the source graph for separation from core knowledge stmt_uri = statement_uri() prov_triples = triple_provenance_triples( stmt_uri=stmt_uri, @@ -172,7 +173,7 @@ class Processor(FlowProcessor): llm_model=llm_model, ontology_uri=ontology_uri, ) - triples.extend(prov_triples) + triples.extend(set_graph(prov_triples, GRAPH_SOURCE)) # Label for s triples.append(Triple( diff --git a/trustgraph-flow/trustgraph/retrieval/graph_rag/graph_rag.py b/trustgraph-flow/trustgraph/retrieval/graph_rag/graph_rag.py index 92f09ebf..53d4cbf7 100644 --- a/trustgraph-flow/trustgraph/retrieval/graph_rag/graph_rag.py +++ b/trustgraph-flow/trustgraph/retrieval/graph_rag/graph_rag.py @@ -12,14 +12,16 @@ from ... schema import IRI, LITERAL # Provenance imports from trustgraph.provenance import ( - query_session_uri, - retrieval_uri as make_retrieval_uri, - selection_uri as make_selection_uri, - answer_uri as make_answer_uri, - query_session_triples, - retrieval_triples, - selection_triples, - answer_triples, + question_uri, + exploration_uri as make_exploration_uri, + focus_uri as make_focus_uri, + synthesis_uri as make_synthesis_uri, + question_triples, + exploration_triples, + focus_triples, + synthesis_triples, + set_graph, + GRAPH_RETRIEVAL, ) # Module logger @@ -396,17 +398,20 @@ class GraphRag: # Generate explainability URIs upfront session_id = str(uuid.uuid4()) - session_uri = query_session_uri(session_id) - ret_uri = make_retrieval_uri(session_id) - sel_uri = make_selection_uri(session_id) - ans_uri = make_answer_uri(session_id) + q_uri = question_uri(session_id) + exp_uri = make_exploration_uri(session_id) + foc_uri = make_focus_uri(session_id) + syn_uri = make_synthesis_uri(session_id) timestamp = datetime.utcnow().isoformat() + "Z" - # Emit session explainability immediately + # Emit question explainability immediately if explain_callback: - session_triples = query_session_triples(session_uri, query, timestamp) - await explain_callback(session_triples, session_uri) + q_triples = set_graph( + question_triples(q_uri, query, timestamp), + GRAPH_RETRIEVAL + ) + await explain_callback(q_triples, q_uri) q = Query( rag = self, user = user, collection = collection, @@ -418,10 +423,13 @@ class GraphRag: kg, uri_map = await q.get_labelgraph(query) - # Emit retrieval explain after graph retrieval completes + # Emit exploration explain after graph retrieval completes if explain_callback: - ret_triples = retrieval_triples(ret_uri, session_uri, len(kg)) - await explain_callback(ret_triples, ret_uri) + exp_triples = set_graph( + exploration_triples(exp_uri, q_uri, len(kg)), + GRAPH_RETRIEVAL + ) + await explain_callback(exp_triples, exp_uri) if self.verbose: logger.debug("Invoking LLM...") @@ -511,12 +519,15 @@ class GraphRag: if self.verbose: logger.debug(f"Filtered to {len(selected_edges)} edges") - # Emit selection explain after edge selection completes + # Emit focus explain after edge selection completes if explain_callback: - sel_triples = selection_triples( - sel_uri, ret_uri, selected_edges_with_reasoning, session_id + foc_triples = set_graph( + focus_triples( + foc_uri, exp_uri, selected_edges_with_reasoning, session_id + ), + GRAPH_RETRIEVAL ) - await explain_callback(sel_triples, sel_uri) + await explain_callback(foc_triples, foc_uri) # Step 2: Synthesis - LLM generates answer from selected edges only selected_edge_dicts = [ @@ -554,30 +565,33 @@ class GraphRag: if self.verbose: logger.debug("Query processing complete") - # Emit answer explain after synthesis completes + # Emit synthesis explain after synthesis completes if explain_callback: - answer_doc_id = None + synthesis_doc_id = None answer_text = resp if resp else "" # Save answer to librarian if callback provided if save_answer_callback and answer_text: # Generate document ID as URN matching query-time provenance format - answer_doc_id = f"urn:trustgraph:answer:{session_id}" + synthesis_doc_id = f"urn:trustgraph:synthesis:{session_id}" try: - await save_answer_callback(answer_doc_id, answer_text) + await save_answer_callback(synthesis_doc_id, answer_text) if self.verbose: - logger.debug(f"Saved answer to librarian: {answer_doc_id}") + logger.debug(f"Saved answer to librarian: {synthesis_doc_id}") except Exception as e: logger.warning(f"Failed to save answer to librarian: {e}") - answer_doc_id = None # Fall back to inline content + synthesis_doc_id = None # Fall back to inline content # Generate triples with document reference or inline content - ans_triples = answer_triples( - ans_uri, sel_uri, - answer_text="" if answer_doc_id else answer_text, - document_id=answer_doc_id, + syn_triples = set_graph( + synthesis_triples( + syn_uri, foc_uri, + answer_text="" if synthesis_doc_id else answer_text, + document_id=synthesis_doc_id, + ), + GRAPH_RETRIEVAL ) - await explain_callback(ans_triples, ans_uri) + await explain_callback(syn_triples, syn_uri) if self.verbose: logger.debug(f"Emitted explain for session {session_id}") diff --git a/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py b/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py index f7b9054f..85b9fc66 100755 --- a/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py +++ b/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py @@ -13,6 +13,7 @@ from ... schema import GraphRagQuery, GraphRagResponse, Error from ... schema import Triples, Metadata from ... schema import LibrarianRequest, LibrarianResponse, DocumentMetadata from ... schema import librarian_request_queue, librarian_response_queue +from ... provenance import GRAPH_RETRIEVAL from . graph_rag import GraphRag from ... base import FlowProcessor, ConsumerSpec, ProducerSpec from ... base import PromptClientSpec, EmbeddingsClientSpec @@ -38,7 +39,6 @@ class Processor(FlowProcessor): triple_limit = params.get("triple_limit", 30) max_subgraph_size = params.get("max_subgraph_size", 150) max_path_length = params.get("max_path_length", 2) - explainability_collection = params.get("explainability_collection", "explainability") super(Processor, self).__init__( **params | { @@ -48,7 +48,6 @@ class Processor(FlowProcessor): "triple_limit": triple_limit, "max_subgraph_size": max_subgraph_size, "max_path_length": max_path_length, - "explainability_collection": explainability_collection, } ) @@ -56,7 +55,6 @@ class Processor(FlowProcessor): self.default_triple_limit = triple_limit self.default_max_subgraph_size = max_subgraph_size self.default_max_path_length = max_path_length - self.explainability_collection = explainability_collection # CRITICAL SECURITY: NEVER share data between users or collections # Each user/collection combination MUST have isolated data access @@ -239,24 +237,25 @@ class Processor(FlowProcessor): explainability_refs_emitted = [] # Real-time explainability callback - emits triples and IDs as they're generated + # Triples are stored in the user's collection with a named graph (urn:graph:retrieval) async def send_explainability(triples, explain_id): - # Send triples to explainability queue + # Send triples to explainability queue - stores in same collection with named graph await flow("explainability").send(Triples( metadata=Metadata( id=explain_id, metadata=[], user=v.user, - collection=self.explainability_collection, + collection=v.collection, # Store in user's collection, not separate explainability collection ), triples=triples, )) - # Send explain ID and collection to response queue + # Send explain ID and graph to response queue await flow("response").send( GraphRagResponse( message_type="explain", explain_id=explain_id, - explain_collection=self.explainability_collection, + explain_graph=GRAPH_RETRIEVAL, ), properties={"id": id} ) @@ -424,11 +423,8 @@ class Processor(FlowProcessor): help=f'Default max path length (default: 2)' ) - parser.add_argument( - '--explainability-collection', - default='explainability', - help=f'Collection for storing explainability triples (default: explainability)' - ) + # Note: Explainability triples are now stored in the user's collection + # with the named graph urn:graph:retrieval (no separate collection needed) def run(): diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py index e68fbcab..f6cd7068 100644 --- a/trustgraph-flow/trustgraph/tables/knowledge.py +++ b/trustgraph-flow/trustgraph/tables/knowledge.py @@ -114,7 +114,7 @@ class KnowledgeTableStore: entity_embeddings list< tuple< tuple, - list> + list > >, PRIMARY KEY ((user, document_id), id) @@ -140,7 +140,7 @@ class KnowledgeTableStore: chunks list< tuple< blob, - list> + list > >, PRIMARY KEY ((user, document_id), id)