trustgraph/trustgraph-base/trustgraph/clients/graph_rag_client.py

84 lines
2.5 KiB
Python
Raw Normal View History

2024-07-10 23:20:06 +01:00
from .. schema import GraphRagQuery, GraphRagResponse
from .. schema import graph_rag_request_queue, graph_rag_response_queue
from . base import BaseClient
2024-07-10 23:20:06 +01:00
# Ugly
class GraphRagClient(BaseClient):
2024-07-10 23:20:06 +01:00
def __init__(
self,
subscriber=None,
input_queue=None,
output_queue=None,
2024-07-10 23:20:06 +01:00
pulsar_host="pulsar://pulsar:6650",
pulsar_api_key=None,
2024-07-10 23:20:06 +01:00
):
if input_queue == None:
input_queue = graph_rag_request_queue
if output_queue == None:
output_queue = graph_rag_response_queue
super(GraphRagClient, self).__init__(
subscriber=subscriber,
input_queue=input_queue,
output_queue=output_queue,
pulsar_host=pulsar_host,
pulsar_api_key=pulsar_api_key,
input_schema=GraphRagQuery,
output_schema=GraphRagResponse,
2024-07-10 23:20:06 +01:00
)
def request(
feat: workspace-based multi-tenancy, replacing user as tenancy axis (#840) Introduces `workspace` as the isolation boundary for config, flows, library, and knowledge data. Removes `user` as a schema-level field throughout the code, API specs, and tests; workspace provides the same separation more cleanly at the trusted flow.workspace layer rather than through client-supplied message fields. Design ------ - IAM tech spec (docs/tech-specs/iam.md) documents current state, proposed auth/access model, and migration direction. - Data ownership model (docs/tech-specs/data-ownership-model.md) captures the workspace/collection/flow hierarchy. Schema + messaging ------------------ - Drop `user` field from AgentRequest/Step, GraphRagQuery, DocumentRagQuery, Triples/Graph/Document/Row EmbeddingsRequest, Sparql/Rows/Structured QueryRequest, ToolServiceRequest. - Keep collection/workspace routing via flow.workspace at the service layer. - Translators updated to not serialise/deserialise user. API specs --------- - OpenAPI schemas and path examples cleaned of user fields. - Websocket async-api messages updated. - Removed the unused parameters/User.yaml. Services + base --------------- - Librarian, collection manager, knowledge, config: all operations scoped by workspace. Config client API takes workspace as first positional arg. - `flow.workspace` set at flow start time by the infrastructure; no longer pass-through from clients. - Tool service drops user-personalisation passthrough. CLI + SDK --------- - tg-init-workspace and workspace-aware import/export. - All tg-* commands drop user args; accept --workspace. - Python API/SDK (flow, socket_client, async_*, explainability, library) drop user kwargs from every method signature. MCP server ---------- - All tool endpoints drop user parameters; socket_manager no longer keyed per user. Flow service ------------ - Closure-based topic cleanup on flow stop: only delete topics whose blueprint template was parameterised AND no remaining live flow (across all workspaces) still resolves to that topic. Three scopes fall out naturally from template analysis: * {id} -> per-flow, deleted on stop * {blueprint} -> per-blueprint, kept while any flow of the same blueprint exists * {workspace} -> per-workspace, kept while any flow in the workspace exists * literal -> global, never deleted (e.g. tg.request.librarian) Fixes a bug where stopping a flow silently destroyed the global librarian exchange, wedging all library operations until manual restart. RabbitMQ backend ---------------- - heartbeat=60, blocked_connection_timeout=300. Catches silently dead connections (broker restart, orphaned channels, network partitions) within ~2 heartbeat windows, so the consumer reconnects and re-binds its queue rather than sitting forever on a zombie connection. Tests ----- - Full test refresh: unit, integration, contract, provenance. - Dropped user-field assertions and constructor kwargs across ~100 test files. - Renamed user-collection isolation tests to workspace-collection.
2026-04-21 23:23:01 +01:00
self, query, collection="default",
chunk_callback=None,
explain_callback=None,
timeout=500
):
"""
Request a graph RAG query with optional streaming callbacks.
2024-07-10 23:20:06 +01:00
Args:
query: The question to ask
collection: Collection identifier
chunk_callback: Optional callback(text, end_of_stream) for text chunks
explain_callback: Optional callback(explain_id, explain_graph, explain_triples) for explain notifications
timeout: Request timeout in seconds
Returns:
Complete response text (accumulated from all chunks)
"""
accumulated_response = []
def inspect(x):
# Handle explain notifications
if x.message_type == 'explain':
if explain_callback and x.explain_id:
explain_callback(x.explain_id, x.explain_graph, x.explain_triples)
return False # Continue receiving
# Handle text chunks
if x.message_type == 'chunk':
if x.response:
accumulated_response.append(x.response)
if chunk_callback:
chunk_callback(x.response, x.end_of_stream)
# Complete when session ends
if x.end_of_session:
return True
return False # Continue receiving
self.call(
feat: workspace-based multi-tenancy, replacing user as tenancy axis (#840) Introduces `workspace` as the isolation boundary for config, flows, library, and knowledge data. Removes `user` as a schema-level field throughout the code, API specs, and tests; workspace provides the same separation more cleanly at the trusted flow.workspace layer rather than through client-supplied message fields. Design ------ - IAM tech spec (docs/tech-specs/iam.md) documents current state, proposed auth/access model, and migration direction. - Data ownership model (docs/tech-specs/data-ownership-model.md) captures the workspace/collection/flow hierarchy. Schema + messaging ------------------ - Drop `user` field from AgentRequest/Step, GraphRagQuery, DocumentRagQuery, Triples/Graph/Document/Row EmbeddingsRequest, Sparql/Rows/Structured QueryRequest, ToolServiceRequest. - Keep collection/workspace routing via flow.workspace at the service layer. - Translators updated to not serialise/deserialise user. API specs --------- - OpenAPI schemas and path examples cleaned of user fields. - Websocket async-api messages updated. - Removed the unused parameters/User.yaml. Services + base --------------- - Librarian, collection manager, knowledge, config: all operations scoped by workspace. Config client API takes workspace as first positional arg. - `flow.workspace` set at flow start time by the infrastructure; no longer pass-through from clients. - Tool service drops user-personalisation passthrough. CLI + SDK --------- - tg-init-workspace and workspace-aware import/export. - All tg-* commands drop user args; accept --workspace. - Python API/SDK (flow, socket_client, async_*, explainability, library) drop user kwargs from every method signature. MCP server ---------- - All tool endpoints drop user parameters; socket_manager no longer keyed per user. Flow service ------------ - Closure-based topic cleanup on flow stop: only delete topics whose blueprint template was parameterised AND no remaining live flow (across all workspaces) still resolves to that topic. Three scopes fall out naturally from template analysis: * {id} -> per-flow, deleted on stop * {blueprint} -> per-blueprint, kept while any flow of the same blueprint exists * {workspace} -> per-workspace, kept while any flow in the workspace exists * literal -> global, never deleted (e.g. tg.request.librarian) Fixes a bug where stopping a flow silently destroyed the global librarian exchange, wedging all library operations until manual restart. RabbitMQ backend ---------------- - heartbeat=60, blocked_connection_timeout=300. Catches silently dead connections (broker restart, orphaned channels, network partitions) within ~2 heartbeat windows, so the consumer reconnects and re-binds its queue rather than sitting forever on a zombie connection. Tests ----- - Full test refresh: unit, integration, contract, provenance. - Dropped user-field assertions and constructor kwargs across ~100 test files. - Renamed user-collection isolation tests to workspace-collection.
2026-04-21 23:23:01 +01:00
collection=collection, query=query,
inspect=inspect, timeout=timeout
)
return "".join(accumulated_response)
2024-07-10 23:20:06 +01:00