diff --git a/README.md b/README.md
index b66edc70..c366a3d9 100644
--- a/README.md
+++ b/README.md
@@ -11,11 +11,11 @@
-# The semantic deployment platform
+# The agent runtime platform
-TrustGraph is a comprehensive semantic infrastructure for agents built around context graphs — structured, queryable representations of your domain knowledge that ground every agent query in verified, explainable facts in private deployments with sovereign control. The platform is the full stack for agentic systems: context graphs, memory, retrieval, orchestration, and inference for deterministic agent workloads.
+TrustGraph is an agent runtime platform built around context graphs — structured, queryable representations of your domain knowledge that ground every agent query in verified, explainable facts in private deployments with sovereign control. The platform is the full stack for agentic systems: context graphs, memory, retrieval, orchestration, and inference for precision-critical agent workloads.
The platform:
- [x] Multi-model and multimodal database system
@@ -99,21 +99,23 @@ For a browser based configuration, try the [Configuration Terminal](https://conf
- [**Developer APIs and CLI**](https://docs.trustgraph.ai/reference)
- [**Deployment Guides**](https://docs.trustgraph.ai/deployment)
-## Context Graph UI
+## Workbench
-
+The **Workbench** provides tools for all major features of TrustGraph and is served on port `8888` by default.
-The UI provides tools for all major features of TrustGraph. The UI deploys on port `8888` by default.
-
-- **Agent Console** — Query your agents directly with streaming responses and live explainability event tracking, so you can watch reasoning unfold in real time
-- **GraphRAG View** — Interactive graph RAG queries with a visual explainability DAG and inline provenance display, making it easy to see exactly where answers came from
-- **Context Explorer** — An interactive 3D context graph explorer with dynamic graph loading, BFS neighborhood extraction, edge pulse animation, and multiple navigation views
-- **Document Ingestion** — A complete upload and submission workflow with page and chunk inspection and document structure browsing
-- **Ontology Workbench** — A full ontology editor with class and property trees, OWL/XML and Turtle import/export with round-trip fidelity, circular dependency detection, and safe-delete confirmation dialogs
-- **Schema Workbench** — Interactive schema management with list, create, edit, and delete operations including field and index management
-- **Flow Management** — Flow creation and detail views with configurable parameters, temperature controls, and grouped storage layout
-- **Workspace UX** — Workspace selection and management surfaced directly in the interface
-- **Prompt Editor** — A dedicated prompt editing workflow
+- **Vector Search**: Search the installed knowledge bases
+- **Agentic, GraphRAG and LLM Chat**: Chat interface for agents, GraphRAG queries, or direct to LLMs
+- **Relationships**: Analyze deep relationships in the installed knowledge bases
+- **Graph Visualizer**: 3D GraphViz of the installed knowledge bases
+- **Library**: Staging area for installing knowledge bases
+- **Flow Classes**: Workflow preset configurations
+- **Flows**: Create custom workflows and adjust LLM parameters during runtime
+- **Knowledge Cores**: Manage reusable knowledge bases
+- **Prompts**: Manage and adjust prompts during runtime
+- **Schemas**: Define custom schemas for structured data knowledge bases
+- **Ontologies**: Define custom ontologies for unstructured data knowledge bases
+- **Agent Tools**: Define tools with collections, knowledge cores, MCP connections, and tool groups
+- **MCP Tools**: Connect to MCP servers
## TypeScript Library for UIs
diff --git a/tests/unit/test_base/test_cassandra_config.py b/tests/unit/test_base/test_cassandra_config.py
index fe8a8379..a291434d 100644
--- a/tests/unit/test_base/test_cassandra_config.py
+++ b/tests/unit/test_base/test_cassandra_config.py
@@ -409,57 +409,4 @@ class TestEdgeCases:
assert hosts == ['mixed-host']
assert username is None # Stays None
- assert password == 'mixed-pass'
-
-
-class TestReplicationFactorParamPath:
-
- def test_explicit_kwarg(self):
- with patch.dict(os.environ, {}, clear=True):
- _, _, _, _, rf = resolve_cassandra_config(
- replication_factor=3,
- )
- assert rf == 3
-
- def test_kwarg_overrides_env(self):
- with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
- _, _, _, _, rf = resolve_cassandra_config(
- replication_factor=3,
- )
- assert rf == 3
-
- def test_env_fallback_when_kwarg_none(self):
- with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
- _, _, _, _, rf = resolve_cassandra_config(
- replication_factor=None,
- )
- assert rf == 5
-
- def test_default_when_no_kwarg_no_env(self):
- with patch.dict(os.environ, {}, clear=True):
- _, _, _, _, rf = resolve_cassandra_config()
- assert rf == 1
-
- def test_params_dict_path(self):
- with patch.dict(os.environ, {}, clear=True):
- params = {'cassandra_replication_factor': 3}
- _, _, _, _, rf = resolve_cassandra_config(
- replication_factor=params.get('cassandra_replication_factor'),
- )
- assert rf == 3
-
- def test_params_dict_overrides_env(self):
- with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
- params = {'cassandra_replication_factor': 3}
- _, _, _, _, rf = resolve_cassandra_config(
- replication_factor=params.get('cassandra_replication_factor'),
- )
- assert rf == 3
-
- def test_params_dict_missing_falls_to_env(self):
- with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
- params = {}
- _, _, _, _, rf = resolve_cassandra_config(
- replication_factor=params.get('cassandra_replication_factor'),
- )
- assert rf == 5
\ No newline at end of file
+ assert password == 'mixed-pass'
\ No newline at end of file
diff --git a/tests/unit/test_base/test_qdrant_config.py b/tests/unit/test_base/test_qdrant_config.py
deleted file mode 100644
index dbbe4214..00000000
--- a/tests/unit/test_base/test_qdrant_config.py
+++ /dev/null
@@ -1,136 +0,0 @@
-
-import os
-import pytest
-from unittest.mock import patch
-
-from trustgraph.base.qdrant_config import (
- get_qdrant_defaults,
- resolve_qdrant_config,
-)
-
-
-class TestGetQdrantDefaults:
-
- def test_defaults_with_no_env_vars(self):
- with patch.dict(os.environ, {}, clear=True):
- defaults = get_qdrant_defaults()
- assert defaults['url'] == 'http://localhost:6333'
- assert defaults['api_key'] is None
- assert defaults['replication_factor'] == 1
- assert defaults['shard_number'] == 1
-
- def test_defaults_from_env(self):
- env = {
- 'QDRANT_URL': 'http://qdrant:6333',
- 'QDRANT_API_KEY': 'secret',
- 'QDRANT_REPLICATION_FACTOR': '3',
- 'QDRANT_SHARD_NUMBER': '5',
- }
- with patch.dict(os.environ, env, clear=True):
- defaults = get_qdrant_defaults()
- assert defaults['url'] == 'http://qdrant:6333'
- assert defaults['api_key'] == 'secret'
- assert defaults['replication_factor'] == 3
- assert defaults['shard_number'] == 5
-
-
-class TestResolveQdrantConfig:
-
- def test_defaults(self):
- with patch.dict(os.environ, {}, clear=True):
- url, api_key, rf, sn = resolve_qdrant_config()
- assert url == 'http://localhost:6333'
- assert api_key is None
- assert rf == 1
- assert sn == 1
-
- def test_explicit_kwargs(self):
- with patch.dict(os.environ, {}, clear=True):
- url, api_key, rf, sn = resolve_qdrant_config(
- url='http://custom:6333',
- api_key='key',
- replication_factor=3,
- shard_number=5,
- )
- assert url == 'http://custom:6333'
- assert api_key == 'key'
- assert rf == 3
- assert sn == 5
-
- def test_kwargs_override_env(self):
- env = {
- 'QDRANT_URL': 'http://env:6333',
- 'QDRANT_REPLICATION_FACTOR': '10',
- 'QDRANT_SHARD_NUMBER': '10',
- }
- with patch.dict(os.environ, env, clear=True):
- url, _, rf, sn = resolve_qdrant_config(
- url='http://explicit:6333',
- replication_factor=3,
- shard_number=5,
- )
- assert url == 'http://explicit:6333'
- assert rf == 3
- assert sn == 5
-
- def test_env_fallback_when_kwargs_none(self):
- env = {
- 'QDRANT_URL': 'http://env:6333',
- 'QDRANT_REPLICATION_FACTOR': '3',
- 'QDRANT_SHARD_NUMBER': '5',
- }
- with patch.dict(os.environ, env, clear=True):
- url, _, rf, sn = resolve_qdrant_config()
- assert url == 'http://env:6333'
- assert rf == 3
- assert sn == 5
-
- def test_params_dict_path(self):
- with patch.dict(os.environ, {}, clear=True):
- params = {
- 'store_uri': 'http://params:6333',
- 'api_key': 'pkey',
- 'qdrant_replication_factor': 3,
- 'qdrant_shard_number': 5,
- }
- url, api_key, rf, sn = resolve_qdrant_config(
- url=params.get('store_uri'),
- api_key=params.get('api_key'),
- replication_factor=params.get('qdrant_replication_factor'),
- shard_number=params.get('qdrant_shard_number'),
- )
- assert url == 'http://params:6333'
- assert api_key == 'pkey'
- assert rf == 3
- assert sn == 5
-
- def test_params_dict_overrides_env(self):
- env = {
- 'QDRANT_REPLICATION_FACTOR': '10',
- 'QDRANT_SHARD_NUMBER': '10',
- }
- with patch.dict(os.environ, env, clear=True):
- params = {
- 'qdrant_replication_factor': 3,
- 'qdrant_shard_number': 5,
- }
- _, _, rf, sn = resolve_qdrant_config(
- replication_factor=params.get('qdrant_replication_factor'),
- shard_number=params.get('qdrant_shard_number'),
- )
- assert rf == 3
- assert sn == 5
-
- def test_params_dict_missing_falls_to_env(self):
- env = {
- 'QDRANT_REPLICATION_FACTOR': '3',
- 'QDRANT_SHARD_NUMBER': '5',
- }
- with patch.dict(os.environ, env, clear=True):
- params = {}
- _, _, rf, sn = resolve_qdrant_config(
- replication_factor=params.get('qdrant_replication_factor'),
- shard_number=params.get('qdrant_shard_number'),
- )
- assert rf == 3
- assert sn == 5
diff --git a/tests/unit/test_decoding/test_pdf_decoder.py b/tests/unit/test_decoding/test_pdf_decoder.py
index 641a9d78..04807b20 100644
--- a/tests/unit/test_decoding/test_pdf_decoder.py
+++ b/tests/unit/test_decoding/test_pdf_decoder.py
@@ -49,7 +49,7 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase):
async def test_on_message_success(self, mock_pdf_loader_class, mock_producer, mock_consumer):
"""Test successful PDF processing"""
# Mock PDF content
- pdf_content = b"%PDF-1.7\nfake pdf content"
+ pdf_content = b"fake pdf content"
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
# Mock PyPDFLoader
@@ -88,55 +88,13 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase):
# Verify triples were sent for each page (provenance)
assert mock_triples_flow.send.call_count == 2
- @patch('trustgraph.base.librarian_client.Consumer')
- @patch('trustgraph.base.librarian_client.Producer')
- @patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader')
- @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor)
- async def test_on_message_rejects_librarian_content_that_is_not_pdf(self, mock_pdf_loader_class, mock_producer, mock_consumer):
- """Test rejecting non-PDF content before invoking the PDF loader"""
- html_content = b"
Not found"
- html_base64 = base64.b64encode(html_content)
-
- mock_metadata = Metadata(id="test-doc")
- mock_document = Document(metadata=mock_metadata, document_id="doc-123")
- mock_msg = MagicMock()
- mock_msg.value.return_value = mock_document
-
- mock_output_flow = AsyncMock()
- mock_triples_flow = AsyncMock()
- mock_flow = MagicMock(side_effect=lambda name: {
- "output": mock_output_flow,
- "triples": mock_triples_flow,
- }.get(name))
- mock_flow.librarian.fetch_document_metadata = AsyncMock(
- return_value=MagicMock(kind="application/pdf")
- )
- mock_flow.librarian.fetch_document_content = AsyncMock(
- return_value=html_base64
- )
- mock_flow.librarian.save_child_document = AsyncMock()
-
- config = {
- 'id': 'test-pdf-decoder',
- 'taskgroup': AsyncMock()
- }
-
- processor = Processor(**config)
-
- await processor.on_message(mock_msg, None, mock_flow)
-
- mock_pdf_loader_class.assert_not_called()
- mock_output_flow.send.assert_not_called()
- mock_triples_flow.send.assert_not_called()
- mock_flow.librarian.save_child_document.assert_not_called()
-
@patch('trustgraph.base.librarian_client.Consumer')
@patch('trustgraph.base.librarian_client.Producer')
@patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader')
@patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor)
async def test_on_message_empty_pdf(self, mock_pdf_loader_class, mock_producer, mock_consumer):
"""Test handling of empty PDF"""
- pdf_content = b"%PDF-1.7\nfake pdf content"
+ pdf_content = b"fake pdf content"
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
mock_loader = MagicMock()
@@ -168,7 +126,7 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase):
@patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor)
async def test_on_message_unicode_content(self, mock_pdf_loader_class, mock_producer, mock_consumer):
"""Test handling of unicode content in PDF"""
- pdf_content = b"%PDF-1.7\nfake pdf content"
+ pdf_content = b"fake pdf content"
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
mock_loader = MagicMock()
diff --git a/tests/unit/test_query/test_rows_cassandra_query.py b/tests/unit/test_query/test_rows_cassandra_query.py
index fb385f43..b61500a4 100644
--- a/tests/unit/test_query/test_rows_cassandra_query.py
+++ b/tests/unit/test_query/test_rows_cassandra_query.py
@@ -333,8 +333,8 @@ class TestUnifiedTableQueries:
"""Test queries against the unified rows table"""
@pytest.mark.asyncio
- @patch('trustgraph.query.rows.cassandra.service.async_execute_paged', new_callable=AsyncMock)
- async def test_query_with_index_match(self, mock_async_execute_paged):
+ @patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock)
+ async def test_query_with_index_match(self, mock_async_execute):
"""Test query execution with matching index"""
processor = MagicMock()
processor.session = MagicMock()
@@ -344,10 +344,10 @@ class TestUnifiedTableQueries:
processor.find_matching_index = Processor.find_matching_index.__get__(processor, Processor)
processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor)
- # Mock async_execute_paged to return test data (list of pages)
+ # Mock async_execute to return test data
mock_row = MagicMock()
mock_row.data = {"id": "123", "name": "Test Product", "category": "electronics"}
- mock_async_execute_paged.return_value = [[mock_row]]
+ mock_async_execute.return_value = [mock_row]
schema = RowSchema(
name="products",
@@ -370,10 +370,10 @@ class TestUnifiedTableQueries:
# Verify Cassandra was connected and queried
processor.connect_cassandra.assert_called_once()
- mock_async_execute_paged.assert_called_once()
+ mock_async_execute.assert_called_once()
# Verify query structure - should query unified rows table
- call_args = mock_async_execute_paged.call_args
+ call_args = mock_async_execute.call_args
query = call_args[0][1]
params = call_args[0][2]
@@ -394,8 +394,8 @@ class TestUnifiedTableQueries:
assert results[0]["category"] == "electronics"
@pytest.mark.asyncio
- @patch('trustgraph.query.rows.cassandra.service.async_scan', new_callable=AsyncMock)
- async def test_query_without_index_match(self, mock_async_scan):
+ @patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock)
+ async def test_query_without_index_match(self, mock_async_execute):
"""Test query execution without matching index (scan mode)"""
processor = MagicMock()
processor.session = MagicMock()
@@ -406,10 +406,12 @@ class TestUnifiedTableQueries:
processor._matches_filters = Processor._matches_filters.__get__(processor, Processor)
processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor)
- # Mock async_scan to return filtered test data
+ # Mock async_execute to return test data
mock_row1 = MagicMock()
mock_row1.data = {"id": "1", "name": "Product A", "price": "100"}
- mock_async_scan.return_value = [mock_row1]
+ mock_row2 = MagicMock()
+ mock_row2.data = {"id": "2", "name": "Product B", "price": "200"}
+ mock_async_execute.return_value = [mock_row1, mock_row2]
schema = RowSchema(
name="products",
@@ -430,16 +432,13 @@ class TestUnifiedTableQueries:
limit=10
)
- # Verify async_scan was called
- mock_async_scan.assert_called_once()
-
- # Verify query structure
- call_args = mock_async_scan.call_args
+ # Query should use ALLOW FILTERING for scan
+ call_args = mock_async_execute.call_args
query = call_args[0][1]
assert "ALLOW FILTERING" in query
- # Should return filtered results
+ # Should post-filter results
assert len(results) == 1
assert results[0]["name"] == "Product A"
diff --git a/tests/unit/test_reliability/test_null_embedding_protection.py b/tests/unit/test_reliability/test_null_embedding_protection.py
index 41d0f88b..dbe06b40 100644
--- a/tests/unit/test_reliability/test_null_embedding_protection.py
+++ b/tests/unit/test_reliability/test_null_embedding_protection.py
@@ -259,8 +259,6 @@ class TestGraphEmbeddingsNullProtection:
proc.collection_exists = MagicMock(return_value=True)
proc._cache_lock = asyncio.Lock()
proc._known_collections = set()
- proc.replication_factor = 1
- proc.shard_number = 1
msg = MagicMock()
msg.metadata.collection = "graphs"
diff --git a/trustgraph-base/trustgraph/base/cassandra_config.py b/trustgraph-base/trustgraph/base/cassandra_config.py
index b2e36fbd..78505c68 100644
--- a/trustgraph-base/trustgraph/base/cassandra_config.py
+++ b/trustgraph-base/trustgraph/base/cassandra_config.py
@@ -103,19 +103,35 @@ def resolve_cassandra_config(
host: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
- default_keyspace: Optional[str] = None,
- replication_factor: Optional[int] = None,
+ default_keyspace: Optional[str] = None
) -> Tuple[List[str], Optional[str], Optional[str], Optional[str], int]:
+ """
+ Resolve Cassandra configuration from various sources.
+
+ Can accept either argparse args object or explicit parameters.
+ Converts host string to list format for Cassandra driver.
+
+ Args:
+ args: Optional argparse namespace with cassandra_host, cassandra_username, cassandra_password, cassandra_keyspace, cassandra_replication_factor
+ host: Optional explicit host parameter (overrides args)
+ username: Optional explicit username parameter (overrides args)
+ password: Optional explicit password parameter (overrides args)
+ default_keyspace: Optional default keyspace if not specified elsewhere
+
+ Returns:
+ tuple: (hosts_list, username, password, keyspace, replication_factor)
+ """
+ # If args provided, extract values
keyspace = None
+ replication_factor = 1
if args is not None:
host = host or getattr(args, 'cassandra_host', None)
username = username or getattr(args, 'cassandra_username', None)
password = password or getattr(args, 'cassandra_password', None)
keyspace = getattr(args, 'cassandra_keyspace', None)
- replication_factor = replication_factor or getattr(
- args, 'cassandra_replication_factor', None
- )
+ replication_factor = getattr(args, 'cassandra_replication_factor', 1)
+ # Apply defaults if still None
defaults = get_cassandra_defaults()
host = host or defaults['host']
username = username or defaults['username']
diff --git a/trustgraph-base/trustgraph/base/qdrant_config.py b/trustgraph-base/trustgraph/base/qdrant_config.py
deleted file mode 100644
index f3e015ca..00000000
--- a/trustgraph-base/trustgraph/base/qdrant_config.py
+++ /dev/null
@@ -1,87 +0,0 @@
-
-import os
-import argparse
-from typing import Optional, Any, Tuple
-
-
-def get_qdrant_defaults() -> dict:
- return {
- 'url': os.getenv('QDRANT_URL', 'http://localhost:6333'),
- 'api_key': os.getenv('QDRANT_API_KEY'),
- 'replication_factor': int(os.getenv('QDRANT_REPLICATION_FACTOR', '1')),
- 'shard_number': int(os.getenv('QDRANT_SHARD_NUMBER', '1')),
- }
-
-
-def add_qdrant_args(parser: argparse.ArgumentParser) -> None:
- defaults = get_qdrant_defaults()
-
- url_help = f"Qdrant URL (default: {defaults['url']})"
- if 'QDRANT_URL' in os.environ:
- url_help += " [from QDRANT_URL]"
-
- api_key_help = "Qdrant API key"
- if defaults['api_key']:
- api_key_help += " (default: )"
- if 'QDRANT_API_KEY' in os.environ:
- api_key_help += " [from QDRANT_API_KEY]"
-
- replication_help = f"Qdrant collection replication factor (default: {defaults['replication_factor']})"
- if 'QDRANT_REPLICATION_FACTOR' in os.environ:
- replication_help += " [from QDRANT_REPLICATION_FACTOR]"
-
- shard_help = f"Qdrant collection shard number (default: {defaults['shard_number']})"
- if 'QDRANT_SHARD_NUMBER' in os.environ:
- shard_help += " [from QDRANT_SHARD_NUMBER]"
-
- parser.add_argument(
- '--store-uri',
- default=defaults['url'],
- help=url_help,
- )
-
- parser.add_argument(
- '--api-key',
- default=defaults['api_key'],
- help=api_key_help,
- )
-
- parser.add_argument(
- '--qdrant-replication-factor',
- type=int,
- default=defaults['replication_factor'],
- help=replication_help,
- )
-
- parser.add_argument(
- '--qdrant-shard-number',
- type=int,
- default=defaults['shard_number'],
- help=shard_help,
- )
-
-
-def resolve_qdrant_config(
- args: Optional[Any] = None,
- url: Optional[str] = None,
- api_key: Optional[str] = None,
- replication_factor: Optional[int] = None,
- shard_number: Optional[int] = None,
-) -> Tuple[str, Optional[str], int, int]:
- if args is not None:
- url = url or getattr(args, 'store_uri', None)
- api_key = api_key or getattr(args, 'api_key', None)
- replication_factor = replication_factor or getattr(
- args, 'qdrant_replication_factor', None
- )
- shard_number = shard_number or getattr(
- args, 'qdrant_shard_number', None
- )
-
- defaults = get_qdrant_defaults()
- url = url or defaults['url']
- api_key = api_key or defaults['api_key']
- replication_factor = replication_factor or defaults['replication_factor']
- shard_number = shard_number or defaults['shard_number']
-
- return url, api_key, replication_factor, shard_number
diff --git a/trustgraph-cli/trustgraph/cli/load_structured_data.py b/trustgraph-cli/trustgraph/cli/load_structured_data.py
index 5649a5ae..dccf548e 100644
--- a/trustgraph-cli/trustgraph/cli/load_structured_data.py
+++ b/trustgraph-cli/trustgraph/cli/load_structured_data.py
@@ -78,7 +78,7 @@ def load_structured_data(
logger.info("Step 1: Analyzing data to discover best matching schema...")
# Step 1: Auto-discover schema (reuse discover_schema logic)
- discovered_schema = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, token=token, workspace=workspace)
+ discovered_schema = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, workspace=workspace)
if not discovered_schema:
logger.error("Failed to discover suitable schema automatically")
print("❌ Could not automatically determine the best schema for your data.")
@@ -90,7 +90,7 @@ def load_structured_data(
# Step 2: Auto-generate descriptor
logger.info("Step 2: Generating descriptor configuration...")
- auto_descriptor = _auto_generate_descriptor(api_url, input_file, discovered_schema, sample_chars, flow, logger, token=token, workspace=workspace)
+ auto_descriptor = _auto_generate_descriptor(api_url, input_file, discovered_schema, sample_chars, flow, logger, workspace=workspace)
if not auto_descriptor:
logger.error("Failed to generate descriptor automatically")
print("❌ Could not automatically generate descriptor configuration.")
@@ -172,7 +172,7 @@ def load_structured_data(
logger.info(f"Sample chars: {sample_chars} characters")
# Use the helper function to discover schema (get raw response for display)
- response = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=True, token=token, workspace=workspace)
+ response = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=True, workspace=workspace)
if response:
# Debug: print response type and content
@@ -203,7 +203,7 @@ def load_structured_data(
# If no schema specified, discover it first
if not schema_name:
logger.info("No schema specified, auto-discovering...")
- schema_name = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, token=token, workspace=workspace)
+ schema_name = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, workspace=workspace)
if not schema_name:
print("Error: Could not determine schema automatically.")
print("Please specify a schema using --schema-name or run --discover-schema first.")
@@ -213,7 +213,7 @@ def load_structured_data(
logger.info(f"Target schema: {schema_name}")
# Generate descriptor using helper function
- descriptor = _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, token=token, workspace=workspace)
+ descriptor = _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, workspace=workspace)
if descriptor:
# Output the generated descriptor
@@ -603,7 +603,7 @@ def _send_to_trustgraph(rows, api_url, flow, batch_size=1000, token=None, worksp
# Helper functions for auto mode
-def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=False, token=None, workspace="default"):
+def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=False, workspace="default"):
"""Auto-discover the best matching schema for the input data
Args:
@@ -626,7 +626,7 @@ def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, retur
# Import API modules
from trustgraph.api import Api
from trustgraph.api.types import ConfigKey
- api = Api(api_url, token=token, workspace=workspace)
+ api = Api(api_url, workspace=workspace)
config_api = api.config()
# Get available schemas
@@ -707,7 +707,7 @@ def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, retur
return None
-def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, token=None, workspace="default"):
+def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, workspace="default"):
"""Auto-generate descriptor configuration for the discovered schema"""
try:
# Read sample data
@@ -717,7 +717,7 @@ def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, fl
# Import API modules
from trustgraph.api import Api
from trustgraph.api.types import ConfigKey
- api = Api(api_url, token=token, workspace=workspace)
+ api = Api(api_url, workspace=workspace)
config_api = api.config()
# Get schema definition
diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py
index 725f1106..c5fac198 100644
--- a/trustgraph-flow/trustgraph/config/service/service.py
+++ b/trustgraph-flow/trustgraph/config/service/service.py
@@ -83,8 +83,7 @@ class Processor(AsyncProcessor):
host=cassandra_host,
username=cassandra_username,
password=cassandra_password,
- default_keyspace="config",
- replication_factor=params.get("cassandra_replication_factor"),
+ default_keyspace="config"
)
# Store resolved configuration
diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py
index 5c50c207..a8f52efd 100755
--- a/trustgraph-flow/trustgraph/cores/service.py
+++ b/trustgraph-flow/trustgraph/cores/service.py
@@ -61,8 +61,7 @@ class Processor(WorkspaceProcessor):
host=cassandra_host,
username=cassandra_username,
password=cassandra_password,
- default_keyspace="knowledge",
- replication_factor=params.get("cassandra_replication_factor"),
+ default_keyspace="knowledge"
)
self.cassandra_host = hosts
diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py
index ae393028..ca242265 100755
--- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py
+++ b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py
@@ -32,10 +32,6 @@ logger = logging.getLogger(__name__)
default_ident = "document-decoder"
-def _looks_like_pdf(content):
- return content.lstrip().startswith(b"%PDF-")
-
-
class Processor(FlowProcessor):
def __init__(self, **params):
@@ -98,37 +94,33 @@ class Processor(FlowProcessor):
)
return
- # Check if we should fetch from librarian or use inline data
- if v.document_id:
- # Fetch from librarian via Pulsar
- logger.info(f"Fetching document {v.document_id} from librarian...")
-
- content = await flow.librarian.fetch_document_content(
- document_id=v.document_id,
-
- )
-
- # Content is base64 encoded
- if isinstance(content, str):
- content = content.encode('utf-8')
- decoded_content = base64.b64decode(content)
-
- logger.info(f"Fetched {len(decoded_content)} bytes from librarian")
- else:
- # Use inline data (backward compatibility)
- decoded_content = base64.b64decode(v.data)
-
- if not _looks_like_pdf(decoded_content):
- logger.error(
- f"Document {v.metadata.id} is not valid PDF content. "
- f"Ignoring document."
- )
- return
-
- with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as fp:
+ with tempfile.NamedTemporaryFile(delete_on_close=False, suffix='.pdf') as fp:
temp_path = fp.name
- fp.write(decoded_content)
- fp.close()
+
+ # Check if we should fetch from librarian or use inline data
+ if v.document_id:
+ # Fetch from librarian via Pulsar
+ logger.info(f"Fetching document {v.document_id} from librarian...")
+ fp.close()
+
+ content = await flow.librarian.fetch_document_content(
+ document_id=v.document_id,
+
+ )
+
+ # Content is base64 encoded
+ if isinstance(content, str):
+ content = content.encode('utf-8')
+ decoded_content = base64.b64decode(content)
+
+ with open(temp_path, 'wb') as f:
+ f.write(decoded_content)
+
+ logger.info(f"Fetched {len(decoded_content)} bytes from librarian")
+ else:
+ # Use inline data (backward compatibility)
+ fp.write(base64.b64decode(v.data))
+ fp.close()
global PyPDFLoader
if PyPDFLoader is None:
diff --git a/trustgraph-flow/trustgraph/direct/cassandra_kg.py b/trustgraph-flow/trustgraph/direct/cassandra_kg.py
index f1e4a577..d7abd1a9 100644
--- a/trustgraph-flow/trustgraph/direct/cassandra_kg.py
+++ b/trustgraph-flow/trustgraph/direct/cassandra_kg.py
@@ -6,7 +6,7 @@ import logging
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement, SimpleStatement
-import ssl
+from ssl import SSLContext, PROTOCOL_TLSv1_2
from ..tables.cassandra_async import async_execute
@@ -41,15 +41,13 @@ class KnowledgeGraph:
def __init__(
self, hosts=None,
- keyspace="trustgraph", username=None, password=None,
- replication_factor=1,
+ keyspace="trustgraph", username=None, password=None
):
if hosts is None:
hosts = ["localhost"]
self.keyspace = keyspace
- self.replication_factor = replication_factor
self.username = username
# 7-table schema for quads with full query pattern support
@@ -70,7 +68,7 @@ class KnowledgeGraph:
self.collection_metadata_table = "collection_metadata"
if username and password:
- ssl_context = ssl.create_default_context()
+ ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(username=username, password=password)
self.cluster = Cluster(hosts, auth_provider=auth_provider, ssl_context=ssl_context)
else:
@@ -94,7 +92,7 @@ class KnowledgeGraph:
create keyspace if not exists {self.keyspace}
with replication = {{
'class' : 'SimpleStrategy',
- 'replication_factor' : {self.replication_factor}
+ 'replication_factor' : 1
}};
""")
@@ -541,15 +539,13 @@ class EntityCentricKnowledgeGraph:
def __init__(
self, hosts=None,
- keyspace="trustgraph", username=None, password=None,
- replication_factor=1,
+ keyspace="trustgraph", username=None, password=None
):
if hosts is None:
hosts = ["localhost"]
self.keyspace = keyspace
- self.replication_factor = replication_factor
self.username = username
# 2-table entity-centric schema
@@ -560,7 +556,7 @@ class EntityCentricKnowledgeGraph:
self.collection_metadata_table = "collection_metadata"
if username and password:
- ssl_context = ssl.create_default_context()
+ ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(username=username, password=password)
self.cluster = Cluster(hosts, auth_provider=auth_provider, ssl_context=ssl_context)
else:
@@ -584,7 +580,7 @@ class EntityCentricKnowledgeGraph:
create keyspace if not exists {self.keyspace}
with replication = {{
'class' : 'SimpleStrategy',
- 'replication_factor' : {self.replication_factor}
+ 'replication_factor' : 1
}};
""")
diff --git a/trustgraph-flow/trustgraph/iam/service/service.py b/trustgraph-flow/trustgraph/iam/service/service.py
index b2f3976d..8ce22757 100644
--- a/trustgraph-flow/trustgraph/iam/service/service.py
+++ b/trustgraph-flow/trustgraph/iam/service/service.py
@@ -101,7 +101,6 @@ class Processor(AsyncProcessor):
username=cassandra_username,
password=cassandra_password,
default_keyspace="iam",
- replication_factor=params.get("cassandra_replication_factor"),
)
self.cassandra_host = hosts
diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py
index 4d3efbfb..ee5e9c1b 100755
--- a/trustgraph-flow/trustgraph/librarian/service.py
+++ b/trustgraph-flow/trustgraph/librarian/service.py
@@ -146,8 +146,7 @@ class Processor(WorkspaceProcessor):
host=cassandra_host,
username=cassandra_username,
password=cassandra_password,
- default_keyspace="librarian",
- replication_factor=params.get("cassandra_replication_factor"),
+ default_keyspace="librarian"
)
# Store resolved configuration
diff --git a/trustgraph-flow/trustgraph/query/doc_embeddings/qdrant/service.py b/trustgraph-flow/trustgraph/query/doc_embeddings/qdrant/service.py
index de25a139..f6770744 100755
--- a/trustgraph-flow/trustgraph/query/doc_embeddings/qdrant/service.py
+++ b/trustgraph-flow/trustgraph/query/doc_embeddings/qdrant/service.py
@@ -12,33 +12,31 @@ from qdrant_client import QdrantClient
from .... schema import DocumentEmbeddingsResponse, ChunkMatch
from .... schema import Error
from .... base import DocumentEmbeddingsQueryService
-from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "doc-embeddings-query"
+default_store_uri = 'http://localhost:6333'
+
class Processor(DocumentEmbeddingsQueryService):
def __init__(self, **params):
- store_uri = params.get("store_uri")
- api_key = params.get("api_key")
+ store_uri = params.get("store_uri", default_store_uri)
- url, api_key, _, _ = resolve_qdrant_config(
- url=store_uri,
- api_key=api_key,
- )
+ # Optional API key
+ api_key = params.get("api_key", None)
super(Processor, self).__init__(
**params | {
- "store_uri": url,
+ "store_uri": store_uri,
"api_key": api_key,
}
)
- self.qdrant = QdrantClient(url=url, api_key=api_key)
+ self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
async def query_document_embeddings(self, workspace, msg):
@@ -87,7 +85,18 @@ class Processor(DocumentEmbeddingsQueryService):
def add_args(parser):
DocumentEmbeddingsQueryService.add_args(parser)
- add_qdrant_args(parser)
+
+ parser.add_argument(
+ '-t', '--store-uri',
+ default=default_store_uri,
+ help=f'Qdrant store URI (default: {default_store_uri})'
+ )
+
+ parser.add_argument(
+ '-k', '--api-key',
+ default=None,
+ help=f'API key for qdrant (default: None)'
+ )
def run():
diff --git a/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py b/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py
index aa93925d..167130c9 100755
--- a/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py
+++ b/trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py
@@ -12,32 +12,31 @@ from qdrant_client import QdrantClient
from .... schema import GraphEmbeddingsResponse, EntityMatch
from .... schema import Error, Term, IRI, LITERAL
from .... base import GraphEmbeddingsQueryService
-from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "graph-embeddings-query"
+default_store_uri = 'http://localhost:6333'
+
class Processor(GraphEmbeddingsQueryService):
def __init__(self, **params):
- store_uri = params.get("store_uri")
- api_key = params.get("api_key")
+ store_uri = params.get("store_uri", default_store_uri)
- url, api_key, _, _ = resolve_qdrant_config(
- url=store_uri, api_key=api_key,
- )
+ # Optional API key
+ api_key = params.get("api_key", None)
super(Processor, self).__init__(
**params | {
- "store_uri": url,
+ "store_uri": store_uri,
"api_key": api_key,
}
)
- self.qdrant = QdrantClient(url=url, api_key=api_key)
+ self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
def create_value(self, ent):
if ent.startswith("http://") or ent.startswith("https://"):
@@ -105,7 +104,18 @@ class Processor(GraphEmbeddingsQueryService):
def add_args(parser):
GraphEmbeddingsQueryService.add_args(parser)
- add_qdrant_args(parser)
+
+ parser.add_argument(
+ '-t', '--store-uri',
+ default=default_store_uri,
+ help=f'Qdrant store URI (default: {default_store_uri})'
+ )
+
+ parser.add_argument(
+ '-k', '--api-key',
+ default=None,
+ help=f'API key for qdrant (default: None)'
+ )
def run():
diff --git a/trustgraph-flow/trustgraph/query/ontology/sparql_cassandra.py b/trustgraph-flow/trustgraph/query/ontology/sparql_cassandra.py
index a9005ee4..b7f0f423 100644
--- a/trustgraph-flow/trustgraph/query/ontology/sparql_cassandra.py
+++ b/trustgraph-flow/trustgraph/query/ontology/sparql_cassandra.py
@@ -116,7 +116,7 @@ class CassandraTripleStore(Store if RDFLIB_AVAILABLE else object):
# Create keyspace
self.session.execute(f"""
CREATE KEYSPACE IF NOT EXISTS {self.keyspace}
- WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': {self.cassandra_config.get('replication_factor', 1)}}}
+ WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
""")
# Create triples table optimized for SPARQL queries
diff --git a/trustgraph-flow/trustgraph/query/row_embeddings/qdrant/service.py b/trustgraph-flow/trustgraph/query/row_embeddings/qdrant/service.py
index 7e1a5851..1534c044 100644
--- a/trustgraph-flow/trustgraph/query/row_embeddings/qdrant/service.py
+++ b/trustgraph-flow/trustgraph/query/row_embeddings/qdrant/service.py
@@ -19,12 +19,12 @@ from .... schema import (
RowIndexMatch, Error
)
from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
-from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "row-embeddings-query"
+default_store_uri = 'http://localhost:6333'
default_concurrency = 10
@@ -35,17 +35,13 @@ class Processor(FlowProcessor):
id = params.get("id", default_ident)
concurrency = params.get("concurrency", default_concurrency)
- store_uri = params.get("store_uri")
- api_key = params.get("api_key")
-
- url, api_key, _, _ = resolve_qdrant_config(
- url=store_uri, api_key=api_key,
- )
+ store_uri = params.get("store_uri", default_store_uri)
+ api_key = params.get("api_key", None)
super(Processor, self).__init__(
**params | {
"id": id,
- "store_uri": url,
+ "store_uri": store_uri,
"api_key": api_key,
}
)
@@ -66,7 +62,7 @@ class Processor(FlowProcessor):
)
)
- self.qdrant = QdrantClient(url=url, api_key=api_key)
+ self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
def sanitize_name(self, name: str) -> str:
"""Sanitize names for Qdrant collection naming"""
@@ -196,9 +192,21 @@ class Processor(FlowProcessor):
@staticmethod
def add_args(parser):
+ """Add command-line arguments"""
FlowProcessor.add_args(parser)
- add_qdrant_args(parser)
+
+ parser.add_argument(
+ '-t', '--store-uri',
+ default=default_store_uri,
+ help=f'Qdrant store URI (default: {default_store_uri})'
+ )
+
+ parser.add_argument(
+ '-k', '--api-key',
+ default=None,
+ help='API key for Qdrant (default: None)'
+ )
parser.add_argument(
'-c', '--concurrency',
diff --git a/trustgraph-flow/trustgraph/query/rows/cassandra/service.py b/trustgraph-flow/trustgraph/query/rows/cassandra/service.py
index f9868d67..7157daae 100644
--- a/trustgraph-flow/trustgraph/query/rows/cassandra/service.py
+++ b/trustgraph-flow/trustgraph/query/rows/cassandra/service.py
@@ -24,7 +24,7 @@ from .... schema import RowsQueryRequest, RowsQueryResponse, GraphQLError
from .... schema import Error, RowSchema, Field as SchemaField
from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config
-from .... tables.cassandra_async import async_execute, async_execute_paged, async_scan
+from .... tables.cassandra_async import async_execute
from ... graphql import GraphQLSchemaBuilder, SortDirection
@@ -180,7 +180,7 @@ class Processor(FlowProcessor):
description=field_def.get("description", ""),
required=field_def.get("required", False),
enum_values=field_def.get("enum", []),
- indexed=field_def.get("indexed", False),
+ indexed=field_def.get("indexed", False)
)
fields.append(field)
@@ -232,8 +232,6 @@ class Processor(FlowProcessor):
for index_name in index_names:
if index_name in filters:
value = filters[index_name]
- if value == "" or value is None:
- continue
# Single field index -> single element list
index_value = [str(value)]
return (index_name, index_value)
@@ -284,13 +282,11 @@ class Processor(FlowProcessor):
query += f" LIMIT {limit}"
try:
- pages = await async_execute_paged(
- self.session, query, params
- )
- for page in pages:
- for row in page:
- row_dict = dict(row.data) if row.data else {}
- results.append(row_dict)
+ rows = await async_execute(self.session, query, params)
+ for row in rows:
+ # Convert data map to dict with proper field names
+ row_dict = dict(row.data) if row.data else {}
+ results.append(row_dict)
except Exception as e:
logger.error(f"Failed to query rows: {e}", exc_info=True)
raise
@@ -312,6 +308,8 @@ class Processor(FlowProcessor):
# Query using the first index (arbitrary choice for scan)
primary_index = index_names[0]
+ # We need to scan all values for this index
+ # This requires ALLOW FILTERING or a different approach
query = f"""
SELECT data, source FROM {safe_keyspace}.rows
WHERE collection = %s
@@ -322,18 +320,17 @@ class Processor(FlowProcessor):
params = [collection, schema_name, primary_index]
try:
- def row_filter(row):
- row_dict = dict(row.data) if row.data else {}
- return self._matches_filters(row_dict, filters, row_schema)
+ rows = await async_execute(self.session, query, params)
- matched_rows = await async_scan(
- self.session, query, params,
- row_filter=row_filter,
- limit=limit,
- )
- for row in matched_rows:
+ for row in rows:
row_dict = dict(row.data) if row.data else {}
- results.append(row_dict)
+
+ # Apply post-filters
+ if self._matches_filters(row_dict, filters, row_schema):
+ results.append(row_dict)
+
+ if limit and len(results) >= limit:
+ break
except Exception as e:
logger.error(f"Failed to scan rows: {e}", exc_info=True)
@@ -366,7 +363,7 @@ class Processor(FlowProcessor):
# Parse filter key for operator
if '_' in filter_key:
parts = filter_key.rsplit('_', 1)
- if parts[1] in ['gt', 'gte', 'lt', 'lte', 'contains', 'in', 'not', 'startsWith', 'endsWith', 'not_in']:
+ if parts[1] in ['gt', 'gte', 'lt', 'lte', 'contains', 'in']:
field_name = parts[0]
operator = parts[1]
else:
@@ -403,18 +400,6 @@ class Processor(FlowProcessor):
elif operator == 'in':
if str(row_value) not in [str(v) for v in filter_value]:
return False
- elif operator == 'not':
- if str(row_value) == str(filter_value):
- return False
- elif operator == 'startsWith':
- if not str(row_value).startswith(str(filter_value)):
- return False
- elif operator == 'endsWith':
- if not str(row_value).endswith(str(filter_value)):
- return False
- elif operator == 'not_in':
- if str(row_value) in [str(v) for v in filter_value]:
- return False
except (ValueError, TypeError):
return False
diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py
index 08d88849..2bfef99c 100644
--- a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py
+++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py
@@ -14,36 +14,29 @@ from qdrant_client.models import Distance, VectorParams
from .... base import DocumentEmbeddingsStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
-from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "doc-embeddings-write"
+default_store_uri = 'http://localhost:6333'
+
class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
def __init__(self, **params):
- store_uri = params.get("store_uri")
- api_key = params.get("api_key")
-
- url, api_key, replication_factor, shard_number = resolve_qdrant_config(
- url=store_uri, api_key=api_key,
- replication_factor=params.get("qdrant_replication_factor"),
- shard_number=params.get("qdrant_shard_number"),
- )
+ store_uri = params.get("store_uri", default_store_uri)
+ api_key = params.get("api_key", None)
super(Processor, self).__init__(
**params | {
- "store_uri": url,
+ "store_uri": store_uri,
"api_key": api_key,
}
)
- self.qdrant = QdrantClient(url=url, api_key=api_key)
- self.replication_factor = replication_factor
- self.shard_number = shard_number
+ self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
self._cache_lock = asyncio.Lock()
self._known_collections: set[str] = set()
@@ -68,8 +61,6 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
vectors_config=VectorParams(
size=dim, distance=Distance.COSINE
),
- replication_factor=self.replication_factor,
- shard_number=self.shard_number,
)
self._known_collections.add(collection_name)
@@ -118,7 +109,18 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
def add_args(parser):
DocumentEmbeddingsStoreService.add_args(parser)
- add_qdrant_args(parser)
+
+ parser.add_argument(
+ '-t', '--store-uri',
+ default=default_store_uri,
+ help=f'Qdrant URI (default: {default_store_uri})'
+ )
+
+ parser.add_argument(
+ '-k', '--api-key',
+ default=None,
+ help=f'Qdrant API key (default: None)'
+ )
async def create_collection(self, workspace: str, collection: str, metadata: dict):
"""
diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py
index b6072bdc..13dcdba8 100755
--- a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py
+++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py
@@ -14,7 +14,6 @@ from qdrant_client.models import Distance, VectorParams
from .... base import GraphEmbeddingsStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
-from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
from .... schema import IRI, LITERAL
# Module logger
@@ -30,34 +29,29 @@ def get_term_value(term):
elif term.type == LITERAL:
return term.value
else:
+ # For blank nodes or other types, use id or value
return term.id or term.value
default_ident = "graph-embeddings-write"
+default_store_uri = 'http://localhost:6333'
+
class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
def __init__(self, **params):
- store_uri = params.get("store_uri")
- api_key = params.get("api_key")
-
- url, api_key, replication_factor, shard_number = resolve_qdrant_config(
- url=store_uri, api_key=api_key,
- replication_factor=params.get("qdrant_replication_factor"),
- shard_number=params.get("qdrant_shard_number"),
- )
+ store_uri = params.get("store_uri", default_store_uri)
+ api_key = params.get("api_key", None)
super(Processor, self).__init__(
**params | {
- "store_uri": url,
+ "store_uri": store_uri,
"api_key": api_key,
}
)
- self.qdrant = QdrantClient(url=url, api_key=api_key)
- self.replication_factor = replication_factor
- self.shard_number = shard_number
+ self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
self._cache_lock = asyncio.Lock()
self._known_collections: set[str] = set()
@@ -82,8 +76,6 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
vectors_config=VectorParams(
size=dim, distance=Distance.COSINE
),
- replication_factor=self.replication_factor,
- shard_number=self.shard_number,
)
self._known_collections.add(collection_name)
@@ -136,7 +128,18 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
def add_args(parser):
GraphEmbeddingsStoreService.add_args(parser)
- add_qdrant_args(parser)
+
+ parser.add_argument(
+ '-t', '--store-uri',
+ default=default_store_uri,
+ help=f'Qdrant store URI (default: {default_store_uri})'
+ )
+
+ parser.add_argument(
+ '-k', '--api-key',
+ default=None,
+ help=f'Qdrant API key'
+ )
async def create_collection(self, workspace: str, collection: str, metadata: dict):
"""
diff --git a/trustgraph-flow/trustgraph/storage/knowledge/store.py b/trustgraph-flow/trustgraph/storage/knowledge/store.py
index f6e12a85..162a4057 100644
--- a/trustgraph-flow/trustgraph/storage/knowledge/store.py
+++ b/trustgraph-flow/trustgraph/storage/knowledge/store.py
@@ -27,8 +27,7 @@ class Processor(FlowProcessor):
host=params.get("cassandra_host"),
username=params.get("cassandra_username"),
password=params.get("cassandra_password"),
- default_keyspace='knowledge',
- replication_factor=params.get("cassandra_replication_factor"),
+ default_keyspace='knowledge'
)
super(Processor, self).__init__(
diff --git a/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py
index 4c65edb1..a01629c5 100644
--- a/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py
+++ b/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py
@@ -27,12 +27,12 @@ from qdrant_client.models import PointStruct, Distance, VectorParams
from .... schema import RowEmbeddings
from .... base import FlowProcessor, ConsumerSpec
from .... base import CollectionConfigHandler
-from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "row-embeddings-write"
+default_store_uri = 'http://localhost:6333'
class Processor(CollectionConfigHandler, FlowProcessor):
@@ -41,19 +41,13 @@ class Processor(CollectionConfigHandler, FlowProcessor):
id = params.get("id", default_ident)
- store_uri = params.get("store_uri")
- api_key = params.get("api_key")
-
- url, api_key, replication_factor, shard_number = resolve_qdrant_config(
- url=store_uri, api_key=api_key,
- replication_factor=params.get("qdrant_replication_factor"),
- shard_number=params.get("qdrant_shard_number"),
- )
+ store_uri = params.get("store_uri", default_store_uri)
+ api_key = params.get("api_key", None)
super(Processor, self).__init__(
**params | {
"id": id,
- "store_uri": url,
+ "store_uri": store_uri,
"api_key": api_key,
}
)
@@ -69,9 +63,7 @@ class Processor(CollectionConfigHandler, FlowProcessor):
# Register config handler for collection management
self.register_config_handler(self.on_collection_config, types=["collection"])
- self.qdrant = QdrantClient(url=url, api_key=api_key)
- self.replication_factor = replication_factor
- self.shard_number = shard_number
+ self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
self._cache_lock = asyncio.Lock()
self._known_collections: set[str] = set()
@@ -111,8 +103,6 @@ class Processor(CollectionConfigHandler, FlowProcessor):
size=dimension,
distance=Distance.COSINE
),
- replication_factor=self.replication_factor,
- shard_number=self.shard_number,
)
self._known_collections.add(collection_name)
@@ -259,9 +249,21 @@ class Processor(CollectionConfigHandler, FlowProcessor):
@staticmethod
def add_args(parser):
+ """Add command-line arguments"""
FlowProcessor.add_args(parser)
- add_qdrant_args(parser)
+
+ parser.add_argument(
+ '-t', '--store-uri',
+ default=default_store_uri,
+ help=f'Qdrant URI (default: {default_store_uri})'
+ )
+
+ parser.add_argument(
+ '-k', '--api-key',
+ default=None,
+ help='Qdrant API key (default: None)'
+ )
def run():
diff --git a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py
index 31fc41a7..65eeee06 100755
--- a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py
+++ b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py
@@ -47,18 +47,16 @@ class Processor(CollectionConfigHandler, FlowProcessor):
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
- hosts, username, password, keyspace, replication_factor = resolve_cassandra_config(
+ hosts, username, password, keyspace, _ = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
- password=cassandra_password,
- replication_factor=params.get("cassandra_replication_factor"),
+ password=cassandra_password
)
# Store resolved configuration with proper names
self.cassandra_host = hosts # Store as list
self.cassandra_username = username
self.cassandra_password = password
- self.replication_factor = replication_factor
# Config key for schemas
self.config_key = params.get("config_type", "schema")
@@ -172,7 +170,7 @@ class Processor(CollectionConfigHandler, FlowProcessor):
description=field_def.get("description", ""),
required=field_def.get("required", False),
enum_values=field_def.get("enum", []),
- indexed=field_def.get("indexed", False),
+ indexed=field_def.get("indexed", False)
)
fields.append(field)
@@ -234,7 +232,7 @@ class Processor(CollectionConfigHandler, FlowProcessor):
CREATE KEYSPACE IF NOT EXISTS {safe_keyspace}
WITH REPLICATION = {{
'class': 'SimpleStrategy',
- 'replication_factor': {self.replication_factor}
+ 'replication_factor': 1
}}
"""
diff --git a/trustgraph-flow/trustgraph/tables/cassandra_async.py b/trustgraph-flow/trustgraph/tables/cassandra_async.py
index fe410a26..205ed6b9 100644
--- a/trustgraph-flow/trustgraph/tables/cassandra_async.py
+++ b/trustgraph-flow/trustgraph/tables/cassandra_async.py
@@ -80,14 +80,14 @@ def _set_exception_if_pending(fut, exc):
fut.set_exception(exc)
-async def async_execute_paged(session, query, parameters=None, fetch_size=5000):
+async def async_execute_paged(session, query, parameters=None, fetch_size=100):
"""Execute a CQL query with page-by-page iteration.
Uses synchronous session.execute() inside run_in_executor so that
the driver's ResultSet paging works correctly without materialising
the entire result set in memory.
- Returns all pages as a list of lists.
+ Returns all fetched pages as a list of per-page row lists (nothing is yielded incrementally).
"""
loop = asyncio.get_running_loop()
@@ -111,50 +111,3 @@ async def async_execute_paged(session, query, parameters=None, fetch_size=5000):
return await loop.run_in_executor(
None, _fetch_all_pages
)
-
-
-async def async_scan(
- session, query, parameters=None, row_filter=None,
- limit=None, fetch_size=5000,
-):
- """Scan a CQL query page-by-page, applying a filter and limit.
-
- Only matching rows accumulate in memory. Each page is discarded
- after processing, so peak memory is bounded by fetch_size plus
- the number of matching rows (capped by limit).
-
- Args:
- session: cassandra.cluster.Session
- query: CQL statement string
- parameters: bind params
- row_filter: callable(row) -> bool, or None to accept all
- limit: max results to return, or None for unlimited
- fetch_size: rows per Cassandra page fetch
-
- Returns:
- List of matching rows.
- """
- loop = asyncio.get_running_loop()
-
- if isinstance(query, str):
- stmt = SimpleStatement(query, fetch_size=fetch_size)
- else:
- stmt = query
- stmt.fetch_size = fetch_size
-
- def _scan():
- results = []
- result_set = session.execute(stmt, parameters)
- while True:
- for row in result_set.current_rows:
- if row_filter is None or row_filter(row):
- results.append(row)
- if limit and len(results) >= limit:
- return results
- if result_set.has_more_pages:
- result_set.fetch_next_page()
- else:
- break
- return results
-
- return await loop.run_in_executor(None, _scan)
diff --git a/trustgraph-flow/trustgraph/tables/config.py b/trustgraph-flow/trustgraph/tables/config.py
index c87cb3b5..74ceb6f4 100644
--- a/trustgraph-flow/trustgraph/tables/config.py
+++ b/trustgraph-flow/trustgraph/tables/config.py
@@ -4,7 +4,7 @@ from .. schema import Metadata, GraphEmbeddings
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
-import ssl
+from ssl import SSLContext, PROTOCOL_TLSv1_2
import uuid
import time
@@ -33,7 +33,7 @@ class ConfigTableStore:
cassandra_host = [h.strip() for h in cassandra_host.split(',')]
if cassandra_username and cassandra_password:
- ssl_context = ssl.create_default_context()
+ ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(
username=cassandra_username, password=cassandra_password
)
diff --git a/trustgraph-flow/trustgraph/tables/iam.py b/trustgraph-flow/trustgraph/tables/iam.py
index b60e9cff..d7bf5e3d 100644
--- a/trustgraph-flow/trustgraph/tables/iam.py
+++ b/trustgraph-flow/trustgraph/tables/iam.py
@@ -15,7 +15,7 @@ import logging
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
-import ssl
+from ssl import SSLContext, PROTOCOL_TLSv1_2
from . cassandra_async import async_execute
@@ -39,7 +39,7 @@ class IamTableStore:
cassandra_host = [h.strip() for h in cassandra_host.split(",")]
if cassandra_username and cassandra_password:
- ssl_context = ssl.create_default_context()
+ ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(
username=cassandra_username, password=cassandra_password,
)
diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py
index 53a12b35..4fcb2dd3 100644
--- a/trustgraph-flow/trustgraph/tables/knowledge.py
+++ b/trustgraph-flow/trustgraph/tables/knowledge.py
@@ -23,7 +23,7 @@ def tuple_to_term(value, is_uri):
else:
return Term(type=LITERAL, value=value)
from cassandra.auth import PlainTextAuthProvider
-import ssl
+from ssl import SSLContext, PROTOCOL_TLSv1_2
import uuid
import time
@@ -50,7 +50,7 @@ class KnowledgeTableStore:
cassandra_host = [h.strip() for h in cassandra_host.split(',')]
if cassandra_username and cassandra_password:
- ssl_context = ssl.create_default_context()
+ ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(
username=cassandra_username, password=cassandra_password
)
diff --git a/trustgraph-flow/trustgraph/tables/library.py b/trustgraph-flow/trustgraph/tables/library.py
index 5094e103..58486f0e 100644
--- a/trustgraph-flow/trustgraph/tables/library.py
+++ b/trustgraph-flow/trustgraph/tables/library.py
@@ -24,7 +24,7 @@ from .. exceptions import RequestError
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
-import ssl
+from ssl import SSLContext, PROTOCOL_TLSv1_2
import uuid
import time
@@ -53,7 +53,7 @@ class LibraryTableStore:
cassandra_host = [h.strip() for h in cassandra_host.split(',')]
if cassandra_username and cassandra_password:
- ssl_context = ssl.create_default_context()
+ ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(
username=cassandra_username, password=cassandra_password
)
diff --git a/trustgraph-mcp/trustgraph/mcp_server/mcp.py b/trustgraph-mcp/trustgraph/mcp_server/mcp.py
index 11b975b2..7378db64 100755
--- a/trustgraph-mcp/trustgraph/mcp_server/mcp.py
+++ b/trustgraph-mcp/trustgraph/mcp_server/mcp.py
@@ -8,180 +8,71 @@ import logging
import json
import uuid
import argparse
-from dataclasses import dataclass, field
+from dataclasses import dataclass
from collections.abc import AsyncIterator
from functools import partial
from mcp.server.fastmcp import FastMCP, Context
-from mcp.server.auth.provider import AccessToken, TokenVerifier
-from mcp.server.auth.middleware.auth_context import get_access_token
+from mcp.types import TextContent
+from websockets.asyncio.client import connect
from trustgraph.base.logging import add_logging_args, setup_logging
-from . tg_socket import WebSocketManager, _token_key
-
-logger = logging.getLogger(__name__)
-
-
-# Wire-format Term type codes (match TermTranslator compact keys)
-_TERM_TYPES = {
- "iri": "i",
- "literal": "l",
- "blank": "b",
-}
-
-
-def _make_term(value: str, term_type: str) -> dict:
- """Build a compact-key Term dict for the gateway wire format.
-
- Args:
- value: The term value (IRI string, literal text, or blank node id).
- term_type: One of "iri", "literal", "blank".
- """
- t = _TERM_TYPES.get(term_type)
- if t is None:
- raise ValueError(
- f"Unknown term type '{term_type}' — "
- f"expected one of: {', '.join(_TERM_TYPES)}"
- )
-
- if t == "i":
- return {"t": t, "i": value}
- elif t == "l":
- return {"t": t, "v": value}
- elif t == "b":
- return {"t": t, "d": value}
- return {"t": t}
-
-# ── Security boundary: MCP client → MCP server ──
-# The MCP client authenticates to this server via a Bearer token in the
-# HTTP Authorization header. The SDK's auth middleware extracts and
-# verifies the token before any tool handler runs.
-#
-# We implement a pass-through TokenVerifier: the gateway is the real
-# authority, so we accept any non-empty Bearer token here and forward
-# it to the gateway for validation. The gateway's in-band auth
-# protocol and IAM regime decide whether the token is valid.
-#
-# This means an invalid token will connect to the MCP server but will
-# fail when the first WebSocket auth frame is sent to the gateway.
-# That is intentional — the gateway is the single source of truth.
-
-
-class PassthroughTokenVerifier(TokenVerifier):
- """Accept any non-empty Bearer token and forward it downstream.
-
- The TrustGraph gateway is the authority for token validation, not
- this MCP server. We store the raw token in the AccessToken so that
- tool handlers can retrieve it via ``get_access_token().token`` and
- forward it to the gateway.
- """
-
- async def verify_token(self, token: str) -> AccessToken | None:
- if not token:
- return None
- return AccessToken(
- token=token,
- client_id="mcp-caller",
- scopes=[],
- )
-
+from . tg_socket import WebSocketManager
@dataclass
class AppContext:
- sockets: dict[str, WebSocketManager] = field(default_factory=dict)
- websocket_url: str = ""
-
+ sockets: dict[str, WebSocketManager]
+ websocket_url: str
+ gateway_token: str
@asynccontextmanager
-async def app_lifespan(
- server: FastMCP,
- websocket_url: str = "ws://api-gateway:8088/api/v1/socket",
-) -> AsyncIterator[AppContext]:
- """Manage per-server state: the pool of per-caller WebSocket
- connections to the gateway."""
+async def app_lifespan(server: FastMCP, websocket_url: str = "ws://api-gateway:8088/api/v1/socket", gateway_token: str = "") -> AsyncIterator[AppContext]:
- sockets: dict[str, WebSocketManager] = {}
+ """
+ Manage application lifecycle with type-safe context
+ """
+
+ # Initialize on startup
+ sockets = {}
try:
- yield AppContext(sockets=sockets, websocket_url=websocket_url)
+ yield AppContext(sockets=sockets, websocket_url=websocket_url, gateway_token=gateway_token)
finally:
- logger.info("Shutting down — closing %d WebSocket(s)", len(sockets))
+ # Cleanup on shutdown
+ logging.info("Shutting down context")
- for key, manager in sockets.items():
- try:
- await manager.stop()
- except Exception as e:
- logger.warning("Error closing socket %s: %s", key, e)
+ for k, manager in sockets.items():
+ logging.info(f"Closing socket for {k}")
+ await manager.stop()
- logger.info("Shutdown complete")
+ logging.info("Shutdown complete")
-
-def _require_token() -> str:
- """Extract the caller's Bearer token from the MCP auth context.
-
- Raises RuntimeError if no token is present (the caller did not
- authenticate).
- """
- # ── Security boundary: token extraction ──
- # get_access_token() reads the contextvar set by the SDK's
- # AuthContextMiddleware. The token was placed there by
- # PassthroughTokenVerifier.verify_token() and is the raw Bearer
- # value from the MCP client's Authorization header.
- access = get_access_token()
- if access is None or not access.token:
- raise RuntimeError(
- "Authentication required — send a Bearer token in the "
- "Authorization header"
- )
- return access.token
-
-
-async def get_socket_manager(ctx, token):
- """Return (or create) an authenticated WebSocket for this token.
-
- Each unique token gets its own WebSocket connection so that
- gateway-side identity, workspace binding, and capability scoping
- are preserved per caller.
- """
+async def get_socket_manager(ctx):
lifespan_context = ctx.request_context.lifespan_context
sockets = lifespan_context.sockets
websocket_url = lifespan_context.websocket_url
+ gateway_token = lifespan_context.gateway_token
- key = _token_key(token)
+ if "default" in sockets:
+ logging.info("Return existing socket manager")
+ return sockets["default"]
- if key in sockets:
- manager = sockets[key]
- if manager.socket is not None:
- return manager
- # Socket was closed (e.g. server-side timeout) — reconnect.
- del sockets[key]
+ logging.info(f"Opening socket to {websocket_url}...")
- logger.info("Opening authenticated WebSocket to %s …", websocket_url)
+ # Create manager with empty pending requests
+ manager = WebSocketManager(websocket_url, token=gateway_token)
- manager = WebSocketManager(websocket_url, token=token)
+ # Start reader task with the proper manager
await manager.start()
- # Verify the token is valid by calling whoami. This confirms the
- # gateway accepted the token and gives us the caller's identity.
- try:
- identity = await manager.whoami()
- logger.info(
- "WebSocket ready — caller: %s",
- identity.get("handle", "unknown"),
- )
- except Exception as e:
- await manager.stop()
- raise RuntimeError(
- f"Token rejected by gateway (whoami failed): {e}"
- ) from e
+ sockets["default"] = manager
- sockets[key] = manager
+ logging.info("Return new socket manager")
return manager
-
@dataclass
class EmbeddingsResponse:
vectors: List[List[float]]
@@ -291,23 +182,10 @@ class PutConfigResponse:
class DeleteConfigResponse:
pass
-@dataclass
-class SparqlQueryResponse:
- query_type: str
- variables: List[str]
- bindings: List[Dict[str, Any]]
- ask_result: bool
- triples: List[Dict[str, Any]]
-
-@dataclass
-class GraphQLQueryResponse:
- data: Any
- errors: List[Dict[str, Any]]
-
@dataclass
class GetPromptsResponse:
prompts: List[str]
-
+
@dataclass
class GetPromptResponse:
prompt: Dict[str, Any]
@@ -316,61 +194,31 @@ class GetPromptResponse:
class GetSystemPromptResponse:
prompt: str
-
class McpServer:
- def __init__(
- self,
- host: str = "0.0.0.0",
- port: int = 8000,
- websocket_url: str = "ws://api-gateway:8088/api/v1/socket",
- auth_issuer: str = "",
- auth_resource_url: str = "",
- ):
+ def __init__(self, host: str = "0.0.0.0", port: int = 8000, websocket_url: str = "ws://api-gateway:8088/api/v1/socket", gateway_token: str = ""):
self.host = host
self.port = port
self.websocket_url = websocket_url
+ self.gateway_token = gateway_token
- lifespan_with_url = partial(
- app_lifespan, websocket_url=websocket_url,
- )
-
- # ── Security: MCP-level auth configuration ──
- # The SDK requires AuthSettings whenever a token_verifier is
- # present. The issuer_url tells MCP clients where to obtain
- # tokens; resource_server_url identifies this server in OAuth
- # protected-resource metadata.
- #
- # The PassthroughTokenVerifier accepts any non-empty Bearer
- # token — real validation happens at the gateway. This is
- # intentional: the gateway is the single source of truth for
- # identity and capability checks.
- from mcp.server.auth.settings import AuthSettings
-
- auth_settings = AuthSettings(
- issuer_url=auth_issuer or f"http://{host}:{port}",
- resource_server_url=auth_resource_url or f"http://{host}:{port}",
- )
-
+ # Create a partial function to pass websocket_url and gateway_token to app_lifespan
+ lifespan_with_url = partial(app_lifespan, websocket_url=websocket_url, gateway_token=gateway_token)
+
self.mcp = FastMCP(
- "TrustGraph",
- dependencies=["trustgraph-base"],
- host=self.host,
- port=self.port,
+ "TrustGraph", dependencies=["trustgraph-base"],
+ host=self.host, port=self.port,
lifespan=lifespan_with_url,
- token_verifier=PassthroughTokenVerifier(),
- auth=auth_settings,
)
self._register_tools()
-
+
def _register_tools(self):
"""Register all MCP tools"""
+ # Register all the tools that were previously registered globally
self.mcp.tool()(self.embeddings)
self.mcp.tool()(self.text_completion)
self.mcp.tool()(self.graph_rag)
self.mcp.tool()(self.agent)
self.mcp.tool()(self.triples_query)
- self.mcp.tool()(self.sparql_query)
- self.mcp.tool()(self.graphql_query)
self.mcp.tool()(self.graph_embeddings_query)
self.mcp.tool()(self.get_config_all)
self.mcp.tool()(self.get_config)
@@ -395,69 +243,67 @@ class McpServer:
self.mcp.tool()(self.load_document)
self.mcp.tool()(self.remove_document)
self.mcp.tool()(self.add_processing)
-
+
def run(self):
"""Run the MCP server"""
self.mcp.run(transport="streamable-http")
- async def _get_manager(self, ctx):
- """Get an authenticated WebSocket manager for the current caller.
-
- Extracts the Bearer token from the MCP auth context and returns
- a per-token WebSocket connection to the gateway.
- """
- token = _require_token()
- return await get_socket_manager(ctx, token)
-
async def embeddings(
self,
- texts: List[str],
+ text: str,
flow_id: str | None = None,
- workspace: str | None = None,
ctx: Context = None,
) -> EmbeddingsResponse:
"""
- Generate vector embeddings for the given texts using TrustGraph's embedding models.
-
+ Generate vector embeddings for the given text using TrustGraph's embedding models.
+
This tool converts text into high-dimensional vectors that capture semantic meaning,
enabling similarity searches, clustering, and other vector-based operations.
-
+
Args:
- texts: List of input texts to convert into embeddings. Each text can be a
- sentence, paragraph, or document.
+ text: The input text to convert into embeddings. Can be a sentence, paragraph,
+ or document. The text will be processed by the configured embedding model.
flow_id: Optional flow identifier to use for processing (default: "default").
Different flows may use different embedding models or configurations.
- workspace: Optional workspace to query. If omitted, uses the caller's
- default workspace.
-
+
Returns:
- EmbeddingsResponse containing a list of vectors, one per input text.
+ EmbeddingsResponse containing a list of vectors. Each vector is a list of floats
+ representing the text's semantic embedding in the model's vector space.
+
+ Example usage:
+ - Convert a query into embeddings for similarity search
+ - Generate embeddings for documents before storing them
+ - Create embeddings for comparison with existing knowledge
"""
- logger.info("Embeddings request")
+ logging.info("Embeddings request made")
if flow_id is None: flow_id = "default"
- manager = await self._get_manager(ctx)
+ manager = await get_socket_manager(ctx, "trustgraph")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Computing embeddings via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ if ctx is None:
+ raise RuntimeError("No context provided")
- request_data = {"texts": texts}
-
- gen = manager.request(
- "embeddings", request_data, flow_id, workspace=workspace,
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Computing embeddings via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
)
+ # Send websocket request
+ request_data = {"text": text}
+ logging.info("making request")
+
+ gen = manager.request("embeddings", request_data, flow_id)
+
async for response in gen:
+
+ # Extract vectors from response
vectors = response.get("vectors", [[]])
break
-
+
return EmbeddingsResponse(vectors=vectors)
async def text_completion(
@@ -465,47 +311,62 @@ class McpServer:
prompt: str,
system: str | None = None,
flow_id: str | None = None,
- workspace: str | None = None,
ctx: Context = None,
) -> TextCompletionResponse:
"""
Generate text completions using TrustGraph's language models.
-
+
+ This tool sends prompts to configured language models and returns generated text.
+ It supports both user prompts and system instructions for controlling generation.
+
Args:
prompt: The main prompt or question to send to the language model.
+ This is the primary input that guides the model's response.
system: Optional system prompt that sets the context, role, or behavior
- for the AI assistant.
- flow_id: Optional flow identifier (default: "default").
- workspace: Optional workspace to query. If omitted, uses the caller's
- default workspace.
-
+ for the AI assistant (e.g., "You are a helpful coding assistant").
+ System prompts influence how the model interprets and responds.
+ flow_id: Optional flow identifier (default: "default"). Different flows
+ may use different models, parameters, or processing pipelines.
+
Returns:
TextCompletionResponse containing the generated text response from the model.
+
+ Example usage:
+ - Ask questions and get AI-generated answers
+ - Generate code, documentation, or creative content
+ - Perform text analysis, summarization, or transformation tasks
+ - Use system prompts to control tone, style, or domain expertise
"""
if system is None: system = ""
if flow_id is None: flow_id = "default"
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Generating text completion via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ # Context is present at this point; send the request over the websocket
+ logging.info("Text completion request made via websocket")
- request_data = {"system": system, "prompt": prompt}
+ manager = await get_socket_manager(ctx, "trustgraph")
- gen = manager.request(
- "text-completion", request_data, flow_id, workspace=workspace,
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Generating text completion via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
)
+ # Send websocket request
+ request_data = {"system": system, "prompt": prompt}
+
+ gen = manager.request("text-completion", request_data, flow_id)
+
async for response in gen:
+
+ # Extract the generated text from the response
text = response.get("response", "")
break
-
+
return TextCompletionResponse(response=text)
async def graph_rag(
@@ -517,43 +378,58 @@ class McpServer:
max_subgraph_size: int | None = None,
max_path_length: int | None = None,
flow_id: str | None = None,
- workspace: str | None = None,
ctx: Context = None,
) -> GraphRagResponse:
"""
Perform Graph-based Retrieval Augmented Generation (GraphRAG) queries.
-
+
GraphRAG combines knowledge graph traversal with language model generation to provide
- contextually rich answers.
-
+ contextually rich answers. It explores relationships between entities to build relevant
+ context before generating responses.
+
Args:
question: The question or query to answer using the knowledge graph.
+ The system will find relevant entities and relationships to inform the response.
collection: Knowledge collection to query (default: "default").
+ Different collections may contain domain-specific knowledge.
entity_limit: Maximum number of entities to retrieve during graph traversal.
+ Higher limits provide more context but increase processing time.
triple_limit: Maximum number of relationship triples to consider.
+ Controls the depth of relationship exploration.
max_subgraph_size: Maximum size of the subgraph to extract for context.
+ Larger subgraphs provide richer context but use more resources.
max_path_length: Maximum path length to traverse in the knowledge graph.
+ Longer paths can discover distant but relevant relationships.
flow_id: Processing flow to use (default: "default").
- workspace: Optional workspace to query. If omitted, uses the caller's
- default workspace.
-
+
Returns:
GraphRagResponse containing the generated answer informed by knowledge graph context.
+
+ Example usage:
+ - Answer complex questions requiring multi-hop reasoning
+ - Explore relationships between entities in your knowledge base
+ - Generate responses grounded in structured knowledge
+ - Perform research queries across connected information
"""
if collection is None: collection = "default"
if flow_id is None: flow_id = "default"
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Processing GraphRAG query via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("GraphRAG request made via websocket")
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Processing GraphRAG query via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
+
+ # Build request data with all parameters
request_data = {
"query": question
}
@@ -564,19 +440,20 @@ class McpServer:
if max_subgraph_size: request_data["max_subgraph_size"] = max_subgraph_size
if max_path_length: request_data["max_path_length"] = max_path_length
- gen = manager.request(
- "graph-rag", request_data, flow_id, workspace=workspace,
- )
+ gen = manager.request("graph-rag", request_data, flow_id)
text_chunks = []
async for response in gen:
+ # Handle new message format with message_type
message_type = response.get("message_type", "chunk")
+ # Only collect text from chunk messages
if message_type == "chunk":
chunk_text = response.get("response", "")
if chunk_text:
text_chunks.append(chunk_text)
+ # Check if session is complete
if response.get("end_of_session"):
break
@@ -587,447 +464,404 @@ class McpServer:
question: str,
collection: str | None = None,
flow_id: str | None = None,
- workspace: str | None = None,
ctx: Context = None,
) -> AgentResponse:
"""
Execute intelligent agent queries with reasoning and tool usage capabilities.
-
+
+ The agent can perform complex multi-step reasoning, use tools, and provide
+ detailed thought processes. It's designed for tasks requiring planning,
+ analysis, and iterative problem-solving.
+
Args:
- question: The question or task for the agent to solve.
+ question: The question or task for the agent to solve. Can be complex
+ queries requiring multiple steps, analysis, or tool usage.
collection: Knowledge collection the agent can access (default: "default").
- flow_id: Agent workflow to use (default: "default").
- workspace: Optional workspace to query. If omitted, uses the caller's
- default workspace.
-
+ Determines what information and tools are available.
+ flow_id: Agent workflow to use (default: "default"). Different flows
+ may have different capabilities, tools, or reasoning strategies.
+
Returns:
AgentResponse containing the final answer after the agent's reasoning process.
+ During execution, you'll see intermediate thoughts and observations.
+
+ Example usage:
+ - Solve complex analytical problems requiring multiple steps
+ - Perform research tasks across multiple information sources
+ - Handle queries that need tool usage and decision-making
+ - Get detailed explanations of reasoning processes
+
+ Note: This tool provides real-time updates on the agent's thinking process
+ through log messages, so you can follow its reasoning steps.
"""
if collection is None: collection = "default"
if flow_id is None: flow_id = "default"
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Processing agent query via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Agent request made via websocket")
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Processing agent query via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
+
+ # Build request data with all parameters
request_data = {
"question": question
}
if collection: request_data["collection"] = collection
- gen = manager.request(
- "agent", request_data, flow_id, workspace=workspace,
- )
+ gen = manager.request("agent", request_data, flow_id)
async for response in gen:
- logger.debug("Agent response: %s", response)
+ logging.debug(f"Agent response: {response}")
- if ctx:
- if "thought" in response:
- await ctx.session.send_log_message(
- level="info",
- data=f"Thinking: {response['thought']}",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ if "thought" in response:
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Thinking: {response['thought']}",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
- if "observation" in response:
- await ctx.session.send_log_message(
- level="info",
- data=f"Observation: {response['observation']}",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ if "observation" in response:
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Observation: {response['observation']}",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
+ # Return the final answer as soon as a response carries one
if "answer" in response:
answer = response.get("answer", "")
return AgentResponse(answer=answer)
async def triples_query(
self,
- s: str | None = None,
- s_type: str | None = None,
- p: str | None = None,
- p_type: str | None = None,
- o: str | None = None,
- o_type: str | None = None,
- collection: str | None = None,
- graph: str | None = None,
+ s_v: str | None = None,
+ s_e: bool | None = None,
+ p_v: str | None = None,
+ p_e: bool | None = None,
+ o_v: str | None = None,
+ o_e: bool | None = None,
limit: int | None = None,
flow_id: str | None = None,
- workspace: str | None = None,
ctx: Context = None,
) -> TriplesQueryResponse:
"""
Query knowledge graph triples using subject-predicate-object patterns.
-
- Each of s, p, o is an RDF term value. Use the corresponding _type
- parameter to specify the term kind:
- - "iri" (default for s and p): an IRI / entity reference
- - "literal" (default for o): a plain literal value
- - "blank": a blank node identifier
-
+
+ Knowledge graphs store information as triples (subject, predicate, object).
+ This tool allows flexible querying by specifying any combination of these
+ components, with wildcards for unspecified parts.
+
Args:
- s: Subject value to match. Leave None for wildcard.
- s_type: Subject term type: "iri" (default), "literal", or "blank".
- p: Predicate value to match. Leave None for wildcard.
- p_type: Predicate term type: "iri" (default), "literal", or "blank".
- o: Object value to match. Leave None for wildcard.
- o_type: Object term type: "iri", "literal" (default), or "blank".
- collection: Knowledge collection to query (default: "default").
- graph: Named graph IRI to restrict the query. None = default graph,
- "*" = all graphs.
+ s_v: Subject value to match (e.g., "John", "Apple Inc."). Leave None for wildcard.
+ s_e: Whether subject should be treated as an entity (True) or literal (False).
+ p_v: Predicate/relationship value (e.g., "works_for", "type_of"). Leave None for wildcard.
+ p_e: Whether predicate should be treated as an entity (True) or literal (False).
+ o_v: Object value to match (e.g., "Engineer", "Company"). Leave None for wildcard.
+ o_e: Whether object should be treated as an entity (True) or literal (False).
limit: Maximum number of triples to return (default: 20).
flow_id: Processing flow identifier (default: "default").
- workspace: Optional workspace to query. If omitted, uses the caller's
- default workspace.
-
+
Returns:
TriplesQueryResponse containing matching triples from the knowledge graph.
+
+ Example queries:
+ - Find all relationships for an entity: s_v="John", others None
+ - Find all instances of a relationship: p_v="works_for", others None
+ - Find specific facts: s_v="John", p_v="works_for", o_v=None
+ - Explore entity types: p_v="type_of", others None
+
+ Use this for:
+ - Exploring knowledge graph structure
+ - Finding specific facts or relationships
+ - Discovering connections between entities
+ - Validating or debugging knowledge content
"""
if flow_id is None: flow_id = "default"
if limit is None: limit = 20
- if collection is None: collection = "default"
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Processing triples query via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Triples query request made via websocket")
- request_data = {
- "limit": limit,
- "collection": collection,
- }
+ manager = await get_socket_manager(ctx, "trustgraph")
- if s is not None:
- request_data["s"] = _make_term(s, s_type or "iri")
-
- if p is not None:
- request_data["p"] = _make_term(p, p_type or "iri")
-
- if o is not None:
- request_data["o"] = _make_term(o, o_type or "literal")
-
- if graph is not None:
- request_data["g"] = graph
-
- gen = manager.request(
- "triples", request_data, flow_id, workspace=workspace,
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Processing triples query via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
)
+ # Build request data with Value objects
+ request_data = {
+ "limit": limit
+ }
+
+ # Add subject if provided
+ if s_v is not None:
+ request_data["s"] = {"v": s_v, "e": s_e }
+
+ # Add predicate if provided
+ if p_v is not None:
+ request_data["p"] = {"v": p_v, "e": p_e }
+
+ # Add object if provided
+ if o_v is not None:
+ request_data["o"] = {"v": o_v, "e": o_e }
+
+ gen = manager.request("triples", request_data, flow_id)
+
async for response in gen:
+ # Extract response data
triples = response.get("response", [])
break
-
+
return TriplesQueryResponse(triples=triples)
- async def sparql_query(
- self,
- query: str,
- collection: str | None = None,
- limit: int | None = None,
- flow_id: str | None = None,
- workspace: str | None = None,
- ctx: Context = None,
- ) -> SparqlQueryResponse:
- """
- Execute a SPARQL query against the knowledge graph.
-
- Supports SELECT, ASK, CONSTRUCT, and DESCRIBE query forms.
-
- Args:
- query: SPARQL query string (e.g. "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10").
- collection: Knowledge collection to query (default: "default").
- limit: Safety limit on number of results (default: 10000).
- flow_id: Processing flow identifier (default: "default").
- workspace: Optional workspace to query. If omitted, uses the caller's
- default workspace.
-
- Returns:
- SparqlQueryResponse containing the query results. The structure depends
- on query type:
- - SELECT: variables (column names) and bindings (rows of Term values)
- - ASK: ask_result (boolean)
- - CONSTRUCT/DESCRIBE: triples
- """
-
- if collection is None: collection = "default"
- if flow_id is None: flow_id = "default"
- if limit is None: limit = 10000
-
- manager = await self._get_manager(ctx)
-
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Processing SPARQL query via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
-
- request_data = {
- "query": query,
- "collection": collection,
- "limit": limit,
- }
-
- gen = manager.request(
- "sparql", request_data, flow_id, workspace=workspace,
- )
-
- async for response in gen:
- query_type = response.get("query-type", "")
- return SparqlQueryResponse(
- query_type=query_type,
- variables=response.get("variables", []),
- bindings=response.get("bindings", []),
- ask_result=response.get("ask-result", False),
- triples=response.get("triples", []),
- )
-
- async def graphql_query(
- self,
- query: str,
- collection: str | None = None,
- variables: Dict[str, Any] | None = None,
- operation_name: str | None = None,
- flow_id: str | None = None,
- workspace: str | None = None,
- ctx: Context = None,
- ) -> GraphQLQueryResponse:
- """
- Execute a GraphQL query against structured data (rows).
-
- Queries structured data schemas that have been loaded into TrustGraph.
- The available types and fields depend on the schemas configured in the
- target workspace.
-
- Args:
- query: GraphQL query string (e.g. '{ customers(where: {status: {eq: "active"}}) { id name } }').
- collection: Data collection to query (default: "default").
- variables: Optional GraphQL variables as a dict.
- operation_name: Optional operation name for multi-operation documents.
- flow_id: Processing flow identifier (default: "default").
- workspace: Optional workspace to query. If omitted, uses the caller's
- default workspace.
-
- Returns:
- GraphQLQueryResponse containing data (the query result) and errors
- (any GraphQL field-level errors).
- """
-
- if collection is None: collection = "default"
- if flow_id is None: flow_id = "default"
-
- manager = await self._get_manager(ctx)
-
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Processing GraphQL query via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
-
- request_data = {
- "query": query,
- "collection": collection,
- "variables": variables or {},
- }
-
- if operation_name is not None:
- request_data["operation_name"] = operation_name
-
- gen = manager.request(
- "rows", request_data, flow_id, workspace=workspace,
- )
-
- async for response in gen:
- return GraphQLQueryResponse(
- data=response.get("data"),
- errors=response.get("errors", []),
- )
-
async def graph_embeddings_query(
self,
vectors: List[List[float]],
limit: int | None = None,
flow_id: str | None = None,
- workspace: str | None = None,
ctx: Context = None,
) -> GraphEmbeddingsQueryResponse:
"""
Find entities in the knowledge graph using vector similarity search.
-
+
+ This tool performs semantic search by comparing embedding vectors to find
+ the most similar entities in the knowledge graph. It's useful for finding
+ conceptually related information even when exact text matches don't exist.
+
Args:
- vectors: List of embedding vectors to search with.
+ vectors: List of embedding vectors to search with. Each vector should be
+ a list of floats representing semantic embeddings (typically from
+ the embeddings tool). Multiple vectors can be provided for batch queries.
limit: Maximum number of similar entities to return (default: 20).
+ Higher limits provide more results but may include less relevant matches.
flow_id: Processing flow identifier (default: "default").
- workspace: Optional workspace to query. If omitted, uses the caller's
- default workspace.
-
+
Returns:
- GraphEmbeddingsQueryResponse containing entities ranked by similarity.
+ GraphEmbeddingsQueryResponse containing entities ranked by similarity to the
+ input vectors, along with similarity scores and entity metadata.
+
+ Example workflow:
+ 1. Use the 'embeddings' tool to convert text to vectors
+ 2. Use this tool to find similar entities in the knowledge graph
+ 3. Explore the returned entities for relevant information
+
+ Use this for:
+ - Semantic search across knowledge entities
+ - Finding conceptually similar content
+ - Discovering related entities without exact keyword matches
+ - Building recommendation systems based on entity similarity
"""
if flow_id is None: flow_id = "default"
if limit is None: limit = 20
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Processing graph embeddings query via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Graph embeddings query request made via websocket")
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Processing graph embeddings query via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
+
+ # Build request data
request_data = {
"vectors": vectors,
"limit": limit
}
- gen = manager.request(
- "graph-embeddings", request_data, flow_id, workspace=workspace,
- )
+ gen = manager.request("graph-embeddings", request_data, flow_id)
async for response in gen:
+ # Extract entities from response
entities = response.get("entities", [])
break
-
+
return GraphEmbeddingsQueryResponse(entities=entities)
async def get_config_all(
self,
- workspace: str | None = None,
ctx: Context = None,
) -> ConfigResponse:
"""
Retrieve the complete TrustGraph system configuration.
-
- Args:
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+
+ This tool returns all configuration settings for the TrustGraph system,
+ including model configurations, API keys, flow definitions, and system parameters.
+
Returns:
- ConfigResponse containing the full configuration as a nested dictionary.
+ ConfigResponse containing the full configuration as a nested dictionary
+ with all system settings, organized by category (e.g., models, flows, storage).
+
+ Use this for:
+ - Inspecting current system configuration
+ - Debugging configuration issues
+ - Understanding available models and settings
+ - Auditing system setup and parameters
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Retrieving all configuration via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get config all request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving all configuration via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "config"
}
- gen = manager.request("config", request_data, None, workspace=workspace)
+ gen = manager.request("config", request_data, None)
async for response in gen:
config = response.get("config", {})
break
-
+
return ConfigResponse(config=config)
async def get_config(
self,
keys: List[Dict[str, str]],
- workspace: str | None = None,
ctx: Context = None,
) -> ConfigGetResponse:
"""
Retrieve specific configuration values by key.
-
+
+ This tool allows you to fetch specific configuration settings without
+ retrieving the entire configuration. Useful for checking particular
+ settings or API keys.
+
Args:
- keys: List of configuration keys to retrieve. Each key should be a dict with
- 'type' and 'key' fields.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+ keys: List of configuration keys to retrieve. Each key should be a dict with:
+ - 'type': Configuration category (e.g., 'llm', 'embeddings', 'storage')
+ - 'key': Specific setting name within that category
+
Returns:
ConfigGetResponse containing the requested configuration values.
+
+ Example keys:
+ - {'type': 'llm', 'key': 'openai.model'}
+ - {'type': 'embeddings', 'key': 'default.model'}
+ - {'type': 'storage', 'key': 'database.url'}
+
+ Use this for:
+ - Checking specific model configurations
+ - Validating API key settings
+ - Inspecting individual system parameters
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Retrieving specific configuration via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get config request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving specific configuration via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "get",
"keys": keys
}
- gen = manager.request("config", request_data, None, workspace=workspace)
+ gen = manager.request("config", request_data, None)
async for response in gen:
values = response.get("values", [])
break
-
+
return ConfigGetResponse(values=values)
async def put_config(
self,
values: List[Dict[str, str]],
- workspace: str | None = None,
ctx: Context = None,
) -> PutConfigResponse:
"""
Update system configuration values.
-
+
+ This tool allows you to modify TrustGraph system settings, such as
+ model parameters, API keys, and system behavior configurations.
+
Args:
- values: List of configuration updates. Each should be a dict with
- 'type', 'key', and 'value' fields.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+ values: List of configuration updates. Each update should be a dict with:
+ - 'type': Configuration category (e.g., 'llm', 'embeddings')
+ - 'key': Specific setting name to update
+ - 'value': New value for the setting
+
Returns:
PutConfigResponse confirming the configuration update.
+
+ Example updates:
+ - {'type': 'llm', 'key': 'openai.model', 'value': 'gpt-4'}
+ - {'type': 'embeddings', 'key': 'batch_size', 'value': '100'}
+
+ Use this for:
+ - Switching between different models
+ - Updating API credentials
+ - Modifying system behavior parameters
+ - Configuring processing settings
+
+ Note: Configuration changes may require a system restart to take effect.
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Updating configuration via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Put config request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Updating configuration via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "put",
"values": values
}
- gen = manager.request("config", request_data, None, workspace=workspace)
+ gen = manager.request("config", request_data, None)
async for response in gen:
return PutConfigResponse()
@@ -1035,73 +869,97 @@ class McpServer:
async def delete_config(
self,
keys: List[Dict[str, str]],
- workspace: str | None = None,
ctx: Context = None,
) -> DeleteConfigResponse:
"""
Delete specific configuration entries from the system.
-
+
+ This tool removes configuration settings, reverting them to system defaults
+ or disabling specific features.
+
Args:
- keys: List of configuration keys to delete. Each should be a dict with
- 'type' and 'key' fields.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+ keys: List of configuration keys to delete. Each key should be a dict with:
+ - 'type': Configuration category (e.g., 'llm', 'embeddings')
+ - 'key': Specific setting name to remove
+
Returns:
DeleteConfigResponse confirming the deletion.
+
+ Use this for:
+ - Removing custom model configurations
+ - Clearing API credentials
+ - Resetting settings to defaults
+ - Cleaning up obsolete configurations
+
+ Warning: Deleting essential configuration may disable system functionality
+ until it is properly reconfigured.
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Deleting configuration via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Delete config request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Deleting configuration via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "delete",
"keys": keys
}
- gen = manager.request("config", request_data, None, workspace=workspace)
+ gen = manager.request("config", request_data, None)
async for response in gen:
return DeleteConfigResponse()
async def get_prompts(
self,
- workspace: str | None = None,
ctx: Context = None,
) -> GetPromptsResponse:
"""
List all available prompt templates in the system.
-
- Args:
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+
+ Prompt templates are reusable prompts that can be used with language models
+ for consistent behavior across different queries and use cases.
+
Returns:
GetPromptsResponse containing a list of available prompt template IDs.
+ Each ID can be used with get_prompt to retrieve the full template.
+
+ Use this for:
+ - Discovering available prompt templates
+ - Exploring pre-configured prompts for different tasks
+ - Finding templates for specific use cases
+ - Understanding what prompt options are available
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Retrieving prompt templates via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get prompts request made via websocket")
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving prompt templates via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
+
+ # First get all config
request_data = {
"operation": "config"
}
- gen = manager.request("config", request_data, None, workspace=workspace)
+ gen = manager.request("config", request_data, None)
async for response in gen:
config = response.get("config", {})
@@ -1113,36 +971,49 @@ class McpServer:
async def get_prompt(
self,
prompt_id: str,
- workspace: str | None = None,
ctx: Context = None,
) -> GetPromptResponse:
"""
Retrieve a specific prompt template by ID.
-
+
+ Prompt templates contain structured prompts with placeholders, instructions,
+ and metadata for specific tasks or domains.
+
Args:
prompt_id: The unique identifier of the prompt template to retrieve.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+ Use get_prompts to see available template IDs.
+
Returns:
- GetPromptResponse containing the complete prompt template.
+ GetPromptResponse containing the complete prompt template with its
+ structure, placeholders, and usage instructions.
+
+ Use this for:
+ - Examining prompt template structure
+ - Understanding how to use specific templates
+ - Copying or modifying existing prompts
+ - Learning prompt engineering patterns
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Retrieving prompt template '{prompt_id}' via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get prompt request made via websocket")
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving prompt template '{prompt_id}' via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
+
+ # First get all config
request_data = {
"operation": "config"
}
- gen = manager.request("config", request_data, None, workspace=workspace)
+ gen = manager.request("config", request_data, None)
async for response in gen:
config = response.get("config", {})
@@ -1154,35 +1025,44 @@ class McpServer:
async def get_system_prompt(
self,
- workspace: str | None = None,
ctx: Context = None,
) -> GetSystemPromptResponse:
"""
Retrieve the current system prompt configuration.
-
- Args:
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+
+ The system prompt defines the default behavior, personality, and instructions
+ for language models across the TrustGraph system.
+
Returns:
- GetSystemPromptResponse containing the system prompt text.
+ GetSystemPromptResponse containing the system prompt text and configuration.
+
+ Use this for:
+ - Understanding default AI behavior settings
+ - Checking current system-wide prompt configuration
+ - Auditing AI personality and instruction settings
+ - Debugging unexpected AI responses
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Retrieving system prompt via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get system prompt request made via websocket")
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving system prompt via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
+
+ # First get all config
request_data = {
"operation": "config"
}
- gen = manager.request("config", request_data, None, workspace=workspace)
+ gen = manager.request("config", request_data, None)
async for response in gen:
config = response.get("config", {})
@@ -1193,39 +1073,51 @@ class McpServer:
async def get_token_costs(
self,
- workspace: str | None = None,
ctx: Context = None,
) -> ConfigTokenCostsResponse:
"""
Retrieve token pricing information for all configured AI models.
-
- Args:
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+
+ This tool provides cost information for input and output tokens across
+ different language models, helping with budget planning and cost optimization.
+
Returns:
- ConfigTokenCostsResponse containing pricing data for each model.
+ ConfigTokenCostsResponse containing pricing data for each model including:
+ - Model name/identifier
+ - Input token cost (per token)
+ - Output token cost (per token)
+
+ Use this for:
+ - Estimating costs for different models
+ - Choosing cost-effective models for tasks
+ - Budget planning and cost analysis
+ - Monitoring and optimizing AI spending
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Retrieving token costs via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get token costs request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving token costs via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "getvalues",
"type": "token-costs"
}
- gen = manager.request("config", request_data, None, workspace=workspace)
+ gen = manager.request("config", request_data, None)
async for response in gen:
values = response.get("values", [])
+ # Transform to match TypeScript API format
costs = []
for item in values:
try:
@@ -1238,89 +1130,106 @@ class McpServer:
except (json.JSONDecodeError, AttributeError):
continue
break
-
+
return ConfigTokenCostsResponse(costs=costs)
async def get_knowledge_cores(
self,
- workspace: str | None = None,
ctx: Context = None,
) -> KnowledgeCoresResponse:
"""
List all available knowledge graph cores in the current workspace.
- Args:
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
+ Knowledge cores are packaged collections of structured knowledge that can
+ be loaded into the system for querying and reasoning. They contain entities,
+ relationships, and facts organized as knowledge graphs.
Returns:
KnowledgeCoresResponse containing a list of available knowledge core IDs.
+
+ Use this for:
+ - Discovering available knowledge collections
+ - Understanding what knowledge domains are accessible
+ - Planning which cores to load for specific tasks
+ - Managing knowledge resources
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Retrieving knowledge graph cores via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get knowledge cores request made via websocket")
+
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving knowledge graph cores via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "list-kg-cores",
}
- gen = manager.request(
- "knowledge", request_data, None, workspace=workspace,
- )
+ gen = manager.request("knowledge", request_data, None)
async for response in gen:
ids = response.get("ids", [])
break
-
+
return KnowledgeCoresResponse(ids=ids)
async def delete_kg_core(
self,
core_id: str,
- workspace: str | None = None,
ctx: Context = None,
) -> DeleteKgCoreResponse:
"""
Permanently delete a knowledge graph core.
+ This operation removes a knowledge core from storage. Use with caution
+ as this action cannot be undone.
+
Args:
core_id: Unique identifier of the knowledge core to delete.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
Returns:
DeleteKgCoreResponse confirming the deletion.
+
+ Use this for:
+ - Cleaning up obsolete knowledge cores
+ - Removing test or experimental data
+ - Managing storage space
+ - Maintaining organized knowledge collections
+
+ Warning: This permanently deletes the knowledge core and all its data.
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Deleting knowledge graph core '{core_id}' via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Delete KG core request made via websocket")
+
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Deleting knowledge graph core '{core_id}' via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "delete-kg-core",
"id": core_id,
}
- gen = manager.request(
- "knowledge", request_data, None, workspace=workspace,
- )
+ gen = manager.request("knowledge", request_data, None)
async for response in gen:
break
-
+
return DeleteKgCoreResponse()
async def load_kg_core(
@@ -1328,34 +1237,46 @@ class McpServer:
core_id: str,
flow: str,
collection: str | None = None,
- workspace: str | None = None,
ctx: Context = None,
) -> LoadKgCoreResponse:
"""
Load a knowledge graph core into the active system for querying.
+ This operation makes a knowledge core available for GraphRAG queries,
+ triple searches, and other knowledge-based operations.
+
Args:
core_id: Unique identifier of the knowledge core to load.
- flow: Processing flow to use for loading the core.
- collection: Target collection name (default: "default").
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
+ flow: Processing flow to use for loading the core. Different flows
+ may apply different processing, indexing, or optimization steps.
+ collection: Target collection name (default: "default"). The loaded
+ knowledge will be available under this collection name.
Returns:
LoadKgCoreResponse confirming the core has been loaded.
+
+ Use this for:
+ - Making knowledge cores available for queries
+ - Switching between different knowledge domains
+ - Loading domain-specific knowledge for tasks
+ - Preparing knowledge for GraphRAG operations
"""
if collection is None: collection = "default"
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Loading knowledge graph core '{core_id}' via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Load KG core request made via websocket")
+
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Loading knowledge graph core '{core_id}' via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "load-kg-core",
@@ -1364,241 +1285,292 @@ class McpServer:
"collection": collection
}
- gen = manager.request(
- "knowledge", request_data, None, workspace=workspace,
- )
+ gen = manager.request("knowledge", request_data, None)
async for response in gen:
break
-
+
return LoadKgCoreResponse()
async def get_kg_core(
self,
core_id: str,
- workspace: str | None = None,
ctx: Context = None,
) -> GetKgCoreResponse:
"""
Download and retrieve the complete content of a knowledge graph core.
+ This tool streams the entire content of a knowledge core, returning all
+ entities, relationships, and metadata. Due to potentially large data sizes,
+ the content is streamed in chunks.
+
Args:
core_id: Unique identifier of the knowledge core to retrieve.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
Returns:
GetKgCoreResponse containing all chunks of the knowledge core data.
+ Each chunk contains part of the knowledge graph structure.
+
+ Use this for:
+ - Examining knowledge core content and structure
+ - Debugging knowledge graph data
+ - Exporting knowledge for backup or analysis
+ - Understanding the scope and quality of knowledge
+
+ Note: Large knowledge cores may take significant time to download.
+ Progress updates are provided through log messages during streaming.
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Retrieving knowledge graph core '{core_id}' via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get KG core request made via websocket")
+
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving knowledge graph core '{core_id}' via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "get-kg-core",
"id": core_id,
}
+ # Collect all streaming responses
chunks = []
- gen = manager.request(
- "knowledge", request_data, None, workspace=workspace,
- )
+ gen = manager.request("knowledge", request_data, None)
async for response in gen:
+ # Check for end of stream
if response.get("eos", False):
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Completed streaming KG core data",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Completed streaming KG core data",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
break
else:
chunks.append(response)
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Received KG core chunk ({len(chunks)} chunks so far)",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
-
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Received KG core chunk ({len(chunks)} chunks so far)",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
+
return GetKgCoreResponse(chunks=chunks)
async def get_flows(
self,
- workspace: str | None = None,
ctx: Context = None,
) -> FlowsResponse:
"""
List all available processing flows in the system.
-
- Args:
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+
+ Flows define processing pipelines for different types of operations
+ (e.g., document processing, knowledge extraction, query handling).
+ Each flow encapsulates a specific workflow with configured steps.
+
Returns:
FlowsResponse containing a list of available flow identifiers.
+
+ Use this for:
+ - Discovering available processing workflows
+ - Understanding what processing options are available
+ - Choosing appropriate flows for specific tasks
+ - Planning workflow-based operations
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Retrieving available flows via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get flows request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving available flows via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "list-flows"
}
- gen = manager.request(
- "flow", request_data, None, workspace=workspace,
- )
+ gen = manager.request("flow", request_data, None)
async for response in gen:
flow_ids = response.get("flow-ids", [])
break
-
+
return FlowsResponse(flow_ids=flow_ids)
async def get_flow(
self,
flow_id: str,
- workspace: str | None = None,
ctx: Context = None,
) -> FlowResponse:
"""
Retrieve the complete definition of a specific processing flow.
-
+
+ This tool returns the detailed configuration, steps, and parameters
+ of a processing flow, showing how it processes data and what operations it performs.
+
Args:
flow_id: Unique identifier of the flow to retrieve.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+
Returns:
- FlowResponse containing the complete flow definition.
+ FlowResponse containing the complete flow definition including:
+ - Flow configuration and parameters
+ - Processing steps and their order
+ - Input/output specifications
+ - Dependencies and requirements
+
+ Use this for:
+ - Understanding how specific flows work
+ - Debugging flow processing issues
+ - Learning flow configuration patterns
+ - Customizing or duplicating flows
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Retrieving flow definition for '{flow_id}' via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get flow request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving flow definition for '{flow_id}' via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "get-flow",
"flow-id": flow_id,
}
- gen = manager.request(
- "flow", request_data, None, workspace=workspace,
- )
+ gen = manager.request("flow", request_data, None)
async for response in gen:
flow_data = response.get("flow", "{}")
+ # Decode the flow definition if it arrives as a JSON string (mirrors the TypeScript implementation)
flow = json.loads(flow_data) if isinstance(flow_data, str) else flow_data
break
-
+
return FlowResponse(flow=flow)
async def get_flow_classes(
self,
- workspace: str | None = None,
ctx: Context = None,
) -> FlowClassesResponse:
"""
List all available flow class templates.
-
- Args:
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+
+ Flow classes are templates that define types of processing workflows.
+ They serve as blueprints for creating specific flow instances with
+ customized parameters.
+
Returns:
FlowClassesResponse containing a list of available flow class names.
+
+ Use this for:
+ - Discovering available flow templates
+ - Understanding what types of processing are supported
+ - Planning new flow creation
+ - Exploring system capabilities
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Retrieving flow classes via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get flow classes request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving flow classes via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "list-classes"
}
- gen = manager.request(
- "flow", request_data, None, workspace=workspace,
- )
+ gen = manager.request("flow", request_data, None)
async for response in gen:
class_names = response.get("class-names", [])
break
-
+
return FlowClassesResponse(class_names=class_names)
async def get_flow_class(
self,
class_name: str,
- workspace: str | None = None,
ctx: Context = None,
) -> FlowClassResponse:
"""
Retrieve the definition of a specific flow class template.
-
+
+ Flow classes define the structure, parameters, and capabilities of
+ flow types. This tool returns the class specification including
+ configurable parameters and processing logic.
+
Args:
class_name: Name of the flow class to retrieve.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+
Returns:
- FlowClassResponse containing the flow class definition.
+ FlowClassResponse containing the flow class definition with:
+ - Class parameters and configuration options
+ - Processing capabilities and requirements
+ - Usage instructions and examples
+
+ Use this for:
+ - Understanding flow class capabilities
+ - Learning how to configure new flows
+ - Troubleshooting flow creation issues
+ - Exploring advanced flow features
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Retrieving flow class definition for '{class_name}' via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get flow class request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving flow class definition for '{class_name}' via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "get-class",
"class-name": class_name
}
- gen = manager.request(
- "flow", request_data, None, workspace=workspace,
- )
+ gen = manager.request("flow", request_data, None)
async for response in gen:
class_def_data = response.get("class-definition", "{}")
+ # Decode the class definition if it arrives as a JSON string (mirrors the TypeScript implementation)
class_definition = json.loads(class_def_data) if isinstance(class_def_data, str) else class_def_data
break
-
+
return FlowClassResponse(class_definition=class_definition)
async def start_flow(
@@ -1606,32 +1578,43 @@ class McpServer:
flow_id: str,
class_name: str,
description: str,
- workspace: str | None = None,
ctx: Context = None,
) -> StartFlowResponse:
"""
Create and start a new processing flow instance.
-
+
+ This tool creates a new flow based on a flow class template and starts
+ it running. The flow will begin processing according to its configuration.
+
Args:
flow_id: Unique identifier for the new flow instance.
class_name: Flow class template to use for creating the flow.
+ Use get_flow_classes to see available classes.
description: Human-readable description of the flow's purpose.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+
Returns:
StartFlowResponse confirming the flow has been started.
+
+ Use this for:
+ - Creating new processing workflows
+ - Starting automated processing tasks
+ - Launching background operations
+ - Initiating data processing pipelines
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Starting flow '{flow_id}' with class '{class_name}' via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Start flow request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Starting flow '{flow_id}' with class '{class_name}' via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "start-flow",
@@ -1640,135 +1623,162 @@ class McpServer:
"description": description
}
- gen = manager.request(
- "flow", request_data, None, workspace=workspace,
- )
+ gen = manager.request("flow", request_data, None)
async for response in gen:
break
-
+
return StartFlowResponse()
async def stop_flow(
self,
flow_id: str,
- workspace: str | None = None,
ctx: Context = None,
) -> StopFlowResponse:
"""
Stop a running flow instance.
-
+
+ This tool gracefully stops a running flow, allowing it to complete
+ current operations before shutting down.
+
Args:
flow_id: Unique identifier of the flow instance to stop.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
-
+
Returns:
StopFlowResponse confirming the flow has been stopped.
+
+ Use this for:
+ - Stopping unwanted or completed flows
+ - Managing system resources
+ - Interrupting long-running processes
+ - Maintaining flow lifecycle
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Stopping flow '{flow_id}' via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Stop flow request made via websocket")
+
+ manager = await get_socket_manager(ctx, "trustgraph")
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Stopping flow '{flow_id}' via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "stop-flow",
"flow-id": flow_id
}
- gen = manager.request(
- "flow", request_data, None, workspace=workspace,
- )
+ gen = manager.request("flow", request_data, None)
async for response in gen:
break
-
+
return StopFlowResponse()
async def get_documents(
self,
- workspace: str | None = None,
ctx: Context = None,
) -> DocumentsResponse:
"""
List all documents stored in the TrustGraph document library.
- Args:
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
+ This tool returns metadata for all documents that have been uploaded
+ to the system, including their processing status and properties.
Returns:
- DocumentsResponse containing metadata for each document.
+ DocumentsResponse containing metadata for each document including:
+ - Document ID and title
+ - Upload timestamp
+ - MIME type and size information
+ - Tags and custom metadata
+ - Processing status
+
+ Use this for:
+ - Browsing available documents
+ - Managing document collections
+ - Finding documents by metadata
+ - Auditing document storage
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Retrieving documents list via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get documents request made via websocket")
+
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving documents list via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "list-documents",
}
- gen = manager.request(
- "librarian", request_data, None, workspace=workspace,
- )
+ gen = manager.request("librarian", request_data, None)
async for response in gen:
document_metadatas = response.get("document-metadatas", [])
break
-
+
return DocumentsResponse(document_metadatas=document_metadatas)
async def get_processing(
self,
- workspace: str | None = None,
ctx: Context = None,
) -> ProcessingResponse:
"""
List all documents currently in the processing queue.
- Args:
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
+ This tool shows documents that are being processed or waiting to be
+ processed, along with their processing status and configuration.
Returns:
- ProcessingResponse containing processing metadata.
+ ProcessingResponse containing processing metadata including:
+ - Processing job ID and document ID
+ - Processing flow and status
+ - Target collection
+ - Timestamp and progress information
+
+ Use this for:
+ - Monitoring document processing progress
+ - Debugging processing issues
+ - Managing processing queues
+ - Understanding system workload
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Retrieving processing list via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Get processing request made via websocket")
+
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Retrieving processing list via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "list-processing",
}
- gen = manager.request(
- "librarian", request_data, None, workspace=workspace,
- )
+ gen = manager.request("librarian", request_data, None)
async for response in gen:
processing_metadatas = response.get("processing-metadatas", [])
break
-
+
return ProcessingResponse(processing_metadatas=processing_metadatas)
async def load_document(
@@ -1780,39 +1790,50 @@ class McpServer:
title: str = "",
comments: str = "",
tags: List[str] | None = None,
- workspace: str | None = None,
ctx: Context = None,
) -> LoadDocumentResponse:
"""
Upload a document to the TrustGraph document library.
+ This tool stores documents with rich metadata for later processing,
+ search, and knowledge extraction. Documents can be text files, PDFs,
+ or other supported formats.
+
Args:
document: The document content as a string. For binary files,
this should be base64-encoded content.
document_id: Optional unique identifier. If not provided, one will be generated.
metadata: Optional list of custom metadata key-value pairs.
- mime_type: MIME type of the document.
+ mime_type: MIME type of the document (e.g., 'text/plain', 'application/pdf').
title: Human-readable title for the document.
comments: Optional description or notes about the document.
- tags: List of tags for categorizing the document.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
+ tags: List of tags for categorizing and finding the document.
Returns:
LoadDocumentResponse confirming the document has been stored.
+
+ Use this for:
+ - Adding new documents to the knowledge base
+ - Storing reference materials and data sources
+ - Building document collections for processing
+ - Importing external content for analysis
"""
if tags is None: tags = []
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data="Loading document to library via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Load document request made via websocket")
+
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Loading document to library via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
import time
timestamp = int(time.time())
@@ -1831,55 +1852,63 @@ class McpServer:
"content": document
}
- gen = manager.request(
- "librarian", request_data, None, workspace=workspace,
- )
+ gen = manager.request("librarian", request_data, None)
async for response in gen:
break
-
+
return LoadDocumentResponse()
async def remove_document(
self,
document_id: str,
- workspace: str | None = None,
ctx: Context = None,
) -> RemoveDocumentResponse:
"""
Permanently remove a document from the library.
+ This operation deletes a document and all its associated metadata.
+ Use with caution as this action cannot be undone.
+
Args:
document_id: Unique identifier of the document to remove.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
Returns:
RemoveDocumentResponse confirming the document has been deleted.
+
+ Use this for:
+ - Cleaning up obsolete or incorrect documents
+ - Managing storage space
+ - Removing sensitive or inappropriate content
+ - Maintaining organized document collections
+
+ Warning: This permanently deletes the document and all its metadata.
"""
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Removing document '{document_id}' from library via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Remove document request made via websocket")
+
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Removing document '{document_id}' from library via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
request_data = {
"operation": "remove-document",
"document-id": document_id,
}
- gen = manager.request(
- "librarian", request_data, None, workspace=workspace,
- )
+ gen = manager.request("librarian", request_data, None)
async for response in gen:
break
-
+
return RemoveDocumentResponse()
async def add_processing(
@@ -1889,37 +1918,53 @@ class McpServer:
flow: str,
collection: str | None = None,
tags: List[str] | None = None,
- workspace: str | None = None,
ctx: Context = None,
) -> AddProcessingResponse:
"""
Queue a document for processing through a specific workflow.
+ This tool adds a document to the processing queue where it will be
+ processed by the specified flow to extract knowledge, create embeddings,
+ or perform other analysis operations.
+
Args:
processing_id: Unique identifier for this processing job.
document_id: ID of the document to process (must exist in library).
- flow: Processing flow to use.
+ flow: Processing flow to use. Different flows perform different
+ types of analysis (e.g., knowledge extraction, summarization).
collection: Target collection for processed knowledge (default: "default").
+ Results will be stored under this collection name.
tags: Optional tags for categorizing this processing job.
- workspace: Optional workspace. If omitted, uses the caller's
- default workspace.
Returns:
AddProcessingResponse confirming the document has been queued.
+
+ Use this for:
+ - Processing uploaded documents into knowledge
+ - Extracting entities and relationships from text
+ - Creating searchable embeddings
+ - Converting documents into structured knowledge
+
+ Note: Processing may take time depending on document size and flow complexity.
+ Use get_processing to monitor progress.
"""
if collection is None: collection = "default"
if tags is None: tags = []
- manager = await self._get_manager(ctx)
+ if ctx is None:
+ raise RuntimeError("No context provided")
- if ctx:
- await ctx.session.send_log_message(
- level="info",
- data=f"Adding document '{document_id}' to processing queue via websocket...",
- logger="notification_stream",
- related_request_id=ctx.request_id,
- )
+ logging.info("Add processing request made via websocket")
+
+ manager = await get_socket_manager(ctx)
+
+ await ctx.session.send_log_message(
+ level="info",
+ data=f"Adding document '{document_id}' to processing queue via websocket...",
+ logger="notification_stream",
+ related_request_id=ctx.request_id,
+ )
import time
timestamp = int(time.time())
@@ -1936,61 +1981,38 @@ class McpServer:
}
}
- gen = manager.request(
- "librarian", request_data, None, workspace=workspace,
- )
+ gen = manager.request("librarian", request_data, None)
async for response in gen:
break
-
+
return AddProcessingResponse()
-
def main():
parser = argparse.ArgumentParser(description='TrustGraph MCP Server')
- parser.add_argument(
- '--host', default='0.0.0.0',
- help='Host to bind to (default: 0.0.0.0)',
- )
- parser.add_argument(
- '--port', type=int, default=8000,
- help='Port to bind to (default: 8000)',
- )
- parser.add_argument(
- '--websocket-url',
- default='ws://api-gateway:8088/api/v1/socket',
- help='WebSocket URL for the TrustGraph gateway',
- )
- parser.add_argument(
- '--auth-issuer',
- default=os.environ.get("AUTH_ISSUER", ""),
- help='OAuth issuer URL for MCP auth metadata discovery',
- )
- parser.add_argument(
- '--auth-resource-url',
- default=os.environ.get("AUTH_RESOURCE_URL", ""),
- help='Resource server URL for OAuth protected resource metadata',
- )
+ parser.add_argument('--host', default='0.0.0.0', help='Host to bind to (default: 0.0.0.0)')
+ parser.add_argument('--port', type=int, default=8000, help='Port to bind to (default: 8000)')
+ parser.add_argument('--websocket-url', default='ws://api-gateway:8088/api/v1/socket', help='WebSocket URL to connect to (default: ws://api-gateway:8088/api/v1/socket)')
+ # Add logging arguments
add_logging_args(parser)
args = parser.parse_args()
+ # Setup logging before creating server
setup_logging(vars(args))
- server = McpServer(
- host=args.host,
- port=args.port,
- websocket_url=args.websocket_url,
- auth_issuer=args.auth_issuer,
- auth_resource_url=args.auth_resource_url,
- )
+ # Read gateway auth token from environment
+ gateway_token = os.environ.get("GATEWAY_SECRET", "")
+
+ # Create and run the MCP server
+ server = McpServer(host=args.host, port=args.port, websocket_url=args.websocket_url, gateway_token=gateway_token)
server.run()
-
def run():
+ """Legacy function for backward compatibility"""
main()
-
if __name__ == "__main__":
main()
+
diff --git a/trustgraph-mcp/trustgraph/mcp_server/tg_socket.py b/trustgraph-mcp/trustgraph/mcp_server/tg_socket.py
index 9fbf7459..bff8ae75 100644
--- a/trustgraph-mcp/trustgraph/mcp_server/tg_socket.py
+++ b/trustgraph-mcp/trustgraph/mcp_server/tg_socket.py
@@ -1,110 +1,49 @@
+from dataclasses import dataclass
from websockets.asyncio.client import connect
+from urllib.parse import urlencode, urlparse, urlunparse, parse_qs
import asyncio
import logging
import json
import uuid
-import hashlib
-
-logger = logging.getLogger(__name__)
-
-
-def _token_key(token):
- """Derive a dict key from a token without storing the raw secret."""
- return hashlib.sha256(token.encode()).hexdigest()[:16]
-
+import time
class WebSocketManager:
- """Manages an authenticated WebSocket connection to the TrustGraph
- gateway on behalf of a single caller.
- Each caller token gets its own WebSocketManager so that gateway-side
- identity, workspace, and capability scoping are preserved end-to-end.
- """
-
- def __init__(self, url, token):
+ def __init__(self, url, token=None):
self.url = url
- # ── Security boundary: token storage ──
- # This is the MCP caller's Bearer token, forwarded verbatim to
- # the gateway. It MUST NOT be logged, persisted, or shared
- # across callers. It is held only for the lifetime of this
- # connection so that re-auth (e.g. after a reconnect) is
- # possible.
self.token = token
self.socket = None
- self.identity = None
- self.last_used = None
+
+ # FIXME: authentication is broken. The /api/v1/socket endpoint uses
+ # in-band auth (first-frame protocol via the Mux dispatcher), not
+ # query-parameter tokens. This query-string token is silently ignored.
+ # Fix: after connect(), send an auth frame with the bearer token as
+ # the first message, matching the gateway's in-band auth protocol.
+ def _build_url(self):
+ if not self.token:
+ return self.url
+ parsed = urlparse(self.url)
+ params = parse_qs(parsed.query)
+ params["token"] = [self.token]
+ new_query = urlencode(params, doseq=True)
+ return urlunparse(parsed._replace(query=new_query))
async def start(self):
- """Connect and authenticate via the gateway's in-band auth
- protocol. Raises on auth failure."""
-
- # ── Security boundary: MCP server → gateway ──
- # The WebSocket connects to the gateway and authenticates using
- # the caller's Bearer token via the in-band first-frame auth
- # protocol. The token belongs to the MCP client — we forward
- # it as-is and never interpret its contents.
- self.socket = await connect(self.url)
+ self.socket = await connect(self._build_url())
self.pending_requests = {}
self.running = True
-
- await self._authenticate()
-
self.reader_task = asyncio.create_task(self.reader())
- async def _authenticate(self):
- """Send in-band auth frame and wait for auth-ok / auth-failed.
-
- The gateway expects ``{"type": "auth", "token": "..."}`` as the
- first frame on a new WebSocket. Any service frame sent before
- auth-ok is rejected.
- """
- await self.socket.send(json.dumps({
- "type": "auth",
- "token": self.token,
- }))
-
- response_text = await asyncio.wait_for(self.socket.recv(), 10)
- response = json.loads(response_text)
-
- if response.get("type") == "auth-ok":
- logger.info(
- "WebSocket authenticated, default workspace: %s",
- response.get("workspace"),
- )
- return
-
- # Auth failed — close immediately, do not leave an
- # unauthenticated socket open.
- await self.socket.close()
- self.socket = None
-
- if response.get("type") == "auth-failed":
- raise RuntimeError(
- "Gateway rejected the authentication token"
- )
-
- raise RuntimeError(
- f"Unexpected auth response type: {response.get('type')}"
- )
-
- async def whoami(self):
- """Verify the token by calling the gateway's whoami endpoint.
- Returns the identity dict and caches it on ``self.identity``.
- """
- gen = self.request("iam", {"operation": "whoami"}, flow_id=None)
- async for response in gen:
- self.identity = response
- return response
-
async def stop(self):
self.running = False
- if hasattr(self, "reader_task"):
- await self.reader_task
+ await self.reader_task
async def reader(self):
- """Background task: read WebSocket frames and route them to the
- correct pending-request queue by ``id``."""
+ """
+ Background task to read websocket responses and route to correct
+ request
+ """
while self.running:
try:
@@ -120,21 +59,23 @@ class WebSocketManager:
request_id = response.get("id")
if request_id and request_id in self.pending_requests:
+ # Put the response in the queue
queue = self.pending_requests[request_id]
await queue.put(response)
else:
- logger.warning(
- "Response for unknown request ID: %s", request_id
+ logging.warning(
+ f"Response for unknown request ID: {request_id}"
)
except Exception as e:
- logger.error("Error in websocket reader: %s", e)
+ logging.error(f"Error in websocket reader: {e}")
+ # Put error in all pending queues
for queue in self.pending_requests.values():
try:
await queue.put({"error": str(e)})
except Exception:
pass
self.pending_requests.clear()
@@ -145,29 +86,25 @@ class WebSocketManager:
async def request(
self, service, request_data, flow_id="default",
- workspace=None,
):
- """Send a request via WebSocket and yield responses.
-
- Args:
- service: Gateway service name (e.g. "graph-rag", "config").
- request_data: Inner request payload.
- flow_id: Optional flow identifier. ``None`` omits the field
- (workspace-level services don't use flows).
- workspace: Optional workspace override. When ``None`` the
- gateway uses the caller's default workspace.
+ """
+ Send a request via websocket and handle single or streaming responses
"""
- import time
- self.last_used = time.monotonic()
-
+ # Generate unique request ID
request_id = f"{uuid.uuid4()}"
+ # Determine if this service streams responses
+ streaming_services = {"agent"}
+ is_streaming = service in streaming_services
+
+ # Create a queue for all responses (streaming and single)
response_queue = asyncio.Queue()
self.pending_requests[request_id] = response_queue
try:
+ # Build request message
message = {
"id": request_id,
"service": service,
@@ -177,16 +114,7 @@ class WebSocketManager:
if flow_id is not None:
message["flow"] = flow_id
- # ── Security boundary: workspace scoping ──
- # When the caller supplies a workspace, we set it on the
- # message envelope. The gateway's enforce_workspace()
- # validates that the authenticated identity is permitted
- # to access the target workspace — we MUST NOT skip or
- # override that check. When workspace is None, the
- # gateway default-fills from the identity's bound workspace.
- if workspace is not None:
- message["workspace"] = workspace
-
+ # Send request
await self.socket.send(json.dumps(message))
while self.running:
@@ -199,17 +127,19 @@ class WebSocketManager:
continue
if "error" in response:
- if isinstance(response["error"], dict):
- raise RuntimeError(
- response["error"].get("message", str(response["error"]))
- )
+ if isinstance(response["error"], dict) and "message" in response["error"]:
+ raise RuntimeError(response["error"]["message"])
else:
raise RuntimeError(str(response["error"]))
yield response["response"]
- if response.get("complete"):
- break
+ if "complete" in response:
+ if response["complete"]:
+ break
finally:
+ # Always release the queue: callers stop iterating early, and closing
+ # the generator raises GeneratorExit, which `except Exception` misses.
self.pending_requests.pop(request_id, None)
+