diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index dcf93946..88d2b79e 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -22,7 +22,7 @@ jobs: uses: actions/checkout@v3 - name: Setup packages - run: make update-package-versions VERSION=2.6.999 + run: make update-package-versions VERSION=2.5.999 - name: Setup environment run: python3 -m venv env diff --git a/README.md b/README.md index 38d1c39b..c366a3d9 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ -[![PyPI version](https://img.shields.io/pypi/v/trustgraph.svg)](https://pypi.org/project/trustgraph/) ![License](https://img.shields.io/badge/license-Apache%202.0-blue) ![E2E Tests](https://github.com/trustgraph-ai/trustgraph/actions/workflows/release.yaml/badge.svg) +[![PyPI version](https://img.shields.io/pypi/v/trustgraph.svg)](https://pypi.org/project/trustgraph/) [![License](https://img.shields.io/github/license/trustgraph-ai/trustgraph?color=blue)](LICENSE) ![E2E Tests](https://github.com/trustgraph-ai/trustgraph/actions/workflows/release.yaml/badge.svg) [![Discord](https://img.shields.io/discord/1251652173201149994 )](https://discord.gg/sQMwkRz5GX) [![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/trustgraph-ai/trustgraph) @@ -11,89 +11,44 @@ trustgraph-ai%2Ftrustgraph | Trendshift -# Write context once. Run agents anywhere. +# The agent runtime platform -Stop rebuilding context from scratch. TrustGraph treats context as a holon — a modular, independent whole that naturally snaps into a larger domain-wide intelligence layer. By deploying context as holonic context graphs, TrustGraph powers multi-tenant agent workflows, dramatically reduces token consumption, and aligns with semantic web standards (RDF, OWL, SKOS, SHACL). Version your context, share it across teams, and scale with full provenance. +TrustGraph is an agent runtime platform built around context graphs — structured, queryable representations of your domain knowledge that ground every agent query in verified, explainable facts in private deployments with sovereign control. The platform is the full stack for agentic systems: context graphs, memory, retrieval, orchestration, and inference for precision-critical agent workloads. -## What TrustGraph Does - -TrustGraph is a complete holonic context harness for all LLMs. It provides the full infrastructure layer underneath your agents: knowledge ingestion, structured storage, graph-grounded retrieval, agent orchestration, and a full LLM inferencing stack. - -TrustGraph relies on absolutely no 3rd party services aside from optional API integrations to cloud-hosted LLMs. Whether you are using Anthropic's or OpenAI's API, or self-hosting Qwen3.7 via vLLM, TrustGraph handles it all with pre-built API connectors and a full LLM inferencing stack to enrich the models with a sovereign, private holonic system that grounds your agents in reality. - -## The Problem: Why Agents Break - -When you build an AI agent today, you spend most of your time fighting context: - -- **RAG retrieves fragments, not meaning**. Chunks of text have no structure. Relationships between facts are invisible. Your agent guesses at the connections. - -- **Context is disposable**. What the agent learned in one session is gone in the next. There is no persistent, structured knowledge layer underneath. - -- **Answers aren't traceable**. You can't explain why the agent said what it said, which means you can't trust it in production. - -- **Knowledge can't be reused**. You rebuild the same context pipelines for every new project, every new agent, every new environment. - -These aren't retrieval problems. They are structural problems. Context needs to be organized, versioned, and composable — exactly the way software infrastructure is. - -## The Solution: A Holonic Context System -The philosopher Arthur Koestler coined the word [holon](https://en.wikipedia.org/wiki/Holon_(philosophy)) to describe something that is simultaneously a whole in itself and a part of something larger. A fact is whole. It is also part of a domain. A domain is whole. It is also part of an organization's knowledge. - -AI agents break down because this holonic structure is never built. Context gets shoved into flat text windows, scattered across vector stores, or hardwired into one-off prompts. Facts lose their relationships. - -TrustGraph solves this by organizing your domain into holonic context graphs. Entities, relationships, and evidence are treated as first-class objects. Every agent query is grounded against these holons—marrying symbolic graph structures with vector embeddings. Every answer carries provenance. Every fact is traceable. - -## Context Cores: Knowledge as a First-Class Citizen - -A Context Core is the deployable unit of knowledge in TrustGraph. It packages everything an agent needs to reason reliably over a domain into a single, portable artifact. - -### What's inside a Context Core -- **Ontology** — your domain schema and entity mappings -- **Holon** — entities, relationships, and supporting evidence -- **Embeddings** — vector indexes for fast semantic entry-point lookup -- **Provenance** — where every fact came from, when, and how it was derived -- **Retrieval policies** — traversal rules, freshness controls, authority ranking - -Context Cores decouple what agents know from how agents are deployed. Build once. Run in Docker locally, Kubernetes in production, or on any cloud. Pin a version. Roll back. Promote across environments. This is context engineering — and it works because knowledge is finally treated like the infrastructure it is. - -## Explainability: Trust Your Agents in Production -LLMs are black boxes, and traditional RAG makes it worse. When an agent pulls flat text chunks from a vector store, you have no idea how it connected those fragments to form an answer. You cannot ship agents to production if you can't explain why they said what they said. - -### How TrustGraph makes agents explainable: - -- **Traceable Reasoning Paths**: Instead of guessing at connections between text chunks, TrustGraph traverses explicit relationship paths in the holonic context graph. You can inspect exactly which entities, relationships, and sub-graphs were pulled into the LLM's context window to generate a given response. -- **Fact-Level Provenance**: Every node and edge in the graph carries strict provenance. When an agent makes a claim, you can trace it back to the exact source document, the time it was ingested, and the extraction method used to derive it. -- **No Black-Box Guesses**: By grounding the LLM in a structured, symbolic graph, you eliminate the hallucinations that occur when models are forced to infer relationships from unstructured text. If a fact isn't in the graph, the agent doesn't use it. - -TrustGraph doesn't just give you answers - it gives you the receipt. Every fact is traceable, every connection is visible, and every output is verifiable. - -## Workspaces, Collections, and Flows - -TrustGraph has a [three-level system](https://docs.trustgraph.ai/overview/workspaces) for organizing and isolating knowledge. - -A `Workspace` is the outermost boundary — a fully isolated tenancy scope where all data, users, configuration, and pipelines live independently from every other workspace. Isolation is structural: enforced at the pub/sub queue, storage, and API gateway layers, not by trusting a field in a message body. - -Within a workspace, a `Collection` groups related holons, graph structures, embeddings, and documents together — think of it as a dedicated shelf in a library, scoped to a specific domain, project, or customer. - -A `Flow` is a running data processing pipeline that defines how raw data moves through ingestion, extraction, structuring, and storage — the assembly line that turns documents into queryable knowledge. Together, the three layers let you run multiple isolated tenants on a single deployment, separate knowledge by domain within each tenant, and process that knowledge through fully configurable pipelines — all without restarting the system or rebuilding your infrastructure. - -## The Full Stack -TrustGraph is not a wrapper around a graph database. It is the complete backend for production agentic systems. - -- **Holonic context graph engine**: automated entity and relationship extraction, ontology-driven graph construction, graph-grounded retrieval for explainable outputs -- **Multi-model database**: tabular/relational, key-value, document, graph, vectors, images, video, and audio — all managed in Cassandra and S3-compatible Garage -- **Out-of-the-box RAG pipelines**: DocumentRAG, GraphRAG, and OntologyRAG ready to deploy -- **Fully agentic orchestration**: single or multi-agent, ReAct, Plan-then-Execute, Supervisor patterns, and MCP integration -- **3D Knowledge Explorer**: interactive graph visualization with BFS neighborhood extraction and edge pulse animation -- **Automated data ingest**: quick ingest with semantic similarity or ontology-structured precision retrieval -- **Run anywhere**: Docker/Podman locally, Kubernetes in the cloud - -All major LLMs — Anthropic, Cohere, Gemini, Mistral, OpenAI, and more via API. - -vLLM, Ollama, TGI, LM Studio, and Llamafiles for fully local inferencing. - -Verified cloud deployments for Alibaba Cloud, AWS, Azure, GCP, OVHcloud, and Scaleway. +The platform: +- [x] Multi-model and multimodal database system + - [x] Tabular/relational, key-value + - [x] Document, graph, and vectors + - [x] Images, video, and audio +- [x] Context Graph engine + - [x] Automated entity and relationship extraction + - [x] Ontology-driven graph construction + - [x] Graph-grounded retrieval for explainable outputs +- [x] Automated data ingest and loading + - [x] Quick ingest with semantic similarity retrieval + - [x] Ontology structuring for precision retrieval +- [x] Out-of-the-box RAG pipelines + - [x] DocumentRAG + - [x] GraphRAG + - [x] OntologyRAG +- [x] 3D GraphViz for exploring context +- [x] Fully Agentic System + - [x] Single or Multi Agent + - [x] ReAct, Plan-then-Execute, and Supervisor patterns + - [x] MCP integration +- [x] Run anywhere + - [x] Deploy locally with Docker + - [x] Deploy in cloud with Kubernetes +- [x] Support for all major LLMs + - [x] API support for Anthropic, Cohere, Gemini, Mistral, OpenAI, and others + - [x] Model inferencing with vLLM, Ollama, TGI, LM Studio, and Llamafiles +- [x] Developer friendly + - [x] REST API [Docs](https://docs.trustgraph.ai/reference/apis/rest.html) + - [x] Websocket API [Docs](https://docs.trustgraph.ai/reference/apis/websocket.html) + - [x] Python API [Docs](https://docs.trustgraph.ai/reference/apis/python) + - [x] CLI [Docs](https://docs.trustgraph.ai/reference/cli/) ## No API Keys Required @@ -107,12 +62,12 @@ Everything else is included. - [x] Managed Multi-model storage in [Cassandra](https://cassandra.apache.org/_/index.html) - [x] Managed Vector embedding storage in [Qdrant](https://github.com/qdrant/qdrant) - [x] Managed File and Object storage in [Garage](https://github.com/deuxfleurs-org/garage) (S3 compatible) -- [x] Managed High-speed Pub/Sub messaging fabric with [Pulsar](https://github.com/apache/pulsar) or [RabbitMQ](https://www.rabbitmq.com/) +- [x] Managed High-speed Pub/Sub messaging fabric with [Pulsar](https://github.com/apache/pulsar) - [x] Complete LLM inferencing stack for open LLMs with [vLLM](https://github.com/vllm-project/vllm), [TGI](https://github.com/huggingface/text-generation-inference), [Ollama](https://github.com/ollama/ollama), [LM Studio](https://github.com/lmstudio-ai), and [Llamafiles](https://github.com/mozilla-ai/llamafile) ## Quickstart -No need to clone the repo unless you are building from source. TrustGraph deploys as a set of Docker containers. Configure it on the command line in one step: +There's no need to clone this repo, unless you want to build from source. TrustGraph is a fully containerized app that deploys as a set of Docker containers. To configure TrustGraph on the command line: ``` npx @trustgraph/config @@ -123,39 +78,44 @@ The config process will generate an app config that can be run locally with Dock - Deployment instructions as `INSTALLATION.md`

-

For a browser based configuration, try the [Configuration Terminal](https://config-ui.demo.trustgraph.ai/). -## Watch What is a Holonic Context Graph? +## Watch What is a Context Graph? [![What is a Context Graph?](https://img.youtube.com/vi/gZjlt5WcWB4/maxresdefault.jpg)](https://www.youtube.com/watch?v=gZjlt5WcWB4) -## Watch Holonic Context Graphs in Action +## Watch Context Graphs in Action [![Context Graphs in Action with TrustGraph](https://img.youtube.com/vi/sWc7mkhITIo/maxresdefault.jpg)](https://www.youtube.com/watch?v=sWc7mkhITIo) ## Getting Started with TrustGraph - [**Getting Started Guides**](https://docs.trustgraph.ai/getting-started) +- [**Using the Workbench**](#workbench) - [**Developer APIs and CLI**](https://docs.trustgraph.ai/reference) - [**Deployment Guides**](https://docs.trustgraph.ai/deployment) -## TrustGraph UI +## Workbench -Image +The **Workbench** provides tools for all major features of TrustGraph. The **Workbench** is on port `8888` by default. -The UI provides tools for all major features of TrustGraph. The UI deploys on port `8888` by default. - -- **Agent Console** — Query your agents directly with streaming responses and live explainability event tracking, so you can watch reasoning unfold in real time -- **GraphRAG View** — Interactive graph RAG queries with a visual explainability DAG and inline provenance display, making it easy to see exactly where answers came from -- **Context Explorer** — An interactive 3D context graph explorer with dynamic graph loading, BFS neighborhood extraction, edge pulse animation, and multiple navigation views -- **Document Ingestion** — A complete upload and submission workflow with page and chunk inspection and document structure browsing -- **Ontology Workbench** — A full ontology editor with class and property trees, OWL/XML and Turtle import/export with round-trip fidelity, circular dependency detection, and safe-delete confirmation dialogs -- **Schema Workbench** — Interactive schema management with list, create, edit, and delete operations including field and index management -- **Prompt Editor** — A dedicated prompt editing workflow +- **Vector Search**: Search the installed knowledge bases +- **Agentic, GraphRAG and LLM Chat**: Chat interface for agents, GraphRAG queries, or direct to LLMs +- **Relationships**: Analyze deep relationships in the installed knowledge bases +- **Graph Visualizer**: 3D GraphViz of the installed knowledge bases +- **Library**: Staging area for installing knowledge bases +- **Flow Classes**: Workflow preset configurations +- **Flows**: Create custom workflows and adjust LLM parameters during runtime +- **Knowledge Cores**: Manage resuable knowledge bases +- **Prompts**: Manage and adjust prompts during runtime +- **Schemas**: Define custom schemas for structured data knowledge bases +- **Ontologies**: Define custom ontologies for unstructured data knowledge bases +- **Agent Tools**: Define tools with collections, knowledge cores, MCP connections, and tool groups +- **MCP Tools**: Connect to MCP servers ## TypeScript Library for UIs @@ -165,6 +125,134 @@ There are 3 libraries for quick UI integration of TrustGraph services. - [@trustgraph/react-state](https://www.npmjs.com/package/@trustgraph/react-state) - [@trustgraph/react-provider](https://www.npmjs.com/package/@trustgraph/react-provider) +## Context Cores + +Context Cores are how TrustGraph treats context like code. A Context Core is a **portable, versioned bundle of context** that you can ship between projects and environments, pin in production, and reuse across agents. It packages the “stuff agents need to know” (structured knowledge + embeddings + evidence + policies) into a single artifact, so you can treat context like code: build it, test it, version it, promote it, and roll it back. TrustGraph is built to support this kind of end-to-end context engineering and orchestration workflow. + +### What’s inside a Context Core +A Context Core typically includes: +- Ontology (your domain schema) and mappings +- Context Graph (entities, relationships, supporting evidence) +- Embeddings / vector indexes for fast semantic entry-point lookup +- Source manifests + provenance (where facts came from, when, and how they were derived) +- Retrieval policies (traversal rules, freshness, authority ranking) + +## Tech Stack +TrustGraph provides component flexibility to optimize agent workflows. + +
+LLM APIs +
+ +- Anthropic
+- AWS Bedrock
+- AzureAI
+- AzureOpenAI
+- Cohere
+- Google AI Studio
+- Google VertexAI
+- Mistral
+- OpenAI
+ +
+
+LLM Orchestration +
+ +- LM Studio
+- Llamafiles
+- Ollama
+- TGI
+- vLLM
+ +
+
+Multi-model storage +
+ +- Apache Cassandra
+ +
+
+VectorDB +
+ +- Qdrant
+ +
+
+File and Object Storage +
+ +- Garage
+ +
+
+Observability +
+ +- Prometheus
+- Grafana
+- Loki
+ +
+
+Data Streaming +
+ +- Apache Pulsar
+- RabbitMQ
+- Apache Kafka
+ +
+
+Clouds +
+ +- AWS
+- Azure
+- Google Cloud
+- OVHcloud
+- Scaleway
+ +
+ +## Observability & Telemetry + +Once the platform is running, access the Grafana dashboard at: + +``` +http://localhost:3000 +``` + +Default credentials are: + +``` +user: admin +password: admin +``` + +The default Grafana dashboard tracks the following: + +
+Telemetry +
+ +- LLM Latency
+- Error Rate
+- Service Request Rates
+- Queue Backlogs
+- Chunking Histogram
+- Error Source by Service
+- Rate Limit Events
+- CPU usage by Service
+- Memory usage by Service
+- Models Deployed
+- Token Throughput (Tokens/second)
+- Cost Throughput (Cost/second)
+ +
+ ## Contributing [Developer's Guide](https://docs.trustgraph.ai/guides/building/introduction.html) @@ -173,7 +261,7 @@ There are 3 libraries for quick UI integration of TrustGraph services. **TrustGraph** is licensed under [Apache 2.0](https://www.apache.org/licenses/LICENSE-2.0). - Copyright 2024-2026 TrustGraph + Copyright 2024-2025 TrustGraph Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/tests/unit/test_decoding/test_pdf_decoder.py b/tests/unit/test_decoding/test_pdf_decoder.py index 641a9d78..04807b20 100644 --- a/tests/unit/test_decoding/test_pdf_decoder.py +++ b/tests/unit/test_decoding/test_pdf_decoder.py @@ -49,7 +49,7 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): async def test_on_message_success(self, mock_pdf_loader_class, mock_producer, mock_consumer): """Test successful PDF processing""" # Mock PDF content - pdf_content = b"%PDF-1.7\nfake pdf content" + pdf_content = b"fake pdf content" pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') # Mock PyPDFLoader @@ -88,55 +88,13 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): # Verify triples were sent for each page (provenance) assert mock_triples_flow.send.call_count == 2 - @patch('trustgraph.base.librarian_client.Consumer') - @patch('trustgraph.base.librarian_client.Producer') - @patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader') - @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_on_message_rejects_librarian_content_that_is_not_pdf(self, mock_pdf_loader_class, mock_producer, mock_consumer): - """Test rejecting non-PDF content before invoking the PDF loader""" - html_content = b"Not found" - html_base64 = base64.b64encode(html_content) - - mock_metadata = Metadata(id="test-doc") - mock_document = Document(metadata=mock_metadata, document_id="doc-123") - mock_msg = MagicMock() - mock_msg.value.return_value = mock_document - - mock_output_flow = AsyncMock() - mock_triples_flow = AsyncMock() - mock_flow = MagicMock(side_effect=lambda name: { - "output": mock_output_flow, - "triples": mock_triples_flow, - }.get(name)) - mock_flow.librarian.fetch_document_metadata = AsyncMock( - return_value=MagicMock(kind="application/pdf") - ) - mock_flow.librarian.fetch_document_content = AsyncMock( - return_value=html_base64 - ) - mock_flow.librarian.save_child_document = AsyncMock() - - config = { - 'id': 'test-pdf-decoder', - 'taskgroup': AsyncMock() - } - - processor = Processor(**config) - - await processor.on_message(mock_msg, None, mock_flow) - - mock_pdf_loader_class.assert_not_called() - mock_output_flow.send.assert_not_called() - mock_triples_flow.send.assert_not_called() - mock_flow.librarian.save_child_document.assert_not_called() - @patch('trustgraph.base.librarian_client.Consumer') @patch('trustgraph.base.librarian_client.Producer') @patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) async def test_on_message_empty_pdf(self, mock_pdf_loader_class, mock_producer, mock_consumer): """Test handling of empty PDF""" - pdf_content = b"%PDF-1.7\nfake pdf content" + pdf_content = b"fake pdf content" pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') mock_loader = MagicMock() @@ -168,7 +126,7 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) async def test_on_message_unicode_content(self, mock_pdf_loader_class, mock_producer, mock_consumer): """Test handling of unicode content in PDF""" - pdf_content = b"%PDF-1.7\nfake pdf content" + pdf_content = b"fake pdf content" pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') mock_loader = MagicMock() diff --git a/tests/unit/test_query/test_rows_cassandra_query.py b/tests/unit/test_query/test_rows_cassandra_query.py index fb385f43..b61500a4 100644 --- a/tests/unit/test_query/test_rows_cassandra_query.py +++ b/tests/unit/test_query/test_rows_cassandra_query.py @@ -333,8 +333,8 @@ class TestUnifiedTableQueries: """Test queries against the unified rows table""" @pytest.mark.asyncio - @patch('trustgraph.query.rows.cassandra.service.async_execute_paged', new_callable=AsyncMock) - async def test_query_with_index_match(self, mock_async_execute_paged): + @patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock) + async def test_query_with_index_match(self, mock_async_execute): """Test query execution with matching index""" processor = MagicMock() processor.session = MagicMock() @@ -344,10 +344,10 @@ class TestUnifiedTableQueries: processor.find_matching_index = Processor.find_matching_index.__get__(processor, Processor) processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor) - # Mock async_execute_paged to return test data (list of pages) + # Mock async_execute to return test data mock_row = MagicMock() mock_row.data = {"id": "123", "name": "Test Product", "category": "electronics"} - mock_async_execute_paged.return_value = [[mock_row]] + mock_async_execute.return_value = [mock_row] schema = RowSchema( name="products", @@ -370,10 +370,10 @@ class TestUnifiedTableQueries: # Verify Cassandra was connected and queried processor.connect_cassandra.assert_called_once() - mock_async_execute_paged.assert_called_once() + mock_async_execute.assert_called_once() # Verify query structure - should query unified rows table - call_args = mock_async_execute_paged.call_args + call_args = mock_async_execute.call_args query = call_args[0][1] params = call_args[0][2] @@ -394,8 +394,8 @@ class TestUnifiedTableQueries: assert results[0]["category"] == "electronics" @pytest.mark.asyncio - @patch('trustgraph.query.rows.cassandra.service.async_scan', new_callable=AsyncMock) - async def test_query_without_index_match(self, mock_async_scan): + @patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock) + async def test_query_without_index_match(self, mock_async_execute): """Test query execution without matching index (scan mode)""" processor = MagicMock() processor.session = MagicMock() @@ -406,10 +406,12 @@ class TestUnifiedTableQueries: processor._matches_filters = Processor._matches_filters.__get__(processor, Processor) processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor) - # Mock async_scan to return filtered test data + # Mock async_execute to return test data mock_row1 = MagicMock() mock_row1.data = {"id": "1", "name": "Product A", "price": "100"} - mock_async_scan.return_value = [mock_row1] + mock_row2 = MagicMock() + mock_row2.data = {"id": "2", "name": "Product B", "price": "200"} + mock_async_execute.return_value = [mock_row1, mock_row2] schema = RowSchema( name="products", @@ -430,16 +432,13 @@ class TestUnifiedTableQueries: limit=10 ) - # Verify async_scan was called - mock_async_scan.assert_called_once() - - # Verify query structure - call_args = mock_async_scan.call_args + # Query should use ALLOW FILTERING for scan + call_args = mock_async_execute.call_args query = call_args[0][1] assert "ALLOW FILTERING" in query - # Should return filtered results + # Should post-filter results assert len(results) == 1 assert results[0]["name"] == "Product A" diff --git a/trustgraph-base/trustgraph/messaging/translators/iam.py b/trustgraph-base/trustgraph/messaging/translators/iam.py index 9e16d3df..1d7bf21c 100644 --- a/trustgraph-base/trustgraph/messaging/translators/iam.py +++ b/trustgraph-base/trustgraph/messaging/translators/iam.py @@ -5,7 +5,6 @@ from ...schema import ( UserInput, UserRecord, WorkspaceInput, WorkspaceRecord, ApiKeyInput, ApiKeyRecord, - GroupInput, GrantInput, ) from .base import MessageTranslator @@ -44,25 +43,6 @@ def _api_key_input_from_dict(d): ) -def _group_input_from_dict(d): - if d is None: - return None - return GroupInput( - name=d.get("name", ""), - description=d.get("description", ""), - enabled=d.get("enabled", True), - ) - - -def _grant_input_from_dict(d): - if d is None: - return None - return GrantInput( - capability=d.get("capability", ""), - workspace=d.get("workspace", ""), - ) - - def _user_record_to_dict(r): if r is None: return None @@ -122,15 +102,6 @@ class IamRequestTranslator(MessageTranslator): data.get("workspace_record") ), key=_api_key_input_from_dict(data.get("key")), - group_id=data.get("group_id", ""), - member_type=data.get("member_type", ""), - member_id=data.get("member_id", ""), - group=_group_input_from_dict(data.get("group")), - grant=_grant_input_from_dict(data.get("grant")), - capability=data.get("capability", ""), - resource_json=data.get("resource_json", ""), - parameters_json=data.get("parameters_json", ""), - authorise_checks=data.get("authorise_checks", ""), ) def encode(self, obj: IamRequest) -> Dict[str, Any]: @@ -138,9 +109,6 @@ class IamRequestTranslator(MessageTranslator): for fname in ( "workspace", "actor", "user_id", "username", "key_id", "api_key", "password", "new_password", - "group_id", "member_type", "member_id", - "capability", "resource_json", "parameters_json", - "authorise_checks", ): v = getattr(obj, fname, "") if v: @@ -167,17 +135,6 @@ class IamRequestTranslator(MessageTranslator): "name": obj.key.name, "expires": obj.key.expires, } - if obj.group is not None: - result["group"] = { - "name": obj.group.name, - "description": obj.group.description, - "enabled": obj.group.enabled, - } - if obj.grant is not None: - result["grant"] = { - "capability": obj.grant.capability, - "workspace": obj.grant.workspace, - } return result @@ -233,23 +190,6 @@ class IamResponseTranslator(MessageTranslator): # setup, so it can't be dropped by a truthy-only filter. result["bootstrap_available"] = bool(obj.bootstrap_available) - # authorise / authorise-many outputs. - if obj.decision_allow: - result["decision_allow"] = obj.decision_allow - if obj.decision_ttl_seconds: - result["decision_ttl_seconds"] = obj.decision_ttl_seconds - if obj.decisions_json: - result["decisions_json"] = obj.decisions_json - - # Enterprise IAM outputs. - for fname in ( - "group_json", "groups_json", "members_json", - "grants_json", "effective_permissions_json", - ): - v = getattr(obj, fname, "") - if v: - result[fname] = v - return result def encode_with_completion( diff --git a/trustgraph-base/trustgraph/schema/services/iam.py b/trustgraph-base/trustgraph/schema/services/iam.py index 6a657d62..797d6203 100644 --- a/trustgraph-base/trustgraph/schema/services/iam.py +++ b/trustgraph-base/trustgraph/schema/services/iam.py @@ -74,21 +74,6 @@ class ApiKeyRecord: last_used: str = "" -# ---- Enterprise IAM types (additive) ---- - -@dataclass -class GroupInput: - name: str = "" - description: str = "" - enabled: bool = True - - -@dataclass -class GrantInput: - capability: str = "" - workspace: str = "" - - @dataclass class IamRequest: operation: str = "" @@ -114,13 +99,6 @@ class IamRequest: workspace_record: WorkspaceInput | None = None key: ApiKeyInput | None = None - # ---- Enterprise IAM inputs (additive) ---- - group_id: str = "" - member_type: str = "" - member_id: str = "" - group: GroupInput | None = None - grant: GrantInput | None = None - # ---- authorise / authorise-many inputs ---- # Capability string from the vocabulary in capabilities.md. capability: str = "" @@ -186,14 +164,6 @@ class IamResponse: # authorise_checks. decisions_json: str = "" - # ---- Enterprise IAM outputs (additive) ---- - # JSON-serialised payloads for enterprise group/grant operations. - group_json: str = "" - groups_json: str = "" - members_json: str = "" - grants_json: str = "" - effective_permissions_json: str = "" - error: Error | None = None diff --git a/trustgraph-bedrock/pyproject.toml b/trustgraph-bedrock/pyproject.toml index 2dc724b0..7aa2f96a 100644 --- a/trustgraph-bedrock/pyproject.toml +++ b/trustgraph-bedrock/pyproject.toml @@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc readme = "README.md" requires-python = ">=3.8" dependencies = [ - "trustgraph-base>=2.6,<2.7", + "trustgraph-base>=2.5,<2.6", "pulsar-client", "prometheus-client", "boto3", diff --git a/trustgraph-cli/pyproject.toml b/trustgraph-cli/pyproject.toml index 006a07f4..16b0ae0a 100644 --- a/trustgraph-cli/pyproject.toml +++ b/trustgraph-cli/pyproject.toml @@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc readme = "README.md" requires-python = ">=3.8" dependencies = [ - "trustgraph-base>=2.6,<2.7", + "trustgraph-base>=2.5,<2.6", "requests", "pulsar-client", "aiohttp", diff --git a/trustgraph-cli/trustgraph/cli/load_structured_data.py b/trustgraph-cli/trustgraph/cli/load_structured_data.py index 5649a5ae..dccf548e 100644 --- a/trustgraph-cli/trustgraph/cli/load_structured_data.py +++ b/trustgraph-cli/trustgraph/cli/load_structured_data.py @@ -78,7 +78,7 @@ def load_structured_data( logger.info("Step 1: Analyzing data to discover best matching schema...") # Step 1: Auto-discover schema (reuse discover_schema logic) - discovered_schema = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, token=token, workspace=workspace) + discovered_schema = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, workspace=workspace) if not discovered_schema: logger.error("Failed to discover suitable schema automatically") print("❌ Could not automatically determine the best schema for your data.") @@ -90,7 +90,7 @@ def load_structured_data( # Step 2: Auto-generate descriptor logger.info("Step 2: Generating descriptor configuration...") - auto_descriptor = _auto_generate_descriptor(api_url, input_file, discovered_schema, sample_chars, flow, logger, token=token, workspace=workspace) + auto_descriptor = _auto_generate_descriptor(api_url, input_file, discovered_schema, sample_chars, flow, logger, workspace=workspace) if not auto_descriptor: logger.error("Failed to generate descriptor automatically") print("❌ Could not automatically generate descriptor configuration.") @@ -172,7 +172,7 @@ def load_structured_data( logger.info(f"Sample chars: {sample_chars} characters") # Use the helper function to discover schema (get raw response for display) - response = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=True, token=token, workspace=workspace) + response = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=True, workspace=workspace) if response: # Debug: print response type and content @@ -203,7 +203,7 @@ def load_structured_data( # If no schema specified, discover it first if not schema_name: logger.info("No schema specified, auto-discovering...") - schema_name = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, token=token, workspace=workspace) + schema_name = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, workspace=workspace) if not schema_name: print("Error: Could not determine schema automatically.") print("Please specify a schema using --schema-name or run --discover-schema first.") @@ -213,7 +213,7 @@ def load_structured_data( logger.info(f"Target schema: {schema_name}") # Generate descriptor using helper function - descriptor = _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, token=token, workspace=workspace) + descriptor = _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, workspace=workspace) if descriptor: # Output the generated descriptor @@ -603,7 +603,7 @@ def _send_to_trustgraph(rows, api_url, flow, batch_size=1000, token=None, worksp # Helper functions for auto mode -def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=False, token=None, workspace="default"): +def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=False, workspace="default"): """Auto-discover the best matching schema for the input data Args: @@ -626,7 +626,7 @@ def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, retur # Import API modules from trustgraph.api import Api from trustgraph.api.types import ConfigKey - api = Api(api_url, token=token, workspace=workspace) + api = Api(api_url, workspace=workspace) config_api = api.config() # Get available schemas @@ -707,7 +707,7 @@ def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, retur return None -def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, token=None, workspace="default"): +def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, workspace="default"): """Auto-generate descriptor configuration for the discovered schema""" try: # Read sample data @@ -717,7 +717,7 @@ def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, fl # Import API modules from trustgraph.api import Api from trustgraph.api.types import ConfigKey - api = Api(api_url, token=token, workspace=workspace) + api = Api(api_url, workspace=workspace) config_api = api.config() # Get schema definition diff --git a/trustgraph-embeddings-hf/pyproject.toml b/trustgraph-embeddings-hf/pyproject.toml index b8bd7d1c..4bf17688 100644 --- a/trustgraph-embeddings-hf/pyproject.toml +++ b/trustgraph-embeddings-hf/pyproject.toml @@ -10,8 +10,8 @@ description = "HuggingFace embeddings support for TrustGraph." readme = "README.md" requires-python = ">=3.8" dependencies = [ - "trustgraph-base>=2.6,<2.7", - "trustgraph-flow>=2.6,<2.7", + "trustgraph-base>=2.5,<2.6", + "trustgraph-flow>=2.5,<2.6", "torch", "urllib3", "transformers", diff --git a/trustgraph-flow/pyproject.toml b/trustgraph-flow/pyproject.toml index f9f6c5d9..547dea3c 100644 --- a/trustgraph-flow/pyproject.toml +++ b/trustgraph-flow/pyproject.toml @@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc readme = "README.md" requires-python = ">=3.8" dependencies = [ - "trustgraph-base>=2.6,<2.7", + "trustgraph-base>=2.5,<2.6", "aiohttp", "anthropic", "scylla-driver", diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py index ae393028..ca242265 100755 --- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py +++ b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py @@ -32,10 +32,6 @@ logger = logging.getLogger(__name__) default_ident = "document-decoder" -def _looks_like_pdf(content): - return content.lstrip().startswith(b"%PDF-") - - class Processor(FlowProcessor): def __init__(self, **params): @@ -98,37 +94,33 @@ class Processor(FlowProcessor): ) return - # Check if we should fetch from librarian or use inline data - if v.document_id: - # Fetch from librarian via Pulsar - logger.info(f"Fetching document {v.document_id} from librarian...") - - content = await flow.librarian.fetch_document_content( - document_id=v.document_id, - - ) - - # Content is base64 encoded - if isinstance(content, str): - content = content.encode('utf-8') - decoded_content = base64.b64decode(content) - - logger.info(f"Fetched {len(decoded_content)} bytes from librarian") - else: - # Use inline data (backward compatibility) - decoded_content = base64.b64decode(v.data) - - if not _looks_like_pdf(decoded_content): - logger.error( - f"Document {v.metadata.id} is not valid PDF content. " - f"Ignoring document." - ) - return - - with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as fp: + with tempfile.NamedTemporaryFile(delete_on_close=False, suffix='.pdf') as fp: temp_path = fp.name - fp.write(decoded_content) - fp.close() + + # Check if we should fetch from librarian or use inline data + if v.document_id: + # Fetch from librarian via Pulsar + logger.info(f"Fetching document {v.document_id} from librarian...") + fp.close() + + content = await flow.librarian.fetch_document_content( + document_id=v.document_id, + + ) + + # Content is base64 encoded + if isinstance(content, str): + content = content.encode('utf-8') + decoded_content = base64.b64decode(content) + + with open(temp_path, 'wb') as f: + f.write(decoded_content) + + logger.info(f"Fetched {len(decoded_content)} bytes from librarian") + else: + # Use inline data (backward compatibility) + fp.write(base64.b64decode(v.data)) + fp.close() global PyPDFLoader if PyPDFLoader is None: diff --git a/trustgraph-flow/trustgraph/gateway/registry.py b/trustgraph-flow/trustgraph/gateway/registry.py index 4fba7920..ca235315 100644 --- a/trustgraph-flow/trustgraph/gateway/registry.py +++ b/trustgraph-flow/trustgraph/gateway/registry.py @@ -506,18 +506,18 @@ _FLOW_SERVICES = { "text-completion": "llm", "prompt": "llm", "mcp-tool": "mcp", - "graph-rag": "graph-rag:read", - "document-rag": "document-rag:read", + "graph-rag": "graph:read", + "document-rag": "documents:read", "embeddings": "embeddings", - "graph-embeddings": "graph-embeddings:read", - "document-embeddings": "document-embeddings:read", - "triples": "triples:read", + "graph-embeddings": "graph:read", + "document-embeddings": "documents:read", + "triples": "graph:read", "rows": "rows:read", - "nlp-query": "nlp-query:read", - "structured-query": "structured-query:read", - "structured-diag": "structured-query:read", - "row-embeddings": "row-embeddings:read", - "sparql": "sparql:read", + "nlp-query": "rows:read", + "structured-query": "rows:read", + "structured-diag": "rows:read", + "row-embeddings": "rows:read", + "sparql": "graph:read", } for _kind, _cap in _FLOW_SERVICES.items(): _register_flow_kind("flow-service", _kind, _cap) @@ -525,10 +525,10 @@ for _kind, _cap in _FLOW_SERVICES.items(): # Streaming import socket endpoints. _FLOW_IMPORTS = { - "triples": "triples:write", - "graph-embeddings": "graph-embeddings:write", - "document-embeddings": "document-embeddings:write", - "entity-contexts": "entity-contexts:write", + "triples": "graph:write", + "graph-embeddings": "graph:write", + "document-embeddings": "documents:write", + "entity-contexts": "documents:write", "rows": "rows:write", } for _kind, _cap in _FLOW_IMPORTS.items(): @@ -537,35 +537,10 @@ for _kind, _cap in _FLOW_IMPORTS.items(): # Streaming export socket endpoints. _FLOW_EXPORTS = { - "triples": "triples:read", - "graph-embeddings": "graph-embeddings:read", - "document-embeddings": "document-embeddings:read", - "entity-contexts": "entity-contexts:read", + "triples": "graph:read", + "graph-embeddings": "graph:read", + "document-embeddings": "documents:read", + "entity-contexts": "documents:read", } for _kind, _cap in _FLOW_EXPORTS.items(): _register_flow_kind("flow-export", _kind, _cap) - - -# --------------------------------------------------------------------------- -# Enterprise IAM operations. -# -# These are additive — they register alongside the OSS IAM operations. -# When the OSS regime receives an unknown operation it returns an error; -# when the enterprise regime is running, it handles them. -# --------------------------------------------------------------------------- - -for _op in ( - "create-group", "get-group", "list-groups", - "update-group", "delete-group", - "add-group-member", "remove-group-member", "list-group-members", - "add-group-grant", "remove-group-grant", "list-group-grants", - "add-user-grant", "remove-user-grant", "list-user-grants", - "resolve-effective-permissions", -): - register(Operation( - name=_op, - capability="iam:admin", - resource_level=ResourceLevel.SYSTEM, - extract_resource=_empty_resource, - extract_parameters=_no_parameters, - )) diff --git a/trustgraph-flow/trustgraph/iam/service/iam.py b/trustgraph-flow/trustgraph/iam/service/iam.py index 037b6bb0..5f86e688 100644 --- a/trustgraph-flow/trustgraph/iam/service/iam.py +++ b/trustgraph-flow/trustgraph/iam/service/iam.py @@ -58,18 +58,8 @@ AUTHZ_CACHE_TTL_SECONDS = 60 _READER_CAPS = { "agent", "graph:read", - "triples:read", - "sparql:read", - "graph-rag:read", - "graph-embeddings:read", "documents:read", - "document-rag:read", - "document-embeddings:read", - "entity-contexts:read", "rows:read", - "nlp-query:read", - "structured-query:read", - "row-embeddings:read", "llm", "embeddings", "mcp", @@ -83,10 +73,6 @@ _READER_CAPS = { _WRITER_CAPS = _READER_CAPS | { "graph:write", - "triples:write", - "graph-embeddings:write", - "document-embeddings:write", - "entity-contexts:write", "documents:write", "rows:write", "collections:write", diff --git a/trustgraph-flow/trustgraph/query/rows/cassandra/service.py b/trustgraph-flow/trustgraph/query/rows/cassandra/service.py index f9868d67..7157daae 100644 --- a/trustgraph-flow/trustgraph/query/rows/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/rows/cassandra/service.py @@ -24,7 +24,7 @@ from .... schema import RowsQueryRequest, RowsQueryResponse, GraphQLError from .... schema import Error, RowSchema, Field as SchemaField from .... base import FlowProcessor, ConsumerSpec, ProducerSpec from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config -from .... tables.cassandra_async import async_execute, async_execute_paged, async_scan +from .... tables.cassandra_async import async_execute from ... graphql import GraphQLSchemaBuilder, SortDirection @@ -180,7 +180,7 @@ class Processor(FlowProcessor): description=field_def.get("description", ""), required=field_def.get("required", False), enum_values=field_def.get("enum", []), - indexed=field_def.get("indexed", False), + indexed=field_def.get("indexed", False) ) fields.append(field) @@ -232,8 +232,6 @@ class Processor(FlowProcessor): for index_name in index_names: if index_name in filters: value = filters[index_name] - if value == "" or value is None: - continue # Single field index -> single element list index_value = [str(value)] return (index_name, index_value) @@ -284,13 +282,11 @@ class Processor(FlowProcessor): query += f" LIMIT {limit}" try: - pages = await async_execute_paged( - self.session, query, params - ) - for page in pages: - for row in page: - row_dict = dict(row.data) if row.data else {} - results.append(row_dict) + rows = await async_execute(self.session, query, params) + for row in rows: + # Convert data map to dict with proper field names + row_dict = dict(row.data) if row.data else {} + results.append(row_dict) except Exception as e: logger.error(f"Failed to query rows: {e}", exc_info=True) raise @@ -312,6 +308,8 @@ class Processor(FlowProcessor): # Query using the first index (arbitrary choice for scan) primary_index = index_names[0] + # We need to scan all values for this index + # This requires ALLOW FILTERING or a different approach query = f""" SELECT data, source FROM {safe_keyspace}.rows WHERE collection = %s @@ -322,18 +320,17 @@ class Processor(FlowProcessor): params = [collection, schema_name, primary_index] try: - def row_filter(row): - row_dict = dict(row.data) if row.data else {} - return self._matches_filters(row_dict, filters, row_schema) + rows = await async_execute(self.session, query, params) - matched_rows = await async_scan( - self.session, query, params, - row_filter=row_filter, - limit=limit, - ) - for row in matched_rows: + for row in rows: row_dict = dict(row.data) if row.data else {} - results.append(row_dict) + + # Apply post-filters + if self._matches_filters(row_dict, filters, row_schema): + results.append(row_dict) + + if limit and len(results) >= limit: + break except Exception as e: logger.error(f"Failed to scan rows: {e}", exc_info=True) @@ -366,7 +363,7 @@ class Processor(FlowProcessor): # Parse filter key for operator if '_' in filter_key: parts = filter_key.rsplit('_', 1) - if parts[1] in ['gt', 'gte', 'lt', 'lte', 'contains', 'in', 'not', 'startsWith', 'endsWith', 'not_in']: + if parts[1] in ['gt', 'gte', 'lt', 'lte', 'contains', 'in']: field_name = parts[0] operator = parts[1] else: @@ -403,18 +400,6 @@ class Processor(FlowProcessor): elif operator == 'in': if str(row_value) not in [str(v) for v in filter_value]: return False - elif operator == 'not': - if str(row_value) == str(filter_value): - return False - elif operator == 'startsWith': - if not str(row_value).startswith(str(filter_value)): - return False - elif operator == 'endsWith': - if not str(row_value).endswith(str(filter_value)): - return False - elif operator == 'not_in': - if str(row_value) in [str(v) for v in filter_value]: - return False except (ValueError, TypeError): return False diff --git a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py index 31fc41a7..e5506723 100755 --- a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py @@ -172,7 +172,7 @@ class Processor(CollectionConfigHandler, FlowProcessor): description=field_def.get("description", ""), required=field_def.get("required", False), enum_values=field_def.get("enum", []), - indexed=field_def.get("indexed", False), + indexed=field_def.get("indexed", False) ) fields.append(field) diff --git a/trustgraph-flow/trustgraph/tables/cassandra_async.py b/trustgraph-flow/trustgraph/tables/cassandra_async.py index fe410a26..205ed6b9 100644 --- a/trustgraph-flow/trustgraph/tables/cassandra_async.py +++ b/trustgraph-flow/trustgraph/tables/cassandra_async.py @@ -80,14 +80,14 @@ def _set_exception_if_pending(fut, exc): fut.set_exception(exc) -async def async_execute_paged(session, query, parameters=None, fetch_size=5000): +async def async_execute_paged(session, query, parameters=None, fetch_size=100): """Execute a CQL query with page-by-page iteration. Uses synchronous session.execute() inside run_in_executor so that the driver's ResultSet paging works correctly without materialising the entire result set in memory. - Returns all pages as a list of lists. + Yields one page of rows at a time (as a list). """ loop = asyncio.get_running_loop() @@ -111,50 +111,3 @@ async def async_execute_paged(session, query, parameters=None, fetch_size=5000): return await loop.run_in_executor( None, _fetch_all_pages ) - - -async def async_scan( - session, query, parameters=None, row_filter=None, - limit=None, fetch_size=5000, -): - """Scan a CQL query page-by-page, applying a filter and limit. - - Only matching rows accumulate in memory. Each page is discarded - after processing, so peak memory is bounded by fetch_size plus - the number of matching rows (capped by limit). - - Args: - session: cassandra.cluster.Session - query: CQL statement string - parameters: bind params - row_filter: callable(row) -> bool, or None to accept all - limit: max results to return, or None for unlimited - fetch_size: rows per Cassandra page fetch - - Returns: - List of matching rows. - """ - loop = asyncio.get_running_loop() - - if isinstance(query, str): - stmt = SimpleStatement(query, fetch_size=fetch_size) - else: - stmt = query - stmt.fetch_size = fetch_size - - def _scan(): - results = [] - result_set = session.execute(stmt, parameters) - while True: - for row in result_set.current_rows: - if row_filter is None or row_filter(row): - results.append(row) - if limit and len(results) >= limit: - return results - if result_set.has_more_pages: - result_set.fetch_next_page() - else: - break - return results - - return await loop.run_in_executor(None, _scan) diff --git a/trustgraph-mcp/trustgraph/mcp_server/mcp.py b/trustgraph-mcp/trustgraph/mcp_server/mcp.py index 11b975b2..7378db64 100755 --- a/trustgraph-mcp/trustgraph/mcp_server/mcp.py +++ b/trustgraph-mcp/trustgraph/mcp_server/mcp.py @@ -8,180 +8,71 @@ import logging import json import uuid import argparse -from dataclasses import dataclass, field +from dataclasses import dataclass from collections.abc import AsyncIterator from functools import partial from mcp.server.fastmcp import FastMCP, Context -from mcp.server.auth.provider import AccessToken, TokenVerifier -from mcp.server.auth.middleware.auth_context import get_access_token +from mcp.types import TextContent +from websockets.asyncio.client import connect from trustgraph.base.logging import add_logging_args, setup_logging -from . tg_socket import WebSocketManager, _token_key - -logger = logging.getLogger(__name__) - - -# Wire-format Term type codes (match TermTranslator compact keys) -_TERM_TYPES = { - "iri": "i", - "literal": "l", - "blank": "b", -} - - -def _make_term(value: str, term_type: str) -> dict: - """Build a compact-key Term dict for the gateway wire format. - - Args: - value: The term value (IRI string, literal text, or blank node id). - term_type: One of "iri", "literal", "blank". - """ - t = _TERM_TYPES.get(term_type) - if t is None: - raise ValueError( - f"Unknown term type '{term_type}' — " - f"expected one of: {', '.join(_TERM_TYPES)}" - ) - - if t == "i": - return {"t": t, "i": value} - elif t == "l": - return {"t": t, "v": value} - elif t == "b": - return {"t": t, "d": value} - return {"t": t} - -# ── Security boundary: MCP client → MCP server ── -# The MCP client authenticates to this server via a Bearer token in the -# HTTP Authorization header. The SDK's auth middleware extracts and -# verifies the token before any tool handler runs. -# -# We implement a pass-through TokenVerifier: the gateway is the real -# authority, so we accept any non-empty Bearer token here and forward -# it to the gateway for validation. The gateway's in-band auth -# protocol and IAM regime decide whether the token is valid. -# -# This means an invalid token will connect to the MCP server but will -# fail when the first WebSocket auth frame is sent to the gateway. -# That is intentional — the gateway is the single source of truth. - - -class PassthroughTokenVerifier(TokenVerifier): - """Accept any non-empty Bearer token and forward it downstream. - - The TrustGraph gateway is the authority for token validation, not - this MCP server. We store the raw token in the AccessToken so that - tool handlers can retrieve it via ``get_access_token().token`` and - forward it to the gateway. - """ - - async def verify_token(self, token: str) -> AccessToken | None: - if not token: - return None - return AccessToken( - token=token, - client_id="mcp-caller", - scopes=[], - ) - +from . tg_socket import WebSocketManager @dataclass class AppContext: - sockets: dict[str, WebSocketManager] = field(default_factory=dict) - websocket_url: str = "" - + sockets: dict[str, WebSocketManager] + websocket_url: str + gateway_token: str @asynccontextmanager -async def app_lifespan( - server: FastMCP, - websocket_url: str = "ws://api-gateway:8088/api/v1/socket", -) -> AsyncIterator[AppContext]: - """Manage per-server state: the pool of per-caller WebSocket - connections to the gateway.""" +async def app_lifespan(server: FastMCP, websocket_url: str = "ws://api-gateway:8088/api/v1/socket", gateway_token: str = "") -> AsyncIterator[AppContext]: - sockets: dict[str, WebSocketManager] = {} + """ + Manage application lifecycle with type-safe context + """ + + # Initialize on startup + sockets = {} try: - yield AppContext(sockets=sockets, websocket_url=websocket_url) + yield AppContext(sockets=sockets, websocket_url=websocket_url, gateway_token=gateway_token) finally: - logger.info("Shutting down — closing %d WebSocket(s)", len(sockets)) + # Cleanup on shutdown + logging.info("Shutting down context") - for key, manager in sockets.items(): - try: - await manager.stop() - except Exception as e: - logger.warning("Error closing socket %s: %s", key, e) + for k, manager in sockets.items(): + logging.info(f"Closing socket for {k}") + await manager.stop() - logger.info("Shutdown complete") + logging.info("Shutdown complete") - -def _require_token() -> str: - """Extract the caller's Bearer token from the MCP auth context. - - Raises RuntimeError if no token is present (the caller did not - authenticate). - """ - # ── Security boundary: token extraction ── - # get_access_token() reads the contextvar set by the SDK's - # AuthContextMiddleware. The token was placed there by - # PassthroughTokenVerifier.verify_token() and is the raw Bearer - # value from the MCP client's Authorization header. - access = get_access_token() - if access is None or not access.token: - raise RuntimeError( - "Authentication required — send a Bearer token in the " - "Authorization header" - ) - return access.token - - -async def get_socket_manager(ctx, token): - """Return (or create) an authenticated WebSocket for this token. - - Each unique token gets its own WebSocket connection so that - gateway-side identity, workspace binding, and capability scoping - are preserved per caller. - """ +async def get_socket_manager(ctx): lifespan_context = ctx.request_context.lifespan_context sockets = lifespan_context.sockets websocket_url = lifespan_context.websocket_url + gateway_token = lifespan_context.gateway_token - key = _token_key(token) + if "default" in sockets: + logging.info("Return existing socket manager") + return sockets["default"] - if key in sockets: - manager = sockets[key] - if manager.socket is not None: - return manager - # Socket was closed (e.g. server-side timeout) — reconnect. - del sockets[key] + logging.info(f"Opening socket to {websocket_url}...") - logger.info("Opening authenticated WebSocket to %s …", websocket_url) + # Create manager with empty pending requests + manager = WebSocketManager(websocket_url, token=gateway_token) - manager = WebSocketManager(websocket_url, token=token) + # Start reader task with the proper manager await manager.start() - # Verify the token is valid by calling whoami. This confirms the - # gateway accepted the token and gives us the caller's identity. - try: - identity = await manager.whoami() - logger.info( - "WebSocket ready — caller: %s", - identity.get("handle", "unknown"), - ) - except Exception as e: - await manager.stop() - raise RuntimeError( - f"Token rejected by gateway (whoami failed): {e}" - ) from e + sockets["default"] = manager - sockets[key] = manager + logging.info("Return new socket manager") return manager - @dataclass class EmbeddingsResponse: vectors: List[List[float]] @@ -291,23 +182,10 @@ class PutConfigResponse: class DeleteConfigResponse: pass -@dataclass -class SparqlQueryResponse: - query_type: str - variables: List[str] - bindings: List[Dict[str, Any]] - ask_result: bool - triples: List[Dict[str, Any]] - -@dataclass -class GraphQLQueryResponse: - data: Any - errors: List[Dict[str, Any]] - @dataclass class GetPromptsResponse: prompts: List[str] - + @dataclass class GetPromptResponse: prompt: Dict[str, Any] @@ -316,61 +194,31 @@ class GetPromptResponse: class GetSystemPromptResponse: prompt: str - class McpServer: - def __init__( - self, - host: str = "0.0.0.0", - port: int = 8000, - websocket_url: str = "ws://api-gateway:8088/api/v1/socket", - auth_issuer: str = "", - auth_resource_url: str = "", - ): + def __init__(self, host: str = "0.0.0.0", port: int = 8000, websocket_url: str = "ws://api-gateway:8088/api/v1/socket", gateway_token: str = ""): self.host = host self.port = port self.websocket_url = websocket_url + self.gateway_token = gateway_token - lifespan_with_url = partial( - app_lifespan, websocket_url=websocket_url, - ) - - # ── Security: MCP-level auth configuration ── - # The SDK requires AuthSettings whenever a token_verifier is - # present. The issuer_url tells MCP clients where to obtain - # tokens; resource_server_url identifies this server in OAuth - # protected-resource metadata. - # - # The PassthroughTokenVerifier accepts any non-empty Bearer - # token — real validation happens at the gateway. This is - # intentional: the gateway is the single source of truth for - # identity and capability checks. - from mcp.server.auth.settings import AuthSettings - - auth_settings = AuthSettings( - issuer_url=auth_issuer or f"http://{host}:{port}", - resource_server_url=auth_resource_url or f"http://{host}:{port}", - ) - + # Create a partial function to pass websocket_url to app_lifespan + lifespan_with_url = partial(app_lifespan, websocket_url=websocket_url, gateway_token=gateway_token) + self.mcp = FastMCP( - "TrustGraph", - dependencies=["trustgraph-base"], - host=self.host, - port=self.port, + "TrustGraph", dependencies=["trustgraph-base"], + host=self.host, port=self.port, lifespan=lifespan_with_url, - token_verifier=PassthroughTokenVerifier(), - auth=auth_settings, ) self._register_tools() - + def _register_tools(self): """Register all MCP tools""" + # Register all the tools that were previously registered globally self.mcp.tool()(self.embeddings) self.mcp.tool()(self.text_completion) self.mcp.tool()(self.graph_rag) self.mcp.tool()(self.agent) self.mcp.tool()(self.triples_query) - self.mcp.tool()(self.sparql_query) - self.mcp.tool()(self.graphql_query) self.mcp.tool()(self.graph_embeddings_query) self.mcp.tool()(self.get_config_all) self.mcp.tool()(self.get_config) @@ -395,69 +243,67 @@ class McpServer: self.mcp.tool()(self.load_document) self.mcp.tool()(self.remove_document) self.mcp.tool()(self.add_processing) - + def run(self): """Run the MCP server""" self.mcp.run(transport="streamable-http") - async def _get_manager(self, ctx): - """Get an authenticated WebSocket manager for the current caller. - - Extracts the Bearer token from the MCP auth context and returns - a per-token WebSocket connection to the gateway. - """ - token = _require_token() - return await get_socket_manager(ctx, token) - async def embeddings( self, - texts: List[str], + text: str, flow_id: str | None = None, - workspace: str | None = None, ctx: Context = None, ) -> EmbeddingsResponse: """ - Generate vector embeddings for the given texts using TrustGraph's embedding models. - + Generate vector embeddings for the given text using TrustGraph's embedding models. + This tool converts text into high-dimensional vectors that capture semantic meaning, enabling similarity searches, clustering, and other vector-based operations. - + Args: - texts: List of input texts to convert into embeddings. Each text can be a - sentence, paragraph, or document. + text: The input text to convert into embeddings. Can be a sentence, paragraph, + or document. The text will be processed by the configured embedding model. flow_id: Optional flow identifier to use for processing (default: "default"). Different flows may use different embedding models or configurations. - workspace: Optional workspace to query. If omitted, uses the caller's - default workspace. - + Returns: - EmbeddingsResponse containing a list of vectors, one per input text. + EmbeddingsResponse containing a list of vectors. Each vector is a list of floats + representing the text's semantic embedding in the model's vector space. + + Example usage: + - Convert a query into embeddings for similarity search + - Generate embeddings for documents before storing them + - Create embeddings for comparison with existing knowledge """ - logger.info("Embeddings request") + logging.info("Embeddings request made") if flow_id is None: flow_id = "default" - manager = await self._get_manager(ctx) + manager = await get_socket_manager(ctx, "trustgraph") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Computing embeddings via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + if ctx is None: + raise RuntimeError("No context provided") - request_data = {"texts": texts} - - gen = manager.request( - "embeddings", request_data, flow_id, workspace=workspace, + await ctx.session.send_log_message( + level="info", + data=f"Computing embeddings via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, ) + # Send websocket request + request_data = {"text": text} + logging.info("making request") + + gen = manager.request("embeddings", request_data, flow_id) + async for response in gen: + + # Extract vectors from response vectors = response.get("vectors", [[]]) break - + return EmbeddingsResponse(vectors=vectors) async def text_completion( @@ -465,47 +311,62 @@ class McpServer: prompt: str, system: str | None = None, flow_id: str | None = None, - workspace: str | None = None, ctx: Context = None, ) -> TextCompletionResponse: """ Generate text completions using TrustGraph's language models. - + + This tool sends prompts to configured language models and returns generated text. + It supports both user prompts and system instructions for controlling generation. + Args: prompt: The main prompt or question to send to the language model. + This is the primary input that guides the model's response. system: Optional system prompt that sets the context, role, or behavior - for the AI assistant. - flow_id: Optional flow identifier (default: "default"). - workspace: Optional workspace to query. If omitted, uses the caller's - default workspace. - + for the AI assistant (e.g., "You are a helpful coding assistant"). + System prompts influence how the model interprets and responds. + flow_id: Optional flow identifier (default: "default"). Different flows + may use different models, parameters, or processing pipelines. + Returns: TextCompletionResponse containing the generated text response from the model. + + Example usage: + - Ask questions and get AI-generated answers + - Generate code, documentation, or creative content + - Perform text analysis, summarization, or transformation tasks + - Use system prompts to control tone, style, or domain expertise """ if system is None: system = "" if flow_id is None: flow_id = "default" - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Generating text completion via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + # Use websocket if context is available + logging.info("Text completion request made via websocket") - request_data = {"system": system, "prompt": prompt} + manager = await get_socket_manager(ctx, "trustgraph") - gen = manager.request( - "text-completion", request_data, flow_id, workspace=workspace, + await ctx.session.send_log_message( + level="info", + data=f"Generating text completion via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, ) + # Send websocket request + request_data = {"system": system, "prompt": prompt} + + gen = manager.request("text-completion", request_data, flow_id) + async for response in gen: + + # Extract vectors from response text = response.get("response", "") break - + return TextCompletionResponse(response=text) async def graph_rag( @@ -517,43 +378,58 @@ class McpServer: max_subgraph_size: int | None = None, max_path_length: int | None = None, flow_id: str | None = None, - workspace: str | None = None, ctx: Context = None, ) -> GraphRagResponse: """ Perform Graph-based Retrieval Augmented Generation (GraphRAG) queries. - + GraphRAG combines knowledge graph traversal with language model generation to provide - contextually rich answers. - + contextually rich answers. It explores relationships between entities to build relevant + context before generating responses. + Args: question: The question or query to answer using the knowledge graph. + The system will find relevant entities and relationships to inform the response. collection: Knowledge collection to query (default: "default"). + Different collections may contain domain-specific knowledge. entity_limit: Maximum number of entities to retrieve during graph traversal. + Higher limits provide more context but increase processing time. triple_limit: Maximum number of relationship triples to consider. + Controls the depth of relationship exploration. max_subgraph_size: Maximum size of the subgraph to extract for context. + Larger subgraphs provide richer context but use more resources. max_path_length: Maximum path length to traverse in the knowledge graph. + Longer paths can discover distant but relevant relationships. flow_id: Processing flow to use (default: "default"). - workspace: Optional workspace to query. If omitted, uses the caller's - default workspace. - + Returns: GraphRagResponse containing the generated answer informed by knowledge graph context. + + Example usage: + - Answer complex questions requiring multi-hop reasoning + - Explore relationships between entities in your knowledge base + - Generate responses grounded in structured knowledge + - Perform research queries across connected information """ if collection is None: collection = "default" if flow_id is None: flow_id = "default" - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Processing GraphRAG query via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("GraphRAG request made via websocket") + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Processing GraphRAG query via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) + + # Build request data with all parameters request_data = { "query": question } @@ -564,19 +440,20 @@ class McpServer: if max_subgraph_size: request_data["max_subgraph_size"] = max_subgraph_size if max_path_length: request_data["max_path_length"] = max_path_length - gen = manager.request( - "graph-rag", request_data, flow_id, workspace=workspace, - ) + gen = manager.request("graph-rag", request_data, flow_id) text_chunks = [] async for response in gen: + # Handle new message format with message_type message_type = response.get("message_type", "chunk") + # Only collect text from chunk messages if message_type == "chunk": chunk_text = response.get("response", "") if chunk_text: text_chunks.append(chunk_text) + # Check if session is complete if response.get("end_of_session"): break @@ -587,447 +464,404 @@ class McpServer: question: str, collection: str | None = None, flow_id: str | None = None, - workspace: str | None = None, ctx: Context = None, ) -> AgentResponse: """ Execute intelligent agent queries with reasoning and tool usage capabilities. - + + The agent can perform complex multi-step reasoning, use tools, and provide + detailed thought processes. It's designed for tasks requiring planning, + analysis, and iterative problem-solving. + Args: - question: The question or task for the agent to solve. + question: The question or task for the agent to solve. Can be complex + queries requiring multiple steps, analysis, or tool usage. collection: Knowledge collection the agent can access (default: "default"). - flow_id: Agent workflow to use (default: "default"). - workspace: Optional workspace to query. If omitted, uses the caller's - default workspace. - + Determines what information and tools are available. + flow_id: Agent workflow to use (default: "default"). Different flows + may have different capabilities, tools, or reasoning strategies. + Returns: AgentResponse containing the final answer after the agent's reasoning process. + During execution, you'll see intermediate thoughts and observations. + + Example usage: + - Solve complex analytical problems requiring multiple steps + - Perform research tasks across multiple information sources + - Handle queries that need tool usage and decision-making + - Get detailed explanations of reasoning processes + + Note: This tool provides real-time updates on the agent's thinking process + through log messages, so you can follow its reasoning steps. """ if collection is None: collection = "default" if flow_id is None: flow_id = "default" - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Processing agent query via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Agent request made via websocket") + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Processing agent query via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) + + # Build request data with all parameters request_data = { "question": question } if collection: request_data["collection"] = collection - gen = manager.request( - "agent", request_data, flow_id, workspace=workspace, - ) + gen = manager.request("agent", request_data, flow_id) async for response in gen: - logger.debug("Agent response: %s", response) + logging.debug(f"Agent response: {response}") - if ctx: - if "thought" in response: - await ctx.session.send_log_message( - level="info", - data=f"Thinking: {response['thought']}", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + if "thought" in response: + await ctx.session.send_log_message( + level="info", + data=f"Thinking: {response['thought']}", + logger="notification_stream", + related_request_id=ctx.request_id, + ) - if "observation" in response: - await ctx.session.send_log_message( - level="info", - data=f"Observation: {response['observation']}", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + if "observation" in response: + await ctx.session.send_log_message( + level="info", + data=f"Observation: {response['observation']}", + logger="notification_stream", + related_request_id=ctx.request_id, + ) + # Extract vectors from response if "answer" in response: answer = response.get("answer", "") return AgentResponse(answer=answer) async def triples_query( self, - s: str | None = None, - s_type: str | None = None, - p: str | None = None, - p_type: str | None = None, - o: str | None = None, - o_type: str | None = None, - collection: str | None = None, - graph: str | None = None, + s_v: str | None = None, + s_e: bool | None = None, + p_v: str | None = None, + p_e: bool | None = None, + o_v: str | None = None, + o_e: bool | None = None, limit: int | None = None, flow_id: str | None = None, - workspace: str | None = None, ctx: Context = None, ) -> TriplesQueryResponse: """ Query knowledge graph triples using subject-predicate-object patterns. - - Each of s, p, o is an RDF term value. Use the corresponding _type - parameter to specify the term kind: - - "iri" (default for s and p): an IRI / entity reference - - "literal" (default for o): a plain literal value - - "blank": a blank node identifier - + + Knowledge graphs store information as triples (subject, predicate, object). + This tool allows flexible querying by specifying any combination of these + components, with wildcards for unspecified parts. + Args: - s: Subject value to match. Leave None for wildcard. - s_type: Subject term type: "iri" (default), "literal", or "blank". - p: Predicate value to match. Leave None for wildcard. - p_type: Predicate term type: "iri" (default), "literal", or "blank". - o: Object value to match. Leave None for wildcard. - o_type: Object term type: "iri", "literal" (default), or "blank". - collection: Knowledge collection to query (default: "default"). - graph: Named graph IRI to restrict the query. None = default graph, - "*" = all graphs. + s_v: Subject value to match (e.g., "John", "Apple Inc."). Leave None for wildcard. + s_e: Whether subject should be treated as an entity (True) or literal (False). + p_v: Predicate/relationship value (e.g., "works_for", "type_of"). Leave None for wildcard. + p_e: Whether predicate should be treated as an entity (True) or literal (False). + o_v: Object value to match (e.g., "Engineer", "Company"). Leave None for wildcard. + o_e: Whether object should be treated as an entity (True) or literal (False). limit: Maximum number of triples to return (default: 20). flow_id: Processing flow identifier (default: "default"). - workspace: Optional workspace to query. If omitted, uses the caller's - default workspace. - + Returns: TriplesQueryResponse containing matching triples from the knowledge graph. + + Example queries: + - Find all relationships for an entity: s_v="John", others None + - Find all instances of a relationship: p_v="works_for", others None + - Find specific facts: s_v="John", p_v="works_for", o_v=None + - Explore entity types: p_v="type_of", others None + + Use this for: + - Exploring knowledge graph structure + - Finding specific facts or relationships + - Discovering connections between entities + - Validating or debugging knowledge content """ if flow_id is None: flow_id = "default" if limit is None: limit = 20 - if collection is None: collection = "default" - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Processing triples query via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Triples query request made via websocket") - request_data = { - "limit": limit, - "collection": collection, - } + manager = await get_socket_manager(ctx, "trustgraph") - if s is not None: - request_data["s"] = _make_term(s, s_type or "iri") - - if p is not None: - request_data["p"] = _make_term(p, p_type or "iri") - - if o is not None: - request_data["o"] = _make_term(o, o_type or "literal") - - if graph is not None: - request_data["g"] = graph - - gen = manager.request( - "triples", request_data, flow_id, workspace=workspace, + await ctx.session.send_log_message( + level="info", + data=f"Processing triples query via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, ) + # Build request data with Value objects + request_data = { + "limit": limit + } + + # Add subject if provided + if s_v is not None: + request_data["s"] = {"v": s_v, "e": s_e } + + # Add predicate if provided + if p_v is not None: + request_data["p"] = {"v": p_v, "e": p_e } + + # Add object if provided + if o_v is not None: + request_data["o"] = {"v": o_v, "e": o_e } + + gen = manager.request("triples", request_data, flow_id) + async for response in gen: + # Extract response data triples = response.get("response", []) break - + return TriplesQueryResponse(triples=triples) - async def sparql_query( - self, - query: str, - collection: str | None = None, - limit: int | None = None, - flow_id: str | None = None, - workspace: str | None = None, - ctx: Context = None, - ) -> SparqlQueryResponse: - """ - Execute a SPARQL query against the knowledge graph. - - Supports SELECT, ASK, CONSTRUCT, and DESCRIBE query forms. - - Args: - query: SPARQL query string (e.g. "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10"). - collection: Knowledge collection to query (default: "default"). - limit: Safety limit on number of results (default: 10000). - flow_id: Processing flow identifier (default: "default"). - workspace: Optional workspace to query. If omitted, uses the caller's - default workspace. - - Returns: - SparqlQueryResponse containing the query results. The structure depends - on query type: - - SELECT: variables (column names) and bindings (rows of Term values) - - ASK: ask_result (boolean) - - CONSTRUCT/DESCRIBE: triples - """ - - if collection is None: collection = "default" - if flow_id is None: flow_id = "default" - if limit is None: limit = 10000 - - manager = await self._get_manager(ctx) - - if ctx: - await ctx.session.send_log_message( - level="info", - data="Processing SPARQL query via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) - - request_data = { - "query": query, - "collection": collection, - "limit": limit, - } - - gen = manager.request( - "sparql", request_data, flow_id, workspace=workspace, - ) - - async for response in gen: - query_type = response.get("query-type", "") - return SparqlQueryResponse( - query_type=query_type, - variables=response.get("variables", []), - bindings=response.get("bindings", []), - ask_result=response.get("ask-result", False), - triples=response.get("triples", []), - ) - - async def graphql_query( - self, - query: str, - collection: str | None = None, - variables: Dict[str, Any] | None = None, - operation_name: str | None = None, - flow_id: str | None = None, - workspace: str | None = None, - ctx: Context = None, - ) -> GraphQLQueryResponse: - """ - Execute a GraphQL query against structured data (rows). - - Queries structured data schemas that have been loaded into TrustGraph. - The available types and fields depend on the schemas configured in the - target workspace. - - Args: - query: GraphQL query string (e.g. '{ customers(where: {status: {eq: "active"}}) { id name } }'). - collection: Data collection to query (default: "default"). - variables: Optional GraphQL variables as a dict. - operation_name: Optional operation name for multi-operation documents. - flow_id: Processing flow identifier (default: "default"). - workspace: Optional workspace to query. If omitted, uses the caller's - default workspace. - - Returns: - GraphQLQueryResponse containing data (the query result) and errors - (any GraphQL field-level errors). - """ - - if collection is None: collection = "default" - if flow_id is None: flow_id = "default" - - manager = await self._get_manager(ctx) - - if ctx: - await ctx.session.send_log_message( - level="info", - data="Processing GraphQL query via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) - - request_data = { - "query": query, - "collection": collection, - "variables": variables or {}, - } - - if operation_name is not None: - request_data["operation_name"] = operation_name - - gen = manager.request( - "rows", request_data, flow_id, workspace=workspace, - ) - - async for response in gen: - return GraphQLQueryResponse( - data=response.get("data"), - errors=response.get("errors", []), - ) - async def graph_embeddings_query( self, vectors: List[List[float]], limit: int | None = None, flow_id: str | None = None, - workspace: str | None = None, ctx: Context = None, ) -> GraphEmbeddingsQueryResponse: """ Find entities in the knowledge graph using vector similarity search. - + + This tool performs semantic search by comparing embedding vectors to find + the most similar entities in the knowledge graph. It's useful for finding + conceptually related information even when exact text matches don't exist. + Args: - vectors: List of embedding vectors to search with. + vectors: List of embedding vectors to search with. Each vector should be + a list of floats representing semantic embeddings (typically from + the embeddings tool). Multiple vectors can be provided for batch queries. limit: Maximum number of similar entities to return (default: 20). + Higher limits provide more results but may include less relevant matches. flow_id: Processing flow identifier (default: "default"). - workspace: Optional workspace to query. If omitted, uses the caller's - default workspace. - + Returns: - GraphEmbeddingsQueryResponse containing entities ranked by similarity. + GraphEmbeddingsQueryResponse containing entities ranked by similarity to the + input vectors, along with similarity scores and entity metadata. + + Example workflow: + 1. Use the 'embeddings' tool to convert text to vectors + 2. Use this tool to find similar entities in the knowledge graph + 3. Explore the returned entities for relevant information + + Use this for: + - Semantic search across knowledge entities + - Finding conceptually similar content + - Discovering related entities without exact keyword matches + - Building recommendation systems based on entity similarity """ if flow_id is None: flow_id = "default" if limit is None: limit = 20 - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Processing graph embeddings query via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Graph embeddings query request made via websocket") + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Processing graph embeddings query via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) + + # Build request data request_data = { "vectors": vectors, "limit": limit } - gen = manager.request( - "graph-embeddings", request_data, flow_id, workspace=workspace, - ) + gen = manager.request("graph-embeddings", request_data, flow_id) async for response in gen: + # Extract entities from response entities = response.get("entities", []) break - + return GraphEmbeddingsQueryResponse(entities=entities) async def get_config_all( self, - workspace: str | None = None, ctx: Context = None, ) -> ConfigResponse: """ Retrieve the complete TrustGraph system configuration. - - Args: - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + + This tool returns all configuration settings for the TrustGraph system, + including model configurations, API keys, flow definitions, and system parameters. + Returns: - ConfigResponse containing the full configuration as a nested dictionary. + ConfigResponse containing the full configuration as a nested dictionary + with all system settings, organized by category (e.g., models, flows, storage). + + Use this for: + - Inspecting current system configuration + - Debugging configuration issues + - Understanding available models and settings + - Auditing system setup and parameters """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Retrieving all configuration via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get config all request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving all configuration via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "config" } - gen = manager.request("config", request_data, None, workspace=workspace) + gen = manager.request("config", request_data, None) async for response in gen: config = response.get("config", {}) break - + return ConfigResponse(config=config) async def get_config( self, keys: List[Dict[str, str]], - workspace: str | None = None, ctx: Context = None, ) -> ConfigGetResponse: """ Retrieve specific configuration values by key. - + + This tool allows you to fetch specific configuration settings without + retrieving the entire configuration. Useful for checking particular + settings or API keys. + Args: - keys: List of configuration keys to retrieve. Each key should be a dict with - 'type' and 'key' fields. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + keys: List of configuration keys to retrieve. Each key should be a dict with: + - 'type': Configuration category (e.g., 'llm', 'embeddings', 'storage') + - 'key': Specific setting name within that category + Returns: ConfigGetResponse containing the requested configuration values. + + Example keys: + - {'type': 'llm', 'key': 'openai.model'} + - {'type': 'embeddings', 'key': 'default.model'} + - {'type': 'storage', 'key': 'database.url'} + + Use this for: + - Checking specific model configurations + - Validating API key settings + - Inspecting individual system parameters """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Retrieving specific configuration via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get config request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving specific configuration via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "get", "keys": keys } - gen = manager.request("config", request_data, None, workspace=workspace) + gen = manager.request("config", request_data, None) async for response in gen: values = response.get("values", []) break - + return ConfigGetResponse(values=values) async def put_config( self, values: List[Dict[str, str]], - workspace: str | None = None, ctx: Context = None, ) -> PutConfigResponse: """ Update system configuration values. - + + This tool allows you to modify TrustGraph system settings, such as + model parameters, API keys, and system behavior configurations. + Args: - values: List of configuration updates. Each should be a dict with - 'type', 'key', and 'value' fields. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + values: List of configuration updates. Each update should be a dict with: + - 'type': Configuration category (e.g., 'llm', 'embeddings') + - 'key': Specific setting name to update + - 'value': New value for the setting + Returns: PutConfigResponse confirming the configuration update. + + Example updates: + - {'type': 'llm', 'key': 'openai.model', 'value': 'gpt-4'} + - {'type': 'embeddings', 'key': 'batch_size', 'value': '100'} + + Use this for: + - Switching between different models + - Updating API credentials + - Modifying system behavior parameters + - Configuring processing settings + + Note: Configuration changes may require system restart to take effect. """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Updating configuration via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Put config request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Updating configuration via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "put", "values": values } - gen = manager.request("config", request_data, None, workspace=workspace) + gen = manager.request("config", request_data, None) async for response in gen: return PutConfigResponse() @@ -1035,73 +869,97 @@ class McpServer: async def delete_config( self, keys: List[Dict[str, str]], - workspace: str | None = None, ctx: Context = None, ) -> DeleteConfigResponse: """ Delete specific configuration entries from the system. - + + This tool removes configuration settings, reverting them to system defaults + or disabling specific features. + Args: - keys: List of configuration keys to delete. Each should be a dict with - 'type' and 'key' fields. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + keys: List of configuration keys to delete. Each key should be a dict with: + - 'type': Configuration category (e.g., 'llm', 'embeddings') + - 'key': Specific setting name to remove + Returns: DeleteConfigResponse confirming the deletion. + + Use this for: + - Removing custom model configurations + - Clearing API credentials + - Resetting settings to defaults + - Cleaning up obsolete configurations + + Warning: Deleting essential configuration may cause system functionality + to be disabled until properly reconfigured. """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Deleting configuration via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Delete config request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Deleting configuration via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "delete", "keys": keys } - gen = manager.request("config", request_data, None, workspace=workspace) + gen = manager.request("config", request_data, None) async for response in gen: return DeleteConfigResponse() async def get_prompts( self, - workspace: str | None = None, ctx: Context = None, ) -> GetPromptsResponse: """ List all available prompt templates in the system. - - Args: - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + + Prompt templates are reusable prompts that can be used with language models + for consistent behavior across different queries and use cases. + Returns: GetPromptsResponse containing a list of available prompt template IDs. + Each ID can be used with get_prompt to retrieve the full template. + + Use this for: + - Discovering available prompt templates + - Exploring pre-configured prompts for different tasks + - Finding templates for specific use cases + - Understanding what prompt options are available """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Retrieving prompt templates via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get prompts request made via websocket") + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving prompt templates via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) + + # First get all config request_data = { "operation": "config" } - gen = manager.request("config", request_data, None, workspace=workspace) + gen = manager.request("config", request_data, None) async for response in gen: config = response.get("config", {}) @@ -1113,36 +971,49 @@ class McpServer: async def get_prompt( self, prompt_id: str, - workspace: str | None = None, ctx: Context = None, ) -> GetPromptResponse: """ Retrieve a specific prompt template by ID. - + + Prompt templates contain structured prompts with placeholders, instructions, + and metadata for specific tasks or domains. + Args: prompt_id: The unique identifier of the prompt template to retrieve. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + Use get_prompts to see available template IDs. + Returns: - GetPromptResponse containing the complete prompt template. + GetPromptResponse containing the complete prompt template with its + structure, placeholders, and usage instructions. + + Use this for: + - Examining prompt template structure + - Understanding how to use specific templates + - Copying or modifying existing prompts + - Learning prompt engineering patterns """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Retrieving prompt template '{prompt_id}' via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get prompt request made via websocket") + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving prompt template '{prompt_id}' via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) + + # First get all config request_data = { "operation": "config" } - gen = manager.request("config", request_data, None, workspace=workspace) + gen = manager.request("config", request_data, None) async for response in gen: config = response.get("config", {}) @@ -1154,35 +1025,44 @@ class McpServer: async def get_system_prompt( self, - workspace: str | None = None, ctx: Context = None, ) -> GetSystemPromptResponse: """ Retrieve the current system prompt configuration. - - Args: - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + + The system prompt defines the default behavior, personality, and instructions + for language models across the TrustGraph system. + Returns: - GetSystemPromptResponse containing the system prompt text. + GetSystemPromptResponse containing the system prompt text and configuration. + + Use this for: + - Understanding default AI behavior settings + - Checking current system-wide prompt configuration + - Auditing AI personality and instruction settings + - Debugging unexpected AI responses """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Retrieving system prompt via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get system prompt request made via websocket") + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving system prompt via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) + + # First get all config request_data = { "operation": "config" } - gen = manager.request("config", request_data, None, workspace=workspace) + gen = manager.request("config", request_data, None) async for response in gen: config = response.get("config", {}) @@ -1193,39 +1073,51 @@ class McpServer: async def get_token_costs( self, - workspace: str | None = None, ctx: Context = None, ) -> ConfigTokenCostsResponse: """ Retrieve token pricing information for all configured AI models. - - Args: - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + + This tool provides cost information for input and output tokens across + different language models, helping with budget planning and cost optimization. + Returns: - ConfigTokenCostsResponse containing pricing data for each model. + ConfigTokenCostsResponse containing pricing data for each model including: + - Model name/identifier + - Input token cost (per token) + - Output token cost (per token) + + Use this for: + - Estimating costs for different models + - Choosing cost-effective models for tasks + - Budget planning and cost analysis + - Monitoring and optimizing AI spending """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Retrieving token costs via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get token costs request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving token costs via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "getvalues", "type": "token-costs" } - gen = manager.request("config", request_data, None, workspace=workspace) + gen = manager.request("config", request_data, None) async for response in gen: values = response.get("values", []) + # Transform to match TypeScript API format costs = [] for item in values: try: @@ -1238,89 +1130,106 @@ class McpServer: except (json.JSONDecodeError, AttributeError): continue break - + return ConfigTokenCostsResponse(costs=costs) async def get_knowledge_cores( self, - workspace: str | None = None, ctx: Context = None, ) -> KnowledgeCoresResponse: """ List all available knowledge graph cores in the current workspace. - Args: - workspace: Optional workspace. If omitted, uses the caller's - default workspace. + Knowledge cores are packaged collections of structured knowledge that can + be loaded into the system for querying and reasoning. They contain entities, + relationships, and facts organized as knowledge graphs. Returns: KnowledgeCoresResponse containing a list of available knowledge core IDs. + + Use this for: + - Discovering available knowledge collections + - Understanding what knowledge domains are accessible + - Planning which cores to load for specific tasks + - Managing knowledge resources """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Retrieving knowledge graph cores via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get knowledge cores request made via websocket") + + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving knowledge graph cores via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "list-kg-cores", } - gen = manager.request( - "knowledge", request_data, None, workspace=workspace, - ) + gen = manager.request("knowledge", request_data, None) async for response in gen: ids = response.get("ids", []) break - + return KnowledgeCoresResponse(ids=ids) async def delete_kg_core( self, core_id: str, - workspace: str | None = None, ctx: Context = None, ) -> DeleteKgCoreResponse: """ Permanently delete a knowledge graph core. + This operation removes a knowledge core from storage. Use with caution + as this action cannot be undone. + Args: core_id: Unique identifier of the knowledge core to delete. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. Returns: DeleteKgCoreResponse confirming the deletion. + + Use this for: + - Cleaning up obsolete knowledge cores + - Removing test or experimental data + - Managing storage space + - Maintaining organized knowledge collections + + Warning: This permanently deletes the knowledge core and all its data. """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Deleting knowledge graph core '{core_id}' via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Delete KG core request made via websocket") + + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Deleting knowledge graph core '{core_id}' via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "delete-kg-core", "id": core_id, } - gen = manager.request( - "knowledge", request_data, None, workspace=workspace, - ) + gen = manager.request("knowledge", request_data, None) async for response in gen: break - + return DeleteKgCoreResponse() async def load_kg_core( @@ -1328,34 +1237,46 @@ class McpServer: core_id: str, flow: str, collection: str | None = None, - workspace: str | None = None, ctx: Context = None, ) -> LoadKgCoreResponse: """ Load a knowledge graph core into the active system for querying. + This operation makes a knowledge core available for GraphRAG queries, + triple searches, and other knowledge-based operations. + Args: core_id: Unique identifier of the knowledge core to load. - flow: Processing flow to use for loading the core. - collection: Target collection name (default: "default"). - workspace: Optional workspace. If omitted, uses the caller's - default workspace. + flow: Processing flow to use for loading the core. Different flows + may apply different processing, indexing, or optimization steps. + collection: Target collection name (default: "default"). The loaded + knowledge will be available under this collection name. Returns: LoadKgCoreResponse confirming the core has been loaded. + + Use this for: + - Making knowledge cores available for queries + - Switching between different knowledge domains + - Loading domain-specific knowledge for tasks + - Preparing knowledge for GraphRAG operations """ if collection is None: collection = "default" - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Loading knowledge graph core '{core_id}' via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Load KG core request made via websocket") + + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Loading knowledge graph core '{core_id}' via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "load-kg-core", @@ -1364,241 +1285,292 @@ class McpServer: "collection": collection } - gen = manager.request( - "knowledge", request_data, None, workspace=workspace, - ) + gen = manager.request("knowledge", request_data, None) async for response in gen: break - + return LoadKgCoreResponse() async def get_kg_core( self, core_id: str, - workspace: str | None = None, ctx: Context = None, ) -> GetKgCoreResponse: """ Download and retrieve the complete content of a knowledge graph core. + This tool streams the entire content of a knowledge core, returning all + entities, relationships, and metadata. Due to potentially large data sizes, + the content is streamed in chunks. + Args: core_id: Unique identifier of the knowledge core to retrieve. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. Returns: GetKgCoreResponse containing all chunks of the knowledge core data. + Each chunk contains part of the knowledge graph structure. + + Use this for: + - Examining knowledge core content and structure + - Debugging knowledge graph data + - Exporting knowledge for backup or analysis + - Understanding the scope and quality of knowledge + + Note: Large knowledge cores may take significant time to download. + Progress updates are provided through log messages during streaming. """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Retrieving knowledge graph core '{core_id}' via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get KG core request made via websocket") + + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving knowledge graph core '{core_id}' via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "get-kg-core", "id": core_id, } + # Collect all streaming responses chunks = [] - gen = manager.request( - "knowledge", request_data, None, workspace=workspace, - ) + gen = manager.request("knowledge", request_data, None) async for response in gen: + # Check for end of stream if response.get("eos", False): - if ctx: - await ctx.session.send_log_message( - level="info", - data="Completed streaming KG core data", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + await ctx.session.send_log_message( + level="info", + data=f"Completed streaming KG core data", + logger="notification_stream", + related_request_id=ctx.request_id, + ) break else: chunks.append(response) - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Received KG core chunk ({len(chunks)} chunks so far)", - logger="notification_stream", - related_request_id=ctx.request_id, - ) - + await ctx.session.send_log_message( + level="info", + data=f"Received KG core chunk ({len(chunks)} chunks so far)", + logger="notification_stream", + related_request_id=ctx.request_id, + ) + return GetKgCoreResponse(chunks=chunks) async def get_flows( self, - workspace: str | None = None, ctx: Context = None, ) -> FlowsResponse: """ List all available processing flows in the system. - - Args: - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + + Flows define processing pipelines for different types of operations + (e.g., document processing, knowledge extraction, query handling). + Each flow encapsulates a specific workflow with configured steps. + Returns: FlowsResponse containing a list of available flow identifiers. + + Use this for: + - Discovering available processing workflows + - Understanding what processing options are available + - Choosing appropriate flows for specific tasks + - Planning workflow-based operations """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Retrieving available flows via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get flows request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving available flows via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "list-flows" } - gen = manager.request( - "flow", request_data, None, workspace=workspace, - ) + gen = manager.request("flow", request_data, None) async for response in gen: flow_ids = response.get("flow-ids", []) break - + return FlowsResponse(flow_ids=flow_ids) async def get_flow( self, flow_id: str, - workspace: str | None = None, ctx: Context = None, ) -> FlowResponse: """ Retrieve the complete definition of a specific processing flow. - + + This tool returns the detailed configuration, steps, and parameters + of a processing flow, showing how it processes data and what operations it performs. + Args: flow_id: Unique identifier of the flow to retrieve. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + Returns: - FlowResponse containing the complete flow definition. + FlowResponse containing the complete flow definition including: + - Flow configuration and parameters + - Processing steps and their order + - Input/output specifications + - Dependencies and requirements + + Use this for: + - Understanding how specific flows work + - Debugging flow processing issues + - Learning flow configuration patterns + - Customizing or duplicating flows """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Retrieving flow definition for '{flow_id}' via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get flow request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving flow definition for '{flow_id}' via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "get-flow", "flow-id": flow_id, } - gen = manager.request( - "flow", request_data, None, workspace=workspace, - ) + gen = manager.request("flow", request_data, None) async for response in gen: flow_data = response.get("flow", "{}") + # Parse JSON flow definition as done in TypeScript flow = json.loads(flow_data) if isinstance(flow_data, str) else flow_data break - + return FlowResponse(flow=flow) async def get_flow_classes( self, - workspace: str | None = None, ctx: Context = None, ) -> FlowClassesResponse: """ List all available flow class templates. - - Args: - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + + Flow classes are templates that define types of processing workflows. + They serve as blueprints for creating specific flow instances with + customized parameters. + Returns: FlowClassesResponse containing a list of available flow class names. + + Use this for: + - Discovering available flow templates + - Understanding what types of processing are supported + - Planning new flow creation + - Exploring system capabilities """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Retrieving flow classes via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get flow classes request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving flow classes via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "list-classes" } - gen = manager.request( - "flow", request_data, None, workspace=workspace, - ) + gen = manager.request("flow", request_data, None) async for response in gen: class_names = response.get("class-names", []) break - + return FlowClassesResponse(class_names=class_names) async def get_flow_class( self, class_name: str, - workspace: str | None = None, ctx: Context = None, ) -> FlowClassResponse: """ Retrieve the definition of a specific flow class template. - + + Flow classes define the structure, parameters, and capabilities of + flow types. This tool returns the class specification including + configurable parameters and processing logic. + Args: class_name: Name of the flow class to retrieve. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + Returns: - FlowClassResponse containing the flow class definition. + FlowClassResponse containing the flow class definition with: + - Class parameters and configuration options + - Processing capabilities and requirements + - Usage instructions and examples + + Use this for: + - Understanding flow class capabilities + - Learning how to configure new flows + - Troubleshooting flow creation issues + - Exploring advanced flow features """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Retrieving flow class definition for '{class_name}' via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get flow class request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving flow class definition for '{class_name}' via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "get-class", "class-name": class_name } - gen = manager.request( - "flow", request_data, None, workspace=workspace, - ) + gen = manager.request("flow", request_data, None) async for response in gen: class_def_data = response.get("class-definition", "{}") + # Parse JSON class definition as done in TypeScript class_definition = json.loads(class_def_data) if isinstance(class_def_data, str) else class_def_data break - + return FlowClassResponse(class_definition=class_definition) async def start_flow( @@ -1606,32 +1578,43 @@ class McpServer: flow_id: str, class_name: str, description: str, - workspace: str | None = None, ctx: Context = None, ) -> StartFlowResponse: """ Create and start a new processing flow instance. - + + This tool creates a new flow based on a flow class template and starts + it running. The flow will begin processing according to its configuration. + Args: flow_id: Unique identifier for the new flow instance. class_name: Flow class template to use for creating the flow. + Use get_flow_classes to see available classes. description: Human-readable description of the flow's purpose. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + Returns: StartFlowResponse confirming the flow has been started. + + Use this for: + - Creating new processing workflows + - Starting automated processing tasks + - Launching background operations + - Initiating data processing pipelines """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Starting flow '{flow_id}' with class '{class_name}' via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Start flow request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Starting flow '{flow_id}' with class '{class_name}' via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "start-flow", @@ -1640,135 +1623,162 @@ class McpServer: "description": description } - gen = manager.request( - "flow", request_data, None, workspace=workspace, - ) + gen = manager.request("flow", request_data, None) async for response in gen: break - + return StartFlowResponse() async def stop_flow( self, flow_id: str, - workspace: str | None = None, ctx: Context = None, ) -> StopFlowResponse: """ Stop a running flow instance. - + + This tool gracefully stops a running flow, allowing it to complete + current operations before shutting down. + Args: flow_id: Unique identifier of the flow instance to stop. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. - + Returns: StopFlowResponse confirming the flow has been stopped. + + Use this for: + - Stopping unwanted or completed flows + - Managing system resources + - Interrupting long-running processes + - Maintaining flow lifecycle """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Stopping flow '{flow_id}' via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Stop flow request made via websocket") + + manager = await get_socket_manager(ctx, "trustgraph") + + await ctx.session.send_log_message( + level="info", + data=f"Stopping flow '{flow_id}' via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "stop-flow", "flow-id": flow_id } - gen = manager.request( - "flow", request_data, None, workspace=workspace, - ) + gen = manager.request("flow", request_data, None) async for response in gen: break - + return StopFlowResponse() async def get_documents( self, - workspace: str | None = None, ctx: Context = None, ) -> DocumentsResponse: """ List all documents stored in the TrustGraph document library. - Args: - workspace: Optional workspace. If omitted, uses the caller's - default workspace. + This tool returns metadata for all documents that have been uploaded + to the system, including their processing status and properties. Returns: - DocumentsResponse containing metadata for each document. + DocumentsResponse containing metadata for each document including: + - Document ID and title + - Upload timestamp + - MIME type and size information + - Tags and custom metadata + - Processing status + + Use this for: + - Browsing available documents + - Managing document collections + - Finding documents by metadata + - Auditing document storage """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Retrieving documents list via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get documents request made via websocket") + + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving documents list via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "list-documents", } - gen = manager.request( - "librarian", request_data, None, workspace=workspace, - ) + gen = manager.request("librarian", request_data, None) async for response in gen: document_metadatas = response.get("document-metadatas", []) break - + return DocumentsResponse(document_metadatas=document_metadatas) async def get_processing( self, - workspace: str | None = None, ctx: Context = None, ) -> ProcessingResponse: """ List all documents currently in the processing queue. - Args: - workspace: Optional workspace. If omitted, uses the caller's - default workspace. + This tool shows documents that are being processed or waiting to be + processed, along with their processing status and configuration. Returns: - ProcessingResponse containing processing metadata. + ProcessingResponse containing processing metadata including: + - Processing job ID and document ID + - Processing flow and status + - Target collection + - Timestamp and progress information + + Use this for: + - Monitoring document processing progress + - Debugging processing issues + - Managing processing queues + - Understanding system workload """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Retrieving processing list via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Get processing request made via websocket") + + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Retrieving processing list via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "list-processing", } - gen = manager.request( - "librarian", request_data, None, workspace=workspace, - ) + gen = manager.request("librarian", request_data, None) async for response in gen: processing_metadatas = response.get("processing-metadatas", []) break - + return ProcessingResponse(processing_metadatas=processing_metadatas) async def load_document( @@ -1780,39 +1790,50 @@ class McpServer: title: str = "", comments: str = "", tags: List[str] | None = None, - workspace: str | None = None, ctx: Context = None, ) -> LoadDocumentResponse: """ Upload a document to the TrustGraph document library. + This tool stores documents with rich metadata for later processing, + search, and knowledge extraction. Documents can be text files, PDFs, + or other supported formats. + Args: document: The document content as a string. For binary files, this should be base64-encoded content. document_id: Optional unique identifier. If not provided, one will be generated. metadata: Optional list of custom metadata key-value pairs. - mime_type: MIME type of the document. + mime_type: MIME type of the document (e.g., 'text/plain', 'application/pdf'). title: Human-readable title for the document. comments: Optional description or notes about the document. - tags: List of tags for categorizing the document. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. + tags: List of tags for categorizing and finding the document. Returns: LoadDocumentResponse confirming the document has been stored. + + Use this for: + - Adding new documents to the knowledge base + - Storing reference materials and data sources + - Building document collections for processing + - Importing external content for analysis """ if tags is None: tags = [] - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data="Loading document to library via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Load document request made via websocket") + + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Loading document to library via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) import time timestamp = int(time.time()) @@ -1831,55 +1852,63 @@ class McpServer: "content": document } - gen = manager.request( - "librarian", request_data, None, workspace=workspace, - ) + gen = manager.request("librarian", request_data, None) async for response in gen: break - + return LoadDocumentResponse() async def remove_document( self, document_id: str, - workspace: str | None = None, ctx: Context = None, ) -> RemoveDocumentResponse: """ Permanently remove a document from the library. + This operation deletes a document and all its associated metadata. + Use with caution as this action cannot be undone. + Args: document_id: Unique identifier of the document to remove. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. Returns: RemoveDocumentResponse confirming the document has been deleted. + + Use this for: + - Cleaning up obsolete or incorrect documents + - Managing storage space + - Removing sensitive or inappropriate content + - Maintaining organized document collections + + Warning: This permanently deletes the document and all its metadata. """ - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Removing document '{document_id}' from library via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Remove document request made via websocket") + + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Removing document '{document_id}' from library via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) request_data = { "operation": "remove-document", "document-id": document_id, } - gen = manager.request( - "librarian", request_data, None, workspace=workspace, - ) + gen = manager.request("librarian", request_data, None) async for response in gen: break - + return RemoveDocumentResponse() async def add_processing( @@ -1889,37 +1918,53 @@ class McpServer: flow: str, collection: str | None = None, tags: List[str] | None = None, - workspace: str | None = None, ctx: Context = None, ) -> AddProcessingResponse: """ Queue a document for processing through a specific workflow. + This tool adds a document to the processing queue where it will be + processed by the specified flow to extract knowledge, create embeddings, + or perform other analysis operations. + Args: processing_id: Unique identifier for this processing job. document_id: ID of the document to process (must exist in library). - flow: Processing flow to use. + flow: Processing flow to use. Different flows perform different + types of analysis (e.g., knowledge extraction, summarization). collection: Target collection for processed knowledge (default: "default"). + Results will be stored under this collection name. tags: Optional tags for categorizing this processing job. - workspace: Optional workspace. If omitted, uses the caller's - default workspace. Returns: AddProcessingResponse confirming the document has been queued. + + Use this for: + - Processing uploaded documents into knowledge + - Extracting entities and relationships from text + - Creating searchable embeddings + - Converting documents into structured knowledge + + Note: Processing may take time depending on document size and flow complexity. + Use get_processing to monitor progress. """ if collection is None: collection = "default" if tags is None: tags = [] - manager = await self._get_manager(ctx) + if ctx is None: + raise RuntimeError("No context provided") - if ctx: - await ctx.session.send_log_message( - level="info", - data=f"Adding document '{document_id}' to processing queue via websocket...", - logger="notification_stream", - related_request_id=ctx.request_id, - ) + logging.info("Add processing request made via websocket") + + manager = await get_socket_manager(ctx) + + await ctx.session.send_log_message( + level="info", + data=f"Adding document '{document_id}' to processing queue via websocket...", + logger="notification_stream", + related_request_id=ctx.request_id, + ) import time timestamp = int(time.time()) @@ -1936,61 +1981,38 @@ class McpServer: } } - gen = manager.request( - "librarian", request_data, None, workspace=workspace, - ) + gen = manager.request("librarian", request_data, None) async for response in gen: break - + return AddProcessingResponse() - def main(): parser = argparse.ArgumentParser(description='TrustGraph MCP Server') - parser.add_argument( - '--host', default='0.0.0.0', - help='Host to bind to (default: 0.0.0.0)', - ) - parser.add_argument( - '--port', type=int, default=8000, - help='Port to bind to (default: 8000)', - ) - parser.add_argument( - '--websocket-url', - default='ws://api-gateway:8088/api/v1/socket', - help='WebSocket URL for the TrustGraph gateway', - ) - parser.add_argument( - '--auth-issuer', - default=os.environ.get("AUTH_ISSUER", ""), - help='OAuth issuer URL for MCP auth metadata discovery', - ) - parser.add_argument( - '--auth-resource-url', - default=os.environ.get("AUTH_RESOURCE_URL", ""), - help='Resource server URL for OAuth protected resource metadata', - ) + parser.add_argument('--host', default='0.0.0.0', help='Host to bind to (default: 0.0.0.0)') + parser.add_argument('--port', type=int, default=8000, help='Port to bind to (default: 8000)') + parser.add_argument('--websocket-url', default='ws://api-gateway:8088/api/v1/socket', help='WebSocket URL to connect to (default: ws://api-gateway:8088/api/v1/socket)') + # Add logging arguments add_logging_args(parser) args = parser.parse_args() + # Setup logging before creating server setup_logging(vars(args)) - server = McpServer( - host=args.host, - port=args.port, - websocket_url=args.websocket_url, - auth_issuer=args.auth_issuer, - auth_resource_url=args.auth_resource_url, - ) + # Read gateway auth token from environment + gateway_token = os.environ.get("GATEWAY_SECRET", "") + + # Create and run the MCP server + server = McpServer(host=args.host, port=args.port, websocket_url=args.websocket_url, gateway_token=gateway_token) server.run() - def run(): + """Legacy function for backward compatibility""" main() - if __name__ == "__main__": main() + diff --git a/trustgraph-mcp/trustgraph/mcp_server/tg_socket.py b/trustgraph-mcp/trustgraph/mcp_server/tg_socket.py index 9fbf7459..bff8ae75 100644 --- a/trustgraph-mcp/trustgraph/mcp_server/tg_socket.py +++ b/trustgraph-mcp/trustgraph/mcp_server/tg_socket.py @@ -1,110 +1,49 @@ +from dataclasses import dataclass from websockets.asyncio.client import connect +from urllib.parse import urlencode, urlparse, urlunparse, parse_qs import asyncio import logging import json import uuid -import hashlib - -logger = logging.getLogger(__name__) - - -def _token_key(token): - """Derive a dict key from a token without storing the raw secret.""" - return hashlib.sha256(token.encode()).hexdigest()[:16] - +import time class WebSocketManager: - """Manages an authenticated WebSocket connection to the TrustGraph - gateway on behalf of a single caller. - Each caller token gets its own WebSocketManager so that gateway-side - identity, workspace, and capability scoping are preserved end-to-end. - """ - - def __init__(self, url, token): + def __init__(self, url, token=None): self.url = url - # ── Security boundary: token storage ── - # This is the MCP caller's Bearer token, forwarded verbatim to - # the gateway. It MUST NOT be logged, persisted, or shared - # across callers. It is held only for the lifetime of this - # connection so that re-auth (e.g. after a reconnect) is - # possible. self.token = token self.socket = None - self.identity = None - self.last_used = None + + # FIXME: authentication is broken. The /api/v1/socket endpoint uses + # in-band auth (first-frame protocol via the Mux dispatcher), not + # query-parameter tokens. This query-string token is silently ignored. + # Fix: after connect(), send an auth frame with the bearer token as + # the first message, matching the gateway's in-band auth protocol. + def _build_url(self): + if not self.token: + return self.url + parsed = urlparse(self.url) + params = parse_qs(parsed.query) + params["token"] = [self.token] + new_query = urlencode(params, doseq=True) + return urlunparse(parsed._replace(query=new_query)) async def start(self): - """Connect and authenticate via the gateway's in-band auth - protocol. Raises on auth failure.""" - - # ── Security boundary: MCP server → gateway ── - # The WebSocket connects to the gateway and authenticates using - # the caller's Bearer token via the in-band first-frame auth - # protocol. The token belongs to the MCP client — we forward - # it as-is and never interpret its contents. - self.socket = await connect(self.url) + self.socket = await connect(self._build_url()) self.pending_requests = {} self.running = True - - await self._authenticate() - self.reader_task = asyncio.create_task(self.reader()) - async def _authenticate(self): - """Send in-band auth frame and wait for auth-ok / auth-failed. - - The gateway expects ``{"type": "auth", "token": "..."}`` as the - first frame on a new WebSocket. Any service frame sent before - auth-ok is rejected. - """ - await self.socket.send(json.dumps({ - "type": "auth", - "token": self.token, - })) - - response_text = await asyncio.wait_for(self.socket.recv(), 10) - response = json.loads(response_text) - - if response.get("type") == "auth-ok": - logger.info( - "WebSocket authenticated, default workspace: %s", - response.get("workspace"), - ) - return - - # Auth failed — close immediately, do not leave an - # unauthenticated socket open. - await self.socket.close() - self.socket = None - - if response.get("type") == "auth-failed": - raise RuntimeError( - "Gateway rejected the authentication token" - ) - - raise RuntimeError( - f"Unexpected auth response type: {response.get('type')}" - ) - - async def whoami(self): - """Verify the token by calling the gateway's whoami endpoint. - Returns the identity dict and caches it on ``self.identity``. - """ - gen = self.request("iam", {"operation": "whoami"}, flow_id=None) - async for response in gen: - self.identity = response - return response - async def stop(self): self.running = False - if hasattr(self, "reader_task"): - await self.reader_task + await self.reader_task async def reader(self): - """Background task: read WebSocket frames and route them to the - correct pending-request queue by ``id``.""" + """ + Background task to read websocket responses and route to correct + request + """ while self.running: try: @@ -120,21 +59,23 @@ class WebSocketManager: request_id = response.get("id") if request_id and request_id in self.pending_requests: + # Put the response in the queue queue = self.pending_requests[request_id] await queue.put(response) else: - logger.warning( - "Response for unknown request ID: %s", request_id + logging.warning( + f"Response for unknown request ID: {request_id}" ) except Exception as e: - logger.error("Error in websocket reader: %s", e) + logging.error(f"Error in websocket reader: {e}") + # Put error in all pending queues for queue in self.pending_requests.values(): try: await queue.put({"error": str(e)}) - except Exception: + except: pass self.pending_requests.clear() @@ -145,29 +86,25 @@ class WebSocketManager: async def request( self, service, request_data, flow_id="default", - workspace=None, ): - """Send a request via WebSocket and yield responses. - - Args: - service: Gateway service name (e.g. "graph-rag", "config"). - request_data: Inner request payload. - flow_id: Optional flow identifier. ``None`` omits the field - (workspace-level services don't use flows). - workspace: Optional workspace override. When ``None`` the - gateway uses the caller's default workspace. + """ + Send a request via websocket and handle single or streaming responses """ - import time - self.last_used = time.monotonic() - + # Generate unique request ID request_id = f"{uuid.uuid4()}" + # Determine if this service streams responses + streaming_services = {"agent"} + is_streaming = service in streaming_services + + # Create a queue for all responses (streaming and single) response_queue = asyncio.Queue() self.pending_requests[request_id] = response_queue try: + # Build request message message = { "id": request_id, "service": service, @@ -177,16 +114,7 @@ class WebSocketManager: if flow_id is not None: message["flow"] = flow_id - # ── Security boundary: workspace scoping ── - # When the caller supplies a workspace, we set it on the - # message envelope. The gateway's enforce_workspace() - # validates that the authenticated identity is permitted - # to access the target workspace — we MUST NOT skip or - # override that check. When workspace is None, the - # gateway default-fills from the identity's bound workspace. - if workspace is not None: - message["workspace"] = workspace - + # Send request await self.socket.send(json.dumps(message)) while self.running: @@ -199,17 +127,19 @@ class WebSocketManager: continue if "error" in response: - if isinstance(response["error"], dict): - raise RuntimeError( - response["error"].get("message", str(response["error"])) - ) + if "message" in response["error"]: + raise RuntimeError(response["error"]["text"]) else: raise RuntimeError(str(response["error"])) yield response["response"] - if response.get("complete"): - break + if "complete" in response: + if response["complete"]: + break - finally: + except Exception as e: + # Clean up on error self.pending_requests.pop(request_id, None) + raise e + diff --git a/trustgraph-ocr/pyproject.toml b/trustgraph-ocr/pyproject.toml index 4b515032..fa9f7cd4 100644 --- a/trustgraph-ocr/pyproject.toml +++ b/trustgraph-ocr/pyproject.toml @@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc readme = "README.md" requires-python = ">=3.8" dependencies = [ - "trustgraph-base>=2.6,<2.7", + "trustgraph-base>=2.5,<2.6", "pulsar-client", "prometheus-client", "boto3", diff --git a/trustgraph-unstructured/pyproject.toml b/trustgraph-unstructured/pyproject.toml index dc987fd9..f17b9812 100644 --- a/trustgraph-unstructured/pyproject.toml +++ b/trustgraph-unstructured/pyproject.toml @@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc readme = "README.md" requires-python = ">=3.8" dependencies = [ - "trustgraph-base>=2.6,<2.7", + "trustgraph-base>=2.5,<2.6", "pulsar-client", "prometheus-client", "python-magic", diff --git a/trustgraph-vertexai/pyproject.toml b/trustgraph-vertexai/pyproject.toml index 50acce0d..347594fe 100644 --- a/trustgraph-vertexai/pyproject.toml +++ b/trustgraph-vertexai/pyproject.toml @@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc readme = "README.md" requires-python = ">=3.8" dependencies = [ - "trustgraph-base>=2.6,<2.7", + "trustgraph-base>=2.5,<2.6", "pulsar-client", "google-genai", "google-api-core", diff --git a/trustgraph/pyproject.toml b/trustgraph/pyproject.toml index 5746f7eb..bcc72a41 100644 --- a/trustgraph/pyproject.toml +++ b/trustgraph/pyproject.toml @@ -10,13 +10,13 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc readme = "README.md" requires-python = ">=3.8" dependencies = [ - "trustgraph-base>=2.6,<2.7", - "trustgraph-bedrock>=2.6,<2.7", - "trustgraph-cli>=2.6,<2.7", - "trustgraph-embeddings-hf>=2.6,<2.7", - "trustgraph-flow>=2.6,<2.7", - "trustgraph-unstructured>=2.6,<2.7", - "trustgraph-vertexai>=2.6,<2.7", + "trustgraph-base>=2.5,<2.6", + "trustgraph-bedrock>=2.5,<2.6", + "trustgraph-cli>=2.5,<2.6", + "trustgraph-embeddings-hf>=2.5,<2.6", + "trustgraph-flow>=2.5,<2.6", + "trustgraph-unstructured>=2.5,<2.6", + "trustgraph-vertexai>=2.5,<2.6", ] classifiers = [ "Programming Language :: Python :: 3",