- add unit tests for base metrics, logging, spec, parameter_spec, and flow modules
- add a lightweight test-only module loader so these tests can run without optional runtime dependencies
- fix Parameter.start/stop to accept self
Subscriber resilience: recreate consumer after connection failure
- Move consumer creation from Subscriber.start() into the run() loop,
matching the pattern used by Consumer. If the connection drops and the
consumer is closed in the finally block, the loop now recreates it on
the next iteration instead of spinning forever on a None consumer.
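A minimal sketch of the resulting loop shape, using illustrative names (backend.create_consumer, handle) rather than the actual Subscriber internals:

```python
# Illustrative recreate-on-failure loop; names are assumptions, not the
# real Subscriber API. The key point: the consumer is built inside the
# loop, so one closed after a connection failure is recreated on the
# next iteration rather than left as None forever.
def run(self):
    consumer = None
    try:
        while self.running:
            try:
                if consumer is None:
                    consumer = self.backend.create_consumer(self.queue)
                msg = consumer.receive(timeout=1.0)
                self.handle(msg)
                consumer.acknowledge(msg)
            except TimeoutError:
                continue
            except Exception:
                # Connection dropped: close and let the next iteration
                # build a fresh consumer
                if consumer is not None:
                    consumer.close()
                    consumer = None
    finally:
        if consumer is not None:
            consumer.close()
```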
Consumer thread safety:
- Dedicated ThreadPoolExecutor per consumer so all pika operations
(create, receive, acknowledge, negative_acknowledge) run on the
same thread — pika BlockingConnection is not thread-safe
- Applies to both Consumer and Subscriber classes
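A sketch of the single-thread confinement; the wrapper class and the raw consumer's method names are assumptions, but the one-worker executor is the documented mechanism:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class ThreadBoundConsumer:
    """Illustrative wrapper: one worker thread owns every pika call."""

    def __init__(self, raw_consumer):
        self.raw = raw_consumer                        # pika-backed consumer
        self.executor = ThreadPoolExecutor(max_workers=1)

    async def _on_own_thread(self, fn, *args):
        # Funnel every call onto the executor's single thread, since a
        # BlockingConnection must not be shared across threads
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, fn, *args)

    async def receive(self):
        return await self._on_own_thread(self.raw.receive)

    async def acknowledge(self, msg):
        return await self._on_own_thread(self.raw.acknowledge, msg)

    async def negative_acknowledge(self, msg):
        return await self._on_own_thread(self.raw.negative_acknowledge, msg)
```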
Config handler type audit — fix four mismatched type registrations:
- librarian: was ["librarian"] (non-existent type), now ["flow",
"active-flow"] (matches config["flow"] that the handler reads)
- cores/service: was ["kg-core"], now ["flow"] (reads
config["flow"])
- metering/counter: was ["token-costs"], now ["token-cost"]
(singular)
- agent/mcp_tool: was ["mcp-tool"], now ["mcp"] (reads
config["mcp"])
Update tests
Provenance triples are now included directly in explain messages from
GraphRAG, DocumentRAG, and Agent services, eliminating the need for
follow-up knowledge graph queries to retrieve explainability details.
Each explain message in the response stream now carries:
- explain_id: root URI for this provenance step (unchanged)
- explain_graph: named graph where triples are stored (unchanged)
- explain_triples: the actual provenance triples for this step (new)
Changes across the stack:
- Schema: added explain_triples field to GraphRagResponse,
DocumentRagResponse, and AgentResponse
- Services: all explain message call sites pass triples through
(graph_rag, document_rag, agent react, agent orchestrator)
- Translators: encode explain_triples via TripleTranslator for
gateway wire format
- Python SDK: ProvenanceEvent now includes parsed ExplainEntity
and raw triples; expanded event_type detection
- CLI: invoke_graph_rag, invoke_agent, invoke_document_rag use
inline entity when available, fall back to graph query
- Tech specs updated
Additional explainability test
- Fix agent react and orchestrator services appending bare methods
to config_handlers instead of using register_config_handler() —
caused 'method object is not subscriptable' on config notify
- Add exc_info to config fetch retry logging for proper tracebacks
- Remove debug print statements from collection management
dispatcher and translator
- Disable RabbitMQ heartbeats (heartbeat=0) to prevent broker
closing idle producer connections that can't process heartbeat
frames from BlockingConnection
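For reference, disabling heartbeats in pika is a single connection parameter; the host value here is an assumption:

```python
import pika

# heartbeat=0 opts out of the heartbeat handshake entirely, so the
# broker will not close a producer connection that sits idle between
# publishes while a BlockingConnection is busy elsewhere.
params = pika.ConnectionParameters(host="rabbitmq", heartbeat=0)
connection = pika.BlockingConnection(params)
```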
Replace the config push mechanism that broadcast the full config
blob on a 'state' class pub/sub queue with a lightweight notify
signal containing only the version number and affected config
types. Processors fetch the full config via request/response from
the config service when notified.
This eliminates the need for the pub/sub 'state' queue class and
stateful pub/sub services entirely. The config push queue moves
from 'state' to 'flow' class — a simple transient signal rather
than a retained message. This solves the RabbitMQ
late-subscriber problem where restarting processes never received
the current config because their fresh queue had no historical
messages.
Key changes:
- ConfigPush schema: config dict replaced with types list
- Subscribe-then-fetch startup with retry: processors subscribe
to the notify queue, fetch config via request/response, then
process buffered notifies with version comparison to avoid race
conditions (sketched after this list)
- register_config_handler() accepts optional types parameter so
handlers only fire when their config types change
- Short-lived config request/response clients to avoid subscriber
contention on non-persistent response topics
- Config service passes affected types through put/delete/flow
operations
- Gateway ConfigReceiver rewritten with same notify pattern and
retry loop
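A sketch of the notify handling implied above; handler registration shape, field names, and the fetch call are assumptions, but the version and type filtering match the behaviour described:

```python
async def on_config_notify(self, notify):
    # Skip old or duplicate signals by version comparison
    if notify.version <= self.config_version:
        return
    self.config_version = notify.version       # updated even if irrelevant

    def matches(handler):
        # A handler registered without types always fires; an empty
        # types list on the notify is treated as "all types changed"
        return (handler.types is None
                or not notify.types
                or set(handler.types) & set(notify.types))

    if not any(matches(h) for h in self.config_handlers):
        return                                 # nothing cares, skip the fetch

    # Short-lived request/response client fetches the full config
    config, version = await self.fetch_config()
    for h in self.config_handlers:
        if matches(h):
            await h.callback(config, version)
```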
Tests updated
New tests:
- register_config_handler: without types, with types, multiple
types, multiple handlers
- on_config_notify: old/same version skipped, irrelevant types
skipped (version still updated), relevant type triggers fetch,
handler without types always called, mixed handler filtering,
empty types invokes all, fetch failure handled gracefully
- fetch_config: returns config+version, raises on error response,
stops client even on exception
- fetch_and_apply_config: applies to all handlers on startup,
retries on failure
* fix: prevent duplicate dispatcher creation race condition in invoke_global_service
Concurrent coroutines could all pass the `if key in self.dispatchers` check
before any of them wrote the result back, because `await dispatcher.start()`
yields to the event loop. This caused multiple Pulsar consumers to be created
on the same shared subscription, distributing responses round-robin and
dropping ~2/3 of them — manifesting as a permanent spinner in the Workbench UI.
Apply a double-checked asyncio.Lock in both `invoke_global_service` and
`invoke_flow_service` so only one dispatcher is ever created per service key.
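The fix, in sketch form (names assumed from the description; dispatcher_lock is an asyncio.Lock created once per session):

```python
async def invoke_global_service(self, key):
    # Fast path: no lock needed once the dispatcher exists
    if key in self.dispatchers:
        return self.dispatchers[key]
    async with self.dispatcher_lock:
        # Re-check under the lock: another coroutine may have created
        # the dispatcher while we awaited the lock
        if key in self.dispatchers:
            return self.dispatchers[key]
        dispatcher = self.make_dispatcher(key)
        await dispatcher.start()       # yields to the event loop here
        self.dispatchers[key] = dispatcher
        return dispatcher
```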
* test: add concurrent-dispatch tests for race condition fix
Add asyncio.gather-based tests that verify invoke_global_service and
invoke_flow_service create exactly one dispatcher under concurrent calls,
preventing the duplicate Pulsar consumer bug.
SPARQL 1.1 query service wrapping pub/sub triples interface
Add a backend-agnostic SPARQL query service that parses SPARQL
queries using rdflib, decomposes them into triple pattern lookups
via the existing TriplesClient pub/sub interface, and performs
in-memory joins, filters, and projections.
Includes:
- SPARQL parser, algebra evaluator, expression evaluator, solution
sequence operations (BGP, JOIN, OPTIONAL, UNION, FILTER, BIND,
VALUES, GROUP BY, ORDER BY, LIMIT/OFFSET, DISTINCT, aggregates)
- FlowProcessor service with TriplesClientSpec
- Gateway dispatcher, request/response translators, API spec
- Python SDK method (FlowInstance.sparql_query; usage sketched
after this list)
- CLI command (tg-invoke-sparql-query)
- Tech spec (docs/tech-specs/sparql-query.md)
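A hypothetical SDK usage; the exact signature and return shape of FlowInstance.sparql_query are assumptions:

```python
# Hypothetical call shape; the query string is standard SPARQL 1.1,
# evaluated by the new service against the pub/sub triples interface.
results = flow.sparql_query(
    "SELECT ?s ?label WHERE { ?s rdfs:label ?label } LIMIT 10"
)
for row in results:
    print(row)
```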
New unit tests for SPARQL query
Adds a RabbitMQ backend as an alternative to Pulsar, selectable via
PUBSUB_BACKEND=rabbitmq. Both backends implement the same PubSubBackend
protocol — no application code changes needed to switch.
RabbitMQ topology:
- Single topic exchange per topicspace (e.g. 'tg')
- Routing key derived from queue class and topic name
- Shared consumers: named queue bound to exchange (competing, round-robin)
- Exclusive consumers: anonymous auto-delete queue (broadcast, each gets
every message). Used by Subscriber and config push consumer.
- Thread-local producer connections (pika is not thread-safe)
- Push-based consumption via basic_consume with process_data_events
for heartbeat processing
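A pika sketch of the two binding styles, with assumed queue and routing-key names (the real backend derives these from queue class and topic):

```python
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq"))
channel = conn.channel()

# One topic exchange per topicspace
channel.exchange_declare(exchange="tg", exchange_type="topic", durable=True)

# Shared consumer: a named queue bound to the exchange; instances
# binding the same name compete round-robin for messages
channel.queue_declare(queue="flow.chunk-load", durable=True)
channel.queue_bind(queue="flow.chunk-load", exchange="tg",
                   routing_key="flow.chunk-load")

# Exclusive consumer: a broker-named auto-delete queue per instance,
# so every instance receives its own copy of each message (broadcast)
result = channel.queue_declare(queue="", exclusive=True, auto_delete=True)
channel.queue_bind(queue=result.method.queue, exchange="tg",
                   routing_key="config.push")
```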
Consumer model changes:
- Consumer class creates one backend consumer per concurrent task
(required for pika thread safety, harmless for Pulsar)
- Consumer class accepts consumer_type parameter
- Subscriber passes consumer_type='exclusive' for broadcast semantics
- Config push consumer uses consumer_type='exclusive' so every
processor instance receives config updates
- handle_one_from_queue receives consumer as parameter for correct
per-connection ack/nack
LibrarianClient:
- New shared client class replacing duplicated librarian request-response
code across 6+ services (chunking, decoders, RAG, etc.)
- Uses stream-document instead of get-document-content for fetching
document content in 1MB chunks (avoids broker message size limits)
- Standalone object (self.librarian = LibrarianClient(...)) not a mixin
- get-document-content marked deprecated in schema and OpenAPI spec
Serialisation:
- Extracted dataclass_to_dict/dict_to_dataclass to shared
serialization.py (used by both Pulsar and RabbitMQ backends)
Librarian queues:
- Changed from flow class (persistent) back to request/response class
now that stream-document eliminates large single messages
- API upload chunk size reduced from 5MB to 3MB to stay under
broker limits after base64 encoding (base64 inflates payloads by
4/3, so 3MB encodes to 4MB while 5MB would encode to ~6.7MB)
Factory and CLI:
- get_pubsub() handles 'rabbitmq' backend with RabbitMQ connection params
- add_pubsub_args() includes RabbitMQ options (host, port, credentials)
- add_pubsub_args(standalone=True) defaults to localhost for CLI tools
- init_trustgraph skips Pulsar admin setup for non-Pulsar backends
- tg-dump-queues and tg-monitor-prompts use backend abstraction
- BaseClient and ConfigClient accept generic pubsub config
Remove Pulsar-specific concepts from application code so that
the pub/sub backend is swappable via configuration.
Rename translators:
- to_pulsar/from_pulsar → decode/encode across all translator
classes, dispatch handlers, and tests (55+ files)
- from_response_with_completion → encode_with_completion
- Remove pulsar.schema.Record from translator base class
Queue naming (CLASS:TOPICSPACE:TOPIC):
- Replace topic() helper with queue() using the new format
(see the sketch after this list): flow:tg:name, request:tg:name,
response:tg:name, state:tg:name
- Queue class implies persistence/TTL (no QoS in names)
- Update Pulsar backend map_topic() to parse new format
- Librarian queues use flow class (persistent, for chunking)
- Config push uses state class (persistent, last-value)
- Remove 15 dead topic imports from schema files
- Update init_trustgraph.py namespace: config → state
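The naming scheme in sketch form (illustrative helper; the real queue() lives in the codebase):

```python
def queue(klass: str, topic: str, topicspace: str = "tg") -> str:
    # CLASS:TOPICSPACE:TOPIC, where the class implies persistence/TTL
    return f"{klass}:{topicspace}:{topic}"

queue("flow", "chunk-load")      # -> "flow:tg:chunk-load"
queue("request", "graph-rag")    # -> "request:tg:graph-rag"
```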
Confine Pulsar to pulsar_backend.py:
- Delete legacy PulsarClient class from pubsub.py
- Move add_args to add_pubsub_args() with standalone flag
for CLI tools (defaults to localhost)
- PulsarBackendConsumer.receive() catches _pulsar.Timeout,
raises standard TimeoutError
- Remove Pulsar imports from: async_processor, flow_processor,
log_level, all 11 client files, 4 storage writers, gateway
service, gateway config receiver
- Remove log_level/LoggerLevel from client API
- Rewrite tg-monitor-prompts to use backend abstraction
- Update tg-dump-queues to use add_pubsub_args
Also: pubsub-abstraction.md tech spec covering problem statement,
design goals, as-is requirements, candidate broker assessment,
approach, and implementation order.
Wire message_id on all answer chunks and fix DAG structure:
- Add message_id to AgentAnswer dataclass and propagate in
socket_client._parse_chunk
- Wire message_id into answer callbacks and send_final_response
for all three patterns (react, plan-then-execute, supervisor)
- Supervisor decomposition thought and synthesis answer chunks
now carry message_id
DAG structure fixes:
- Observation derives from sub-trace Synthesis (not Analysis)
when a tool produces a sub-trace; tracked via
last_sub_explain_uri on context
- Subagent sessions derive from parent's Decomposition via
parent_uri on agent_session_triples
- Findings derive from subagent Conclusions (not Decomposition)
- Synthesis derives from all findings (multiple wasDerivedFrom)
ensuring single terminal node
- agent_synthesis_triples accepts list of parent URIs
- Explainability chain walker follows from sub-trace terminal
to find downstream Observation
Emit Analysis before tool execution:
- Add on_action callback to react() in agent_manager.py, called
after reason() but before tool invocation
- Orchestrator and old service emit Analysis+ToolUse triples via
on_action so sub-traces appear after their parent in the stream
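A sketch of the hook placement, with assumed method names around the documented on_action callback:

```python
async def react(self, question, on_action=None):
    # Illustrative loop shape: decide, announce the decision, then act
    while not self.finished():
        thought, action = await self.reason(question)
        if on_action is not None:
            # Analysis+ToolUse triples are emitted here, before the tool
            # runs, so any sub-trace streams after its parent Analysis
            await on_action(thought, action)
        observation = await self.invoke_tool(action)
        self.record_observation(observation)
```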
Refactor agent provenance so that the decision (thought + tool
selection) and the result (observation) are separate DAG entities:
Question ← Analysis+ToolUse ← Observation ← ... ← Conclusion
Analysis gains tg:ToolUse as a mixin RDF type and is emitted
before tool execution via an on_action callback in react().
This ensures sub-traces (e.g. GraphRAG) appear after their
parent Analysis in the streaming event order.
Observation becomes a standalone prov:Entity with tg:Observation
type, emitted after tool execution. The linear DAG chain runs
through Observation — subsequent iterations and the Conclusion
derive from it, not from the Analysis.
message_id is populated on streaming AgentResponse for thought
and observation chunks, using the provenance URI of the entity
being built. This lets clients group streamed chunks by entity.
Wire changes:
- provenance/agent.py: Add ToolUse type, new
agent_observation_triples(), remove observation from iteration
- agent_manager.py: Add on_action callback between reason() and
tool execution
- orchestrator/pattern_base.py: Split emit, wire message_id,
chain through observation URIs
- orchestrator/react_pattern.py: Emit Analysis via on_action
before tool runs
- agent/react/service.py: Same for non-orchestrator path
- api/explainability.py: New Observation class, updated dispatch
and chain walker
- api/types.py: Add message_id to AgentThought/AgentObservation
- cli: Render Observation separately, [analysis: tool] labels
Tidy agent-orchestrator logs
Added CLI support for selecting the orchestration pattern:
tg-invoke-agent -q "What is the document about?" -p supervisor -v
tg-invoke-agent -q "What is the document about?" -p plan-then-execute -v
tg-invoke-agent -q "What is the document about?" -p react -v
Added new event types to tg-show-explain-trace
Add 96 tests covering the orchestrator's aggregation, provenance,
routing, and explainability parsing. These verify the supervisor
fan-out/fan-in lifecycle, the new RDF provenance types
(Decomposition, Finding, Plan, StepResult, Synthesis), and their
round-trip through the wire format.
Unit tests (84):
- Aggregator: register, record completion, peek, build synthesis,
cleanup
- Provenance triple builders: types, provenance links,
goals/steps, labels
- Explainability parsing: from_triples dispatch, field extraction
for all new entity types, precedence over existing types
- PatternBase: is_subagent detection, emit_subagent_completion
message shape
- Completion dispatch: detection logic, full aggregator
integration flow, synthesis request not re-intercepted as
completion
- MetaRouter: task type identification, pattern selection,
valid_patterns constraints, fallback on LLM error or unknown
response
Contract tests (12):
- Orchestration fields on AgentRequest round-trip correctly
- subagent-completion and synthesise step types in request
history
- Plan steps with status and dependencies
- Provenance triple builder → wire format → from_triples
round-trip for all five new entity types
agent-orchestrator: add explainability provenance for all agent
patterns
Extend the provenance/explainability system to provide
human-readable reasoning traces for the orchestrator's three
agent patterns. Previously only ReAct emitted provenance
(session, iteration, conclusion). Now each pattern records its
cognitive steps as typed RDF entities in the knowledge graph,
using composable mixin types (e.g. Finding + Answer).
New provenance chains:
- Supervisor: Question → Decomposition → Finding ×N → Synthesis
- Plan-then-Execute: Question → Plan → StepResult ×N → Synthesis
- ReAct: Question → Analysis ×N → Conclusion (unchanged)
New RDF types: Decomposition, Finding, Plan, StepResult.
New predicates: tg:subagentGoal, tg:planStep.
Reuses existing Synthesis + Answer mixin for final answers.
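Purely illustrative triple shapes for one of the new types; the URI forms, and which entity carries each predicate, are assumptions here (only the type and predicate names come from the commit):

```python
decomp_uri = "urn:tg:decomposition:123"    # URI shape is an assumption
question_uri = "urn:tg:question:abc"

# Hypothetical output of a decomposition triple builder
decomposition_triples = [
    (decomp_uri, "rdf:type", "tg:Decomposition"),
    (decomp_uri, "prov:wasDerivedFrom", question_uri),
    (decomp_uri, "tg:subagentGoal", "Summarise the key findings"),
]
```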
Provenance library (trustgraph-base):
- Triple builders, URI generators, vocabulary labels for new types
- Client dataclasses with from_triples() dispatch
- fetch_agent_trace() follows branching provenance chains
- API exports updated
Orchestrator (trustgraph-flow):
- PatternBase emit methods for decomposition, finding, plan, step result, and synthesis
- SupervisorPattern emits decomposition during fan-out
- PlanThenExecutePattern emits plan and step results
- Service emits finding triples on subagent completion
- Synthesis provenance replaces generic final triples
CLI (trustgraph-cli):
- invoke_agent -x displays new entity types inline
Introduce an agent orchestrator service that supports three
execution patterns (ReAct, plan-then-execute, supervisor) with
LLM-based meta-routing to select the appropriate pattern and task
type per request. Update the agent schema to support
orchestration fields (correlation, sub-agents, plan steps) and
remove legacy response fields (answer, thought, observation).
Subscribes to prompt request/response Pulsar queues, correlates
messages by ID, and logs a summary with template name, truncated
terms, and elapsed time. Streaming responses are accumulated and
shown at completion. Supports prompt and prompt-rag queue types.
Use max_completion_tokens for OpenAI and Azure OpenAI providers:
The OpenAI API deprecated max_tokens in favor of
max_completion_tokens for chat completions. Newer models
(gpt-4o, o1, o3) reject the old parameter with a 400 error.
AZURE_API_VERSION env var now overrides the default API version
(falls back to 2024-12-01-preview).
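A minimal sketch against the openai Python client; endpoint and credentials are assumed to come from the usual environment variables:

```python
import os
from openai import AzureOpenAI

# AZURE_API_VERSION overrides the default when set
api_version = os.environ.get("AZURE_API_VERSION") or "2024-12-01-preview"
client = AzureOpenAI(api_version=api_version)   # endpoint/key from env

resp = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    max_completion_tokens=512,   # max_tokens is rejected by newer models
)
```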
Update tests to check for the expected structures
Error responses from the websocket multiplexer were missing the
request ID and using a bare string format instead of the structured
error protocol. This caused clients to hang when a request failed
(e.g. unsupported service for a flow) because the error could not
be routed to the waiting caller.
Include request ID in all error paths, use structured error format
({message, type}) with complete flag, and extract the ID early in
receive() so even malformed requests get a routable error when
possible.
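The error shape implied above, written as a Python literal; the envelope key names and the type value are assumptions, while the fields themselves (request ID, message/type structure, complete flag) come from the description:

```python
request_id = "req-42"   # extracted early, echoed even for malformed requests

error_response = {
    "id": request_id,           # lets the multiplexer route it to the caller
    "error": {
        "message": "unsupported service for flow",
        "type": "unsupported-service",
    },
    "complete": True,           # no further chunks follow for this request
}
```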
Updated tests: they had been coded against the invalid protocol messages
Fix missing auth header in verify_system_status processor check
The check_processors function received the token parameter but
did not include it in the Authorization header when calling the
metrics endpoint, causing 401 errors when gateway auth is enabled.
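The fix amounts to forwarding the token on the metrics request; the URL and variable names here are illustrative:

```python
import requests

def check_processors(metrics_url: str, token: str):
    # Forward the bearer token when gateway auth is enabled
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    resp = requests.get(metrics_url, headers=headers)
    resp.raise_for_status()
    return resp.json()
```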
Replace per-request websocket connections in SocketClient and
AsyncSocketClient with a single persistent connection that
multiplexes requests by ID via a background reader task. This
eliminates repeated TCP+WS handshakes which caused significant
latency over proxies.
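A sketch of the multiplexing shape using the websockets library; the class and field names are assumptions, not the actual SocketClient internals:

```python
import asyncio
import json

class MuxClient:
    """One persistent connection; responses routed to callers by ID."""

    def __init__(self, ws):
        self.ws = ws              # an open websockets connection
        self.pending = {}         # request id -> Future
        self.counter = 0

    async def reader(self):
        # Background task: fan incoming messages out by request ID
        async for raw in self.ws:
            msg = json.loads(raw)
            fut = self.pending.get(msg.get("id"))
            if fut is not None and not fut.done():
                fut.set_result(msg)

    async def request(self, payload: dict):
        self.counter += 1
        rid = str(self.counter)
        fut = asyncio.get_running_loop().create_future()
        self.pending[rid] = fut
        await self.ws.send(json.dumps({"id": rid, **payload}))
        try:
            return await fut      # resolved by the reader task
        finally:
            del self.pending[rid]
```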
Convert show_flows, show_flow_blueprints, and
show_parameter_types CLI tools from sequential HTTP requests to
concurrent websocket requests using AsyncSocketClient, reducing
round trips from O(N) sequential to a small number of parallel
batches.
Also fix a describe_interfaces bug in show_flows where the
response queue was read from the request field instead of the
response field.
Pass bearer token from GATEWAY_SECRET environment variable as a
URL query parameter on websocket connections to the API gateway.
When unset or empty, no auth is applied (backwards compatible).
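A sketch of the URL construction; the query parameter name and gateway path are assumptions:

```python
import os
from urllib.parse import urlencode

def gateway_ws_url(base: str = "ws://localhost:8088/api/v1/socket") -> str:
    token = os.environ.get("GATEWAY_SECRET", "")
    if not token:
        return base    # unset or empty: no auth, backwards compatible
    return f"{base}?{urlencode({'token': token})}"
```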