From c20e6540ecb624d24486981bc2c189e3db2a6176 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 7 Apr 2026 14:51:14 +0100 Subject: [PATCH 1/2] Subscriber resilience and RabbitMQ fixes (#765) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Subscriber resilience: recreate consumer after connection failure - Move consumer creation from Subscriber.start() into the run() loop, matching the pattern used by Consumer. If the connection drops and the consumer is closed in the finally block, the loop now recreates it on the next iteration instead of spinning forever on a None consumer. Consumer thread safety: - Dedicated ThreadPoolExecutor per consumer so all pika operations (create, receive, acknowledge, negative_acknowledge) run on the same thread — pika BlockingConnection is not thread-safe - Applies to both Consumer and Subscriber classes Config handler type audit — fix four mismatched type registrations: - librarian: was ["librarian"] (non-existent type), now ["flow", "active-flow"] (matches config["flow"] that the handler reads) - cores/service: was ["kg-core"], now ["flow"] (reads config["flow"]) - metering/counter: was ["token-costs"], now ["token-cost"] (singular) - agent/mcp_tool: was ["mcp-tool"], now ["mcp"] (reads config["mcp"]) Update tests --- .../test_subscriber_graceful_shutdown.py | 26 +++---- .../test_consumer_concurrency.py | 8 +-- .../trustgraph/base/async_processor.py | 1 - trustgraph-base/trustgraph/base/consumer.py | 70 +++++++++++++------ trustgraph-base/trustgraph/base/subscriber.py | 46 +++++++----- .../trustgraph/agent/mcp_tool/service.py | 2 +- trustgraph-flow/trustgraph/cores/service.py | 2 +- .../trustgraph/librarian/service.py | 5 +- .../trustgraph/metering/counter.py | 2 +- 9 files changed, 96 insertions(+), 66 deletions(-) diff --git a/tests/unit/test_base/test_subscriber_graceful_shutdown.py b/tests/unit/test_base/test_subscriber_graceful_shutdown.py index 0587e3d6..ec14f66b 100644 --- 
a/tests/unit/test_base/test_subscriber_graceful_shutdown.py +++ b/tests/unit/test_base/test_subscriber_graceful_shutdown.py @@ -61,23 +61,21 @@ async def test_subscriber_deferred_acknowledgment_success(): max_size=10, backpressure_strategy="block" ) - - # Start subscriber to initialize consumer - await subscriber.start() - + subscriber.consumer = mock_consumer + # Create queue for subscription queue = await subscriber.subscribe("test-queue") - + # Create mock message with matching queue name msg = create_mock_message("test-queue", {"data": "test"}) - + # Process message await subscriber._process_message(msg) - + # Should acknowledge successful delivery mock_consumer.acknowledge.assert_called_once_with(msg) mock_consumer.negative_acknowledge.assert_not_called() - + # Message should be in queue assert not queue.empty() received_msg = await queue.get() @@ -108,9 +106,7 @@ async def test_subscriber_dropped_message_still_acks(): max_size=1, # Very small queue backpressure_strategy="drop_new" ) - - # Start subscriber to initialize consumer - await subscriber.start() + subscriber.consumer = mock_consumer # Create queue and fill it queue = await subscriber.subscribe("test-queue") @@ -151,9 +147,7 @@ async def test_subscriber_orphaned_message_acks(): max_size=10, backpressure_strategy="block" ) - - # Start subscriber to initialize consumer - await subscriber.start() + subscriber.consumer = mock_consumer # Don't create any queues - message will be orphaned # This simulates a response arriving after the waiter has unsubscribed @@ -189,9 +183,7 @@ async def test_subscriber_backpressure_strategies(): max_size=2, backpressure_strategy="drop_oldest" ) - - # Start subscriber to initialize consumer - await subscriber.start() + subscriber.consumer = mock_consumer queue = await subscriber.subscribe("test-queue") diff --git a/tests/unit/test_concurrency/test_consumer_concurrency.py b/tests/unit/test_concurrency/test_consumer_concurrency.py index 03244b73..59c7f2b5 100644 --- 
a/tests/unit/test_concurrency/test_consumer_concurrency.py +++ b/tests/unit/test_concurrency/test_consumer_concurrency.py @@ -81,9 +81,8 @@ class TestTaskGroupConcurrency: # Track how many consume_from_queue calls are made call_count = 0 - original_running = True - async def mock_consume(backend_consumer): + async def mock_consume(backend_consumer, executor=None): nonlocal call_count call_count += 1 # Wait a bit to let all tasks start, then signal stop @@ -107,7 +106,7 @@ class TestTaskGroupConcurrency: consumer = _make_consumer(concurrency=1) call_count = 0 - async def mock_consume(backend_consumer): + async def mock_consume(backend_consumer, executor=None): nonlocal call_count call_count += 1 await asyncio.sleep(0.01) @@ -294,9 +293,8 @@ class TestPollTimeout: raise type('Timeout', (Exception,), {})("timeout") mock_pulsar_consumer.receive = capture_receive - consumer.consumer = mock_pulsar_consumer - await consumer.consume_from_queue() + await consumer.consume_from_queue(mock_pulsar_consumer) assert received_kwargs.get("timeout_millis") == 100 diff --git a/trustgraph-base/trustgraph/base/async_processor.py b/trustgraph-base/trustgraph/base/async_processor.py index c805bffa..4f04df16 100644 --- a/trustgraph-base/trustgraph/base/async_processor.py +++ b/trustgraph-base/trustgraph/base/async_processor.py @@ -94,7 +94,6 @@ class AsyncProcessor: metrics = config_consumer_metrics, start_of_messages = False, - consumer_type = 'exclusive', ) self.running = True diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 4f8c9de5..b6c28bbe 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -12,6 +12,7 @@ import asyncio import time import logging +from concurrent.futures import ThreadPoolExecutor from .. 
exceptions import TooManyRequests @@ -110,29 +111,37 @@ class Consumer: logger.info(f"Starting {self.concurrency} receiver threads") # Create one backend consumer per concurrent task. - # Each gets its own connection — required for backends - # like RabbitMQ where connections are not thread-safe. + # Each gets its own connection and dedicated thread — + # required for backends like RabbitMQ where connections + # are not thread-safe (pika BlockingConnection must be + # used from a single thread). consumers = [] + executors = [] for i in range(self.concurrency): try: logger.info(f"Subscribing to topic: {self.topic} (worker {i})") - c = await asyncio.to_thread( - self.backend.create_consumer, - topic = self.topic, - subscription = self.subscriber, - schema = self.schema, - initial_position = initial_pos, - consumer_type = self.consumer_type, + executor = ThreadPoolExecutor(max_workers=1) + loop = asyncio.get_event_loop() + c = await loop.run_in_executor( + executor, + lambda: self.backend.create_consumer( + topic = self.topic, + subscription = self.subscriber, + schema = self.schema, + initial_position = initial_pos, + consumer_type = self.consumer_type, + ), ) consumers.append(c) + executors.append(executor) logger.info(f"Successfully subscribed to topic: {self.topic} (worker {i})") except Exception as e: logger.error(f"Consumer subscription exception (worker {i}): {e}", exc_info=True) raise async with asyncio.TaskGroup() as tg: - for c in consumers: - tg.create_task(self.consume_from_queue(c)) + for c, ex in zip(consumers, executors): + tg.create_task(self.consume_from_queue(c, ex)) if self.metrics: self.metrics.state("stopped") @@ -146,7 +155,10 @@ class Consumer: c.close() except Exception: pass + for ex in executors: + ex.shutdown(wait=False) consumers = [] + executors = [] await asyncio.sleep(self.reconnect_time) continue @@ -157,15 +169,18 @@ class Consumer: c.close() except Exception: pass + for ex in executors: + ex.shutdown(wait=False) - async def 
consume_from_queue(self, consumer): + async def consume_from_queue(self, consumer, executor=None): + loop = asyncio.get_event_loop() while self.running: try: - msg = await asyncio.to_thread( - consumer.receive, - timeout_millis=100 + msg = await loop.run_in_executor( + executor, + lambda: consumer.receive(timeout_millis=100), ) except Exception as e: # Handle timeout from any backend @@ -173,10 +188,11 @@ class Consumer: continue raise e - await self.handle_one_from_queue(msg, consumer) + await self.handle_one_from_queue(msg, consumer, executor) - async def handle_one_from_queue(self, msg, consumer): + async def handle_one_from_queue(self, msg, consumer, executor=None): + loop = asyncio.get_event_loop() expiry = time.time() + self.rate_limit_timeout # This loop is for retry on rate-limit / resource limits @@ -187,8 +203,11 @@ class Consumer: logger.warning("Gave up waiting for rate-limit retry") # Message failed to be processed, this causes it to - # be retried - consumer.negative_acknowledge(msg) + # be retried. Ack on the consumer's dedicated thread + # (pika is not thread-safe). + await loop.run_in_executor( + executor, lambda: consumer.negative_acknowledge(msg) + ) if self.metrics: self.metrics.process("error") @@ -210,8 +229,11 @@ class Consumer: logger.debug("Message processed successfully") - # Acknowledge successful processing of the message - consumer.acknowledge(msg) + # Acknowledge on the consumer's dedicated thread + # (pika is not thread-safe) + await loop.run_in_executor( + executor, lambda: consumer.acknowledge(msg) + ) if self.metrics: self.metrics.process("success") @@ -237,8 +259,10 @@ class Consumer: logger.error(f"Message processing exception: {e}", exc_info=True) # Message failed to be processed, this causes it to - # be retried - consumer.negative_acknowledge(msg) + # be retried. Ack on the consumer's dedicated thread. 
+ await loop.run_in_executor( + executor, lambda: consumer.negative_acknowledge(msg) + ) if self.metrics: self.metrics.process("error") diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index 36948131..6cb234b1 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -7,6 +7,7 @@ import asyncio import time import logging import uuid +from concurrent.futures import ThreadPoolExecutor # Module logger logger = logging.getLogger(__name__) @@ -38,6 +39,7 @@ class Subscriber: self.pending_acks = {} # Track messages awaiting delivery self.consumer = None + self.executor = None def __del__(self): @@ -45,15 +47,6 @@ class Subscriber: async def start(self): - # Create consumer via backend - self.consumer = await asyncio.to_thread( - self.backend.create_consumer, - topic=self.topic, - subscription=self.subscription, - schema=self.schema, - consumer_type='exclusive', - ) - self.task = asyncio.create_task(self.run()) async def stop(self): @@ -80,6 +73,21 @@ class Subscriber: try: + # Create consumer and dedicated thread if needed + # (first run or after failure) + if self.consumer is None: + self.executor = ThreadPoolExecutor(max_workers=1) + loop = asyncio.get_event_loop() + self.consumer = await loop.run_in_executor( + self.executor, + lambda: self.backend.create_consumer( + topic=self.topic, + subscription=self.subscription, + schema=self.schema, + consumer_type='exclusive', + ), + ) + if self.metrics: self.metrics.state("running") @@ -128,9 +136,12 @@ class Subscriber: # Process messages only if not draining if not self.draining: try: - msg = await asyncio.to_thread( - self.consumer.receive, - timeout_millis=250 + loop = asyncio.get_event_loop() + msg = await loop.run_in_executor( + self.executor, + lambda: self.consumer.receive( + timeout_millis=250 + ), ) except Exception as e: # Handle timeout from any backend @@ -172,15 +183,18 @@ class Subscriber: except 
Exception: pass # Already closed or error self.consumer = None - - + + if self.executor: + self.executor.shutdown(wait=False) + self.executor = None + if self.metrics: self.metrics.state("stopped") if not self.running and not self.draining: return - - # If handler drops out, sleep a retry + + # Sleep before retry await asyncio.sleep(1) async def subscribe(self, id): diff --git a/trustgraph-flow/trustgraph/agent/mcp_tool/service.py b/trustgraph-flow/trustgraph/agent/mcp_tool/service.py index 0bc5d7e3..c793f9ca 100755 --- a/trustgraph-flow/trustgraph/agent/mcp_tool/service.py +++ b/trustgraph-flow/trustgraph/agent/mcp_tool/service.py @@ -24,7 +24,7 @@ class Service(ToolService): **params ) - self.register_config_handler(self.on_mcp_config, types=["mcp-tool"]) + self.register_config_handler(self.on_mcp_config, types=["mcp"]) self.mcp_services = {} diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index 3bc4e9b6..d6390805 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -108,7 +108,7 @@ class Processor(AsyncProcessor): flow_config = self, ) - self.register_config_handler(self.on_knowledge_config, types=["kg-core"]) + self.register_config_handler(self.on_knowledge_config, types=["flow"]) self.flows = {} diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 15cc97fa..c735a550 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -246,7 +246,10 @@ class Processor(AsyncProcessor): taskgroup = self.taskgroup, ) - self.register_config_handler(self.on_librarian_config, types=["librarian"]) + self.register_config_handler( + self.on_librarian_config, + types=["flow", "active-flow"], + ) self.flows = {} diff --git a/trustgraph-flow/trustgraph/metering/counter.py b/trustgraph-flow/trustgraph/metering/counter.py index f120a812..3e0b610c 100644 
--- a/trustgraph-flow/trustgraph/metering/counter.py +++ b/trustgraph-flow/trustgraph/metering/counter.py @@ -40,7 +40,7 @@ class Processor(FlowProcessor): } ) - self.register_config_handler(self.on_cost_config, types=["token-costs"]) + self.register_config_handler(self.on_cost_config, types=["token-cost"]) self.register_specification( ConsumerSpec( From e899370d988487d916af16de43d34e4079aee8eb Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 7 Apr 2026 22:24:59 +0100 Subject: [PATCH 2/2] Update docs for 2.2 release (#766) - Update protocol specs - Update protocol docs - Update API specs --- docs/api-gateway-changes-v1.8-to-v2.1.md | 108 --- docs/api.html | 176 ++++- docs/cli-changes-v1.8-to-v2.1.md | 112 --- docs/python-api.md | 683 +++--------------- docs/websocket.html | 142 +++- .../schemas/agent/AgentResponse.yaml | 6 + .../schemas/rag/DocumentRagResponse.yaml | 5 + .../schemas/rag/GraphRagResponse.yaml | 5 + specs/api/openapi.yaml | 6 +- specs/api/paths/flow/agent.yaml | 17 + specs/api/paths/flow/document-rag.yaml | 22 +- specs/api/paths/flow/graph-rag.yaml | 22 +- specs/websocket/asyncapi.yaml | 4 +- .../components/messages/ServiceRequest.yaml | 1 + .../messages/requests/SparqlQueryRequest.yaml | 46 ++ 15 files changed, 488 insertions(+), 867 deletions(-) delete mode 100644 docs/api-gateway-changes-v1.8-to-v2.1.md delete mode 100644 docs/cli-changes-v1.8-to-v2.1.md create mode 100644 specs/websocket/components/messages/requests/SparqlQueryRequest.yaml diff --git a/docs/api-gateway-changes-v1.8-to-v2.1.md b/docs/api-gateway-changes-v1.8-to-v2.1.md deleted file mode 100644 index 099dadb0..00000000 --- a/docs/api-gateway-changes-v1.8-to-v2.1.md +++ /dev/null @@ -1,108 +0,0 @@ -# API Gateway Changes: v1.8 to v2.1 - -## Summary - -The API gateway gained new WebSocket service dispatchers for embeddings -queries, a new REST streaming endpoint for document content, and underwent -a significant wire format change from `Value` to `Term`. 
The "objects" -service was renamed to "rows". - ---- - -## New WebSocket Service Dispatchers - -These are new request/response services available through the WebSocket -multiplexer at `/api/v1/socket` (flow-scoped): - -| Service Key | Description | -|-------------|-------------| -| `document-embeddings` | Queries document chunks by text similarity. Request/response uses `DocumentEmbeddingsRequest`/`DocumentEmbeddingsResponse` schemas. | -| `row-embeddings` | Queries structured data rows by text similarity on indexed fields. Request/response uses `RowEmbeddingsRequest`/`RowEmbeddingsResponse` schemas. | - -These join the existing `graph-embeddings` dispatcher (which was already -present in v1.8 but may have been updated). - -### Full list of WebSocket flow service dispatchers (v2.1) - -Request/response services (via `/api/v1/flow/{flow}/service/{kind}` or -WebSocket mux): - -- `agent`, `text-completion`, `prompt`, `mcp-tool` -- `graph-rag`, `document-rag` -- `embeddings`, `graph-embeddings`, `document-embeddings` -- `triples`, `rows`, `nlp-query`, `structured-query`, `structured-diag` -- `row-embeddings` - ---- - -## New REST Endpoint - -| Method | Path | Description | -|--------|------|-------------| -| `GET` | `/api/v1/document-stream` | Streams document content from the library as raw bytes. Query parameters: `user` (required), `document-id` (required), `chunk-size` (optional, default 1MB). Returns the document content in chunked transfer encoding, decoded from base64 internally. | - ---- - -## Renamed Service: "objects" to "rows" - -| v1.8 | v2.1 | Notes | -|------|------|-------| -| `objects_query.py` / `ObjectsQueryRequestor` | `rows_query.py` / `RowsQueryRequestor` | Schema changed from `ObjectsQueryRequest`/`ObjectsQueryResponse` to `RowsQueryRequest`/`RowsQueryResponse`. | -| `objects_import.py` / `ObjectsImport` | `rows_import.py` / `RowsImport` | Import dispatcher for structured data. 
| - -The WebSocket service key changed from `"objects"` to `"rows"`, and the -import dispatcher key similarly changed from `"objects"` to `"rows"`. - ---- - -## Wire Format Change: Value to Term - -The serialization layer (`serialize.py`) was rewritten to use the new `Term` -type instead of the old `Value` type. - -### Old format (v1.8 — `Value`) - -```json -{"v": "http://example.org/entity", "e": true} -``` - -- `v`: the value (string) -- `e`: boolean flag indicating whether the value is a URI - -### New format (v2.1 — `Term`) - -IRIs: -```json -{"t": "i", "i": "http://example.org/entity"} -``` - -Literals: -```json -{"t": "l", "v": "some text", "d": "datatype-uri", "l": "en"} -``` - -Quoted triples (RDF-star): -```json -{"t": "r", "r": {"s": {...}, "p": {...}, "o": {...}}} -``` - -- `t`: type discriminator — `"i"` (IRI), `"l"` (literal), `"r"` (quoted triple), `"b"` (blank node) -- Serialization now delegates to `TermTranslator` and `TripleTranslator` from `trustgraph.messaging.translators.primitives` - -### Other serialization changes - -| Field | v1.8 | v2.1 | -|-------|------|------| -| Metadata | `metadata.metadata` (subgraph) | `metadata.root` (simple value) | -| Graph embeddings entity | `entity.vectors` (plural) | `entity.vector` (singular) | -| Document embeddings chunk | `chunk.vectors` + `chunk.chunk` (text) | `chunk.vector` + `chunk.chunk_id` (ID reference) | - ---- - -## Breaking Changes - -- **`Value` to `Term` wire format**: All clients sending/receiving triples, embeddings, or entity contexts through the gateway must update to the new Term format. -- **`objects` to `rows` rename**: WebSocket service key and import key changed. -- **Metadata field change**: `metadata.metadata` (a serialized subgraph) replaced by `metadata.root` (a simple value). -- **Embeddings field changes**: `vectors` (plural) became `vector` (singular); document embeddings now reference `chunk_id` instead of inline `chunk` text. 
-- **New `/api/v1/document-stream` endpoint**: Additive, not breaking. diff --git a/docs/api.html b/docs/api.html index 7cbddd32..2a03a38b 100644 --- a/docs/api.html +++ b/docs/api.html @@ -422,7 +422,7 @@ data-styled.g138[id="sc-iJQrDi"]{content:"gtHWGb,"}/*!sc*/ -

TrustGraph API Gateway (2.1)

Download OpenAPI specification:

TrustGraph API Gateway (2.2)

Download OpenAPI specification:

REST API for TrustGraph - an AI-powered knowledge graph and RAG system.

Overview

  • AI services: agent, text-completion, prompt, RAG (document/graph)
  • Embeddings: embeddings, graph-embeddings, document-embeddings
  • -
  • Query: triples, rows, nlp-query, structured-query, row-embeddings
  • +
  • Query: triples, rows, nlp-query, structured-query, sparql-query, row-embeddings
  • Data loading: text-load, document-load
  • Utilities: mcp-tool, structured-diag
  • @@ -784,11 +784,26 @@ for processing and handled asynchronously.

    Stop ongoing library document processing.

    list-processing

    List current processing tasks and their status.

    -
    Authorizations:
    bearerAuth
    Request Body schema: application/json
    required
    operation
    required
    string
    Enum: "add-document" "remove-document" "list-documents" "start-processing" "stop-processing" "list-processing"
    Authorizations:
    bearerAuth
    Request Body schema: application/json
    required
    operation
    required
    string
    Enum: "add-document" "remove-document" "list-documents" "get-document-metadata" "get-document-content" "stream-document" "add-child-document" "list-children" "begin-upload" "upload-chunk" "complete-upload" "abort-upload" "get-upload-status" "list-uploads" "start-processing" "stop-processing" "list-processing"

    Error response

    Request samples

    Content type
    application/json
    Example
    {
    • "question": "What is the capital of France?",
    • "user": "alice"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "chunk-type": "thought",
    • "content": "I need to search for information about quantum computing",
    • "end-of-message": false,
    • "end-of-dialog": false
    }

    Document RAG - retrieve and generate from documents

    http://localhost:8088/api/v1/flow/{flow}/service/agent

    Request samples

    Content type
    application/json
    Example
    {
    • "question": "What is the capital of France?",
    • "user": "alice"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "chunk-type": "thought",
    • "content": "I need to search for information about quantum computing",
    • "end-of-message": false,
    • "end-of-dialog": false
    }

    Document RAG - retrieve and generate from documents

    Streaming

    Enable streaming: true to receive the answer as it's generated:

      -
    • Multiple messages with response content
    • +
    • Multiple chunk messages with response content
    • +
    • explain messages with inline provenance triples (explain_triples)
    • Final message with end-of-stream: true
    • +
    • Session ends with end_of_session: true
    +

    Explain events carry explain_id, explain_graph, and explain_triples inline in the stream, so no follow-up knowledge graph query is needed.

    Without streaming, returns complete answer in single response.

    Parameters

      @@ -1216,7 +1256,7 @@ Each step has: thought, action, arguments, observation.

      " class="sc-iKGpAq sc-cCYyou sc-cjERFZ dXXcln fTBBlJ dkmSdy">

      Error response

    Request samples

    Content type
    application/json
    Example
    {
    • "query": "What are the key findings in the research papers?",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "response": "The research papers present three key findings:\n1. Quantum entanglement exhibits non-local correlations\n2. Bell's inequality is violated in experimental tests\n3. Applications in quantum cryptography are promising\n",
    • "end-of-stream": false
    }

    Graph RAG - retrieve and generate from knowledge graph

    http://localhost:8088/api/v1/flow/{flow}/service/document-rag

    Request samples

    Content type
    application/json
    Example
    {
    • "query": "What are the key findings in the research papers?",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "response": "The research papers present three key findings:\n1. Quantum entanglement exhibits non-local correlations\n2. Bell's inequality is violated in experimental tests\n3. Applications in quantum cryptography are promising\n",
    • "end-of-stream": false
    }

    Graph RAG - retrieve and generate from knowledge graph

    Streaming

    Enable streaming: true to receive the answer as it's generated:

      -
    • Multiple messages with response content
    • +
    • Multiple chunk messages with response content
    • +
    • explain messages with inline provenance triples (explain_triples)
    • Final message with end-of-stream: true
    • +
    • Session ends with end_of_session: true
    +

    Explain events carry explain_id, explain_graph, and explain_triples inline in the stream, so no follow-up knowledge graph query is needed.

    Without streaming, returns complete answer in single response.

    Parameters

    Control retrieval scope with multiple knobs:

    @@ -1332,7 +1380,7 @@ Each step has: thought, action, arguments, observation.

    " class="sc-iKGpAq sc-cCYyou sc-cjERFZ dXXcln fTBBlJ dkmSdy">

    Error response

    Request samples

    Content type
    application/json
    Example
    {
    • "query": "What connections exist between quantum physics and computer science?",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "response": "Quantum physics and computer science intersect primarily through quantum computing.\nThe knowledge graph shows connections through:\n- Quantum algorithms (Shor's algorithm, Grover's algorithm)\n- Quantum information theory\n- Computational complexity theory\n",
    • "end-of-stream": false
    }

    Text completion - direct LLM generation

    http://localhost:8088/api/v1/flow/{flow}/service/graph-rag

    Request samples

    Content type
    application/json
    Example
    {
    • "query": "What connections exist between quantum physics and computer science?",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "response": "Quantum physics and computer science intersect primarily through quantum computing.\nThe knowledge graph shows connections through:\n- Quantum algorithms (Shor's algorithm, Grover's algorithm)\n- Quantum information theory\n- Computational complexity theory\n",
    • "end-of-stream": false
    }

    Text completion - direct LLM generation

    Text Load Overview

    Fire-and-forget document loading:

      -
    • Input: Text content (base64 encoded)
    • +
    • Input: Text content (raw UTF-8 or base64 encoded)
    • Process: Chunk, embed, store
    • Output: None (202 Accepted)
    @@ -2762,7 +2815,12 @@ encoded <span class="token operator">=</span> base64<sp

    Pipeline runs asynchronously after request returns.

    Text Format

    -

    Text must be base64 encoded:

    +

    Text may be sent as raw UTF-8 text:

    +
    {
    +  "text": "Cancer survival: 2.74× higher hazard ratio"
    +}
    +
    +

    Older clients may still send base64 encoded text:

    text_content = "This is the document..."
     encoded = base64.b64encode(text_content.encode('utf-8'))
     
    @@ -2792,8 +2850,8 @@ encoded = base64
    Authorizations:
    bearerAuth
    path Parameters
    flow
    required
    string
    Example: my-flow

    Flow instance ID

    -
    Request Body schema: application/json
    required
    text
    required
    string <byte>

    Text content (base64 encoded)

    +
    Request Body schema: application/json
    required
    text
    required
    string

    Text content, either raw text or base64 encoded for compatibility with older clients

    id
    string

    Document identifier

    user
    string
    Default: "trustgraph"
    = base64

    Error response

    Request samples

    Content type
    application/json
    Example
    {
    • "text": "VGhpcyBpcyB0aGUgZG9jdW1lbnQgdGV4dC4uLg==",
    • "id": "doc-123",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    { }

    Document Load - load binary documents (PDF, etc.)

    http://localhost:8088/api/v1/flow/{flow}/service/text-load

    Request samples

    Content type
    application/json
    Example
    {
    • "text": "This is the document text...",
    • "id": "doc-123",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    { }

    Document Load - load binary documents (PDF, etc.)

    = base64

    Error response

    Request samples

    Content type
    application/json
    Example
    {
    • "data": "JVBERi0xLjQKJeLjz9MKMSAwIG9iago8PC9UeXBlL0NhdGFsb2cvUGFnZXMgMiAwIFI+PmVuZG9iagoyIDAgb2JqCjw8L1R5cGUvUGFnZXMvS2lkc1szIDAgUl0vQ291bnQgMT4+ZW5kb2JqCg==",
    • "id": "doc-789",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    { }

    Import/Export

    http://localhost:8088/api/v1/flow/{flow}/service/document-load

    Request samples

    Content type
    application/json
    Example
    {
    • "data": "JVBERi0xLjQKJeLjz9MKMSAwIG9iago8PC9UeXBlL0NhdGFsb2cvUGFnZXMgMiAwIFI+PmVuZG9iagoyIDAgb2JqCjw8L1R5cGUvUGFnZXMvS2lkc1szIDAgUl0vQ291bnQgMT4+ZW5kb2JqCg==",
    • "id": "doc-789",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    { }

    SPARQL query - execute SPARQL 1.1 queries against the knowledge graph

    Execute a SPARQL 1.1 query against the knowledge graph.

    +

    Supported Query Types

    +
      +
    • SELECT: Returns variable bindings as a table of results
    • +
    • ASK: Returns true/false for existence checks
    • +
    • CONSTRUCT: Returns a set of triples built from a template
    • +
    • DESCRIBE: Returns triples describing matched resources
    • +
    +

    SPARQL Features

    +

    Supports standard SPARQL 1.1 features including:

    +
      +
    • Basic Graph Patterns (BGPs) with triple pattern matching
    • +
    • OPTIONAL, UNION, FILTER
    • +
    • BIND, VALUES
    • +
    • ORDER BY, LIMIT, OFFSET, DISTINCT
    • +
    • GROUP BY with aggregates (COUNT, SUM, AVG, MIN, MAX, GROUP_CONCAT)
    • +
    • Built-in functions (isIRI, STR, REGEX, CONTAINS, etc.)
    • +
    +

    Query Examples

    +

    Find all entities of a type:

    +
    SELECT ?s ?label WHERE {
    +  ?s <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.com/Person> .
    +  ?s <http://www.w3.org/2000/01/rdf-schema#label> ?label .
    +}
    +LIMIT 10
    +
    +

    Check if an entity exists:

    +
    ASK { <http://example.com/alice> ?p ?o }
    +
    +
    Authorizations:
    bearerAuth
    path Parameters
    flow
    required
    string
    Example: my-flow

    Flow instance ID

    +
    Request Body schema: application/json
    required
    query
    required
    string

    SPARQL 1.1 query string

    +
    user
    string
    Default: "trustgraph"

    User/keyspace identifier

    +
    collection
    string
    Default: "default"

    Collection identifier

    +
    limit
    integer
    Default: 10000

    Safety limit on number of results

    +

    Responses

    Request samples

    Content type
    application/json
    Example
    {
    • "query": "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10",
    • "user": "trustgraph",
    • "collection": "default"
    }

    Response samples

    Content type
    application/json
    Example
    {}

    Import/Export

    Bulk data import and export

    Stream document content from library

    Local development server

    http://localhost:8088/api/metrics/{path}

    Response samples

    Content type
    application/json
    {
    • "error": "Unauthorized"
    }