Compare commits

...

29 commits

Author SHA1 Message Date
Cyber MacGeddon
a3df4f62bb Merge branch 'master' into release/v2.6 2026-06-22 21:20:29 +01:00
cybermaggedon
09b8a1d347
feat: fine-grained capabilities and enterprise IAM schema extensions (#996)
Split coarse gateway capabilities into fine-grained variants to
support per-operation access control in the enterprise IAM regime.
Add additive schema fields for enterprise group and grant management.

Capability split (gateway registry):
- graph:read -> triples:read, sparql:read, graph-rag:read,
  graph-embeddings:read
- graph:write -> triples:write, graph-embeddings:write,
  entity-contexts:write
- documents:read -> documents:read, document-rag:read,
  document-embeddings:read, entity-contexts:read
- documents:write -> documents:write, document-embeddings:write
- rows:read -> rows:read, nlp-query:read, structured-query:read,
  row-embeddings:read

OSS role definitions expanded to include all new fine-grained
capability names — no behavioral change for OSS deployments.

Schema additions (IamRequest):
- group_id, member_type, member_id for group membership operations
- group (GroupInput), grant (GrantInput) for create/update payloads
- Decoder now handles capability, resource_json, parameters_json,
  authorise_checks (previously missing from translator)

Schema additions (IamResponse):
- group_json, groups_json, members_json, grants_json,
  effective_permissions_json for enterprise operation responses
- Encoder now emits authorise decision fields

Gateway registry:
- 16 enterprise IAM operations registered (create-group,
  add-group-member, add-user-grant, etc.) under iam:admin capability
2026-06-22 20:23:34 +01:00
Jack Colquitt
fa264ded46
Update section titles for Holonic Context Graph (#995) 2026-06-18 19:56:44 -07:00
Jack Colquitt
cae931409a
Update TrustGraph description in README (#994)
Clarified the description of TrustGraph's capabilities and API integrations.
2026-06-18 19:46:25 -07:00
Jack Colquitt
6b0475e315
Revise README for clarity on TrustGraph features (#993)
Updated the README to clarify the concept of holons and the functionality of TrustGraph. Improved the structure and flow of information regarding context management and agent explainability.
2026-06-18 19:42:16 -07:00
Jack Colquitt
cb0ad1a450
Change video link in README (#992)
Updated video source link in README.md.
2026-06-17 17:52:30 -07:00
Jack Colquitt
fc0ecc770a
Format terms as code in README.md (#991) 2026-06-17 16:53:22 -07:00
Jack Colquitt
345da375b1
Document Workspaces, Collections, and Flows in README (#990)
Added section on Workspaces, Collections, and Flows to explain the organizational structure of TrustGraph.
2026-06-17 16:48:22 -07:00
Jack Colquitt
0ba1eeeda0
Enhance README with token consumption details (#989)
Added a note about reducing token consumption in context management.
2026-06-17 16:27:16 -07:00
Jack Colquitt
eb1e38d7d0
Add hyperlink to 'holon' in README.md (#988) 2026-06-17 16:13:41 -07:00
Jack Colquitt
b8770a6005
Update README with new context and features (#987) 2026-06-17 16:08:22 -07:00
Jack Colquitt
28802a644a
Update license badge in README.md (#986) 2026-06-11 20:45:39 -07:00
cybermaggedon
8797d9d9ff feat: per-caller Bearer token auth and new query tools for MCP server (#984)
Replace the broken GATEWAY_SECRET auth (token was sent as a query
parameter, silently ignored by the gateway) with end-to-end Bearer
token forwarding.  Each MCP caller gets a dedicated WebSocket
authenticated via the gateway's in-band first-frame protocol, with
whoami verification on first connect.

Also fix and extend the tool surface:
- embeddings: accept list of texts (was single string)
- triples_query: use Term wire format with compact keys (was legacy
  Value format), add collection and graph parameters
- sparql_query: new tool for SPARQL SELECT/ASK/CONSTRUCT/DESCRIBE
- graphql_query: new tool for structured data (rows) GraphQL queries
- all tools: add optional workspace parameter
2026-06-10 14:11:49 +01:00
cybermaggedon
627c669097
feat: per-caller Bearer token auth and new query tools for MCP server (#984)
Replace the broken GATEWAY_SECRET auth (token was sent as a query
parameter, silently ignored by the gateway) with end-to-end Bearer
token forwarding.  Each MCP caller gets a dedicated WebSocket
authenticated via the gateway's in-band first-frame protocol, with
whoami verification on first connect.

Also fix and extend the tool surface:
- embeddings: accept list of texts (was single string)
- triples_query: use Term wire format with compact keys (was legacy
  Value format), add collection and graph parameters
- sparql_query: new tool for SPARQL SELECT/ASK/CONSTRUCT/DESCRIBE
- graphql_query: new tool for structured data (rows) GraphQL queries
- all tools: add optional workspace parameter
2026-06-10 14:10:43 +01:00
cybermaggedon
8b0619e5d8
Bump version numbers to 2.6 (#983) 2026-06-09 20:03:14 +01:00
cybermaggedon
e3f9f8c357
Merge pull request #982 from trustgraph-ai/master
master -> release/v2.6
2026-06-09 19:46:50 +01:00
Cyber MacGeddon
81d57826c8 Merge branch 'release/v2.5' 2026-06-09 19:43:31 +01:00
Jacob Molz
79d7ef6a90 fix: reject invalid PDF decoder input (#977) 2026-06-09 16:37:39 +01:00
Jacob Molz
28a51c244f
fix: reject invalid PDF decoder input (#977) 2026-06-09 16:37:10 +01:00
Cyber MacGeddon
fa5ebe2393 Merge branch 'release/v2.5' 2026-06-09 16:34:20 +01:00
cybermaggedon
e1c9351454
fix: update row query tests to mock async_execute_paged and async_scan (#979)
The query service now uses async_execute_paged (indexed path) and
async_scan (scan path) instead of async_execute. Tests were mocking
the old function, causing them to hang indefinitely.
2026-06-09 16:29:32 +01:00
cybermaggedon
dbc21c0bb9
fix: structured data query and auth fixes (#978)
- Pass auth token to schema discovery and descriptor generation in
  tg-load-structured-data, fixing 401 errors with IAM enabled
- Fix row query pagination: replace single-page async_execute with
  async_scan that streams pages and applies filters without
  materialising the full result set (OOM on large datasets)
- Add missing filter operators (not, startsWith, endsWith, not_in)
  to row query post-filter matching
- Fall back to scan path when an indexed field is queried with an
  empty string value, since empty index values are not stored
- Revert top-level indexes array support — the current table schema
  overwrites rows with duplicate index values, so only primary_key
  fields are safe to index until the schema is redesigned
2026-06-08 15:22:11 +01:00
cybermaggedon
08bfec1539
fix: wire replication params through YAML/params path for Cassandra and Qdrant (#976)
resolve_cassandra_config did not accept replication_factor as a kwarg,
so cassandra_replication_factor from YAML params was silently ignored
by all 6 callers. Add the kwarg and pass it from every caller.

Same fix for Qdrant: 3 writers now pass qdrant_replication_factor and
qdrant_shard_number from params.

Add tests covering the params path for both helpers.
2026-06-04 12:36:36 +01:00
cybermaggedon
4913f8c2eb
feat: data store replication configuration and TLS upgrade (#975)
- Add centralised qdrant_config.py helper with env-var fallback for
  QDRANT_URL, QDRANT_API_KEY, QDRANT_REPLICATION_FACTOR, QDRANT_SHARD_NUMBER
- Update all 6 Qdrant processors to use the helper; writers pass
  replication_factor and shard_number to create_collection
- Fix hardcoded Cassandra replication_factor=1 in cassandra_kg.py,
  write.py, and sparql_cassandra.py to respect CASSANDRA_REPLICATION_FACTOR
- Upgrade Cassandra TLS from deprecated PROTOCOL_TLSv1_2 to
  ssl.create_default_context() across all connectors
2026-06-04 11:49:29 +01:00
cybermaggedon
acf182c265
feat: add env-var fallback for librarian object-store config (#974)
The librarian now reads OBJECT_STORE_ENDPOINT, OBJECT_STORE_ACCESS_KEY,
OBJECT_STORE_SECRET_KEY, OBJECT_STORE_REGION, and OBJECT_STORE_USE_SSL
from the environment when not set via params. This lets K8s Secrets
supply credentials without them appearing in launch.yaml.
2026-06-03 10:59:58 +01:00
Jack Colquitt
97453d9b83
Change project title to 'The semantic deployment platform' (#968)
Updated the project title in the README.
2026-06-01 14:08:30 -07:00
Jack Colquitt
6dfa47aac8
Revise README for semantic infrastructure terminology (#962)
Updated the README to reflect changes in terminology and improve clarity regarding the platform's features.
2026-05-30 17:07:19 -07:00
Cyber MacGeddon
dcee842455 Merge branch 'release/v2.5' 2026-05-28 11:26:43 +01:00
cybermaggedon
36eadbda3a
Merge pull request #953 from trustgraph-ai/release/v2.5
release/v2.5 -> master
2026-05-26 15:01:44 +01:00
45 changed files with 1925 additions and 1459 deletions

View file

@ -22,7 +22,7 @@ jobs:
uses: actions/checkout@v3
- name: Setup packages
run: make update-package-versions VERSION=2.5.999
run: make update-package-versions VERSION=2.6.999
- name: Setup environment
run: python3 -m venv env

282
README.md
View file

@ -3,7 +3,7 @@
<img src="TG-fullname-logo.svg" width=100% />
[![PyPI version](https://img.shields.io/pypi/v/trustgraph.svg)](https://pypi.org/project/trustgraph/) [![License](https://img.shields.io/github/license/trustgraph-ai/trustgraph?color=blue)](LICENSE) ![E2E Tests](https://github.com/trustgraph-ai/trustgraph/actions/workflows/release.yaml/badge.svg)
[![PyPI version](https://img.shields.io/pypi/v/trustgraph.svg)](https://pypi.org/project/trustgraph/) ![License](https://img.shields.io/badge/license-Apache%202.0-blue) ![E2E Tests](https://github.com/trustgraph-ai/trustgraph/actions/workflows/release.yaml/badge.svg)
[![Discord](https://img.shields.io/discord/1251652173201149994
)](https://discord.gg/sQMwkRz5GX) [![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/trustgraph-ai/trustgraph)
@ -11,44 +11,89 @@
<a href="https://trendshift.io/repositories/17291" target="_blank"><img src="https://trendshift.io/api/badge/repositories/17291" alt="trustgraph-ai%2Ftrustgraph | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/></a>
# The agent runtime platform
# Write context once. Run agents anywhere.
</div>
TrustGraph is an agent runtime platform built around context graphs — structured, queryable representations of your domain knowledge that ground every agent query in verified, explainable facts in private deployments with sovereign control. The platform is the full stack for agentic systems: context graphs, memory, retrieval, orchestration, and inference for precision-critical agent workloads.
Stop rebuilding context from scratch. TrustGraph treats context as a holon — a modular, independent whole that naturally snaps into a larger domain-wide intelligence layer. By deploying context as holonic context graphs, TrustGraph powers multi-tenant agent workflows, dramatically reduces token consumption, and aligns with semantic web standards (RDF, OWL, SKOS, SHACL). Version your context, share it across teams, and scale with full provenance.
The platform:
- [x] Multi-model and multimodal database system
- [x] Tabular/relational, key-value
- [x] Document, graph, and vectors
- [x] Images, video, and audio
- [x] Context Graph engine
- [x] Automated entity and relationship extraction
- [x] Ontology-driven graph construction
- [x] Graph-grounded retrieval for explainable outputs
- [x] Automated data ingest and loading
- [x] Quick ingest with semantic similarity retrieval
- [x] Ontology structuring for precision retrieval
- [x] Out-of-the-box RAG pipelines
- [x] DocumentRAG
- [x] GraphRAG
- [x] OntologyRAG
- [x] 3D GraphViz for exploring context
- [x] Fully Agentic System
- [x] Single or Multi Agent
- [x] ReAct, Plan-then-Execute, and Supervisor patterns
- [x] MCP integration
- [x] Run anywhere
- [x] Deploy locally with Docker
- [x] Deploy in cloud with Kubernetes
- [x] Support for all major LLMs
- [x] API support for Anthropic, Cohere, Gemini, Mistral, OpenAI, and others
- [x] Model inferencing with vLLM, Ollama, TGI, LM Studio, and Llamafiles
- [x] Developer friendly
- [x] REST API [Docs](https://docs.trustgraph.ai/reference/apis/rest.html)
- [x] Websocket API [Docs](https://docs.trustgraph.ai/reference/apis/websocket.html)
- [x] Python API [Docs](https://docs.trustgraph.ai/reference/apis/python)
- [x] CLI [Docs](https://docs.trustgraph.ai/reference/cli/)
## What TrustGraph Does
TrustGraph is a complete holonic context harness for all LLMs. It provides the full infrastructure layer underneath your agents: knowledge ingestion, structured storage, graph-grounded retrieval, agent orchestration, and a full LLM inferencing stack.
TrustGraph relies on absolutely no 3rd party services aside from optional API integrations to cloud-hosted LLMs. Whether you are using Anthropic's or OpenAI's API, or self-hosting Qwen3.7 via vLLM, TrustGraph handles it all with pre-built API connectors and a full LLM inferencing stack to enrich the models with a sovereign, private holonic system that grounds your agents in reality.
## The Problem: Why Agents Break
When you build an AI agent today, you spend most of your time fighting context:
- **RAG retrieves fragments, not meaning**. Chunks of text have no structure. Relationships between facts are invisible. Your agent guesses at the connections.
- **Context is disposable**. What the agent learned in one session is gone in the next. There is no persistent, structured knowledge layer underneath.
- **Answers aren't traceable**. You can't explain why the agent said what it said, which means you can't trust it in production.
- **Knowledge can't be reused**. You rebuild the same context pipelines for every new project, every new agent, every new environment.
These aren't retrieval problems. They are structural problems. Context needs to be organized, versioned, and composable — exactly the way software infrastructure is.
## The Solution: A Holonic Context System
The philosopher Arthur Koestler coined the word [holon](https://en.wikipedia.org/wiki/Holon_(philosophy)) to describe something that is simultaneously a whole in itself and a part of something larger. A fact is whole. It is also part of a domain. A domain is whole. It is also part of an organization's knowledge.
AI agents break down because this holonic structure is never built. Context gets shoved into flat text windows, scattered across vector stores, or hardwired into one-off prompts. Facts lose their relationships.
TrustGraph solves this by organizing your domain into holonic context graphs. Entities, relationships, and evidence are treated as first-class objects. Every agent query is grounded against these holons—marrying symbolic graph structures with vector embeddings. Every answer carries provenance. Every fact is traceable.
## Context Cores: Knowledge as a First-Class Citizen
A Context Core is the deployable unit of knowledge in TrustGraph. It packages everything an agent needs to reason reliably over a domain into a single, portable artifact.
### What's inside a Context Core
- **Ontology** — your domain schema and entity mappings
- **Holon** — entities, relationships, and supporting evidence
- **Embeddings** — vector indexes for fast semantic entry-point lookup
- **Provenance** — where every fact came from, when, and how it was derived
- **Retrieval policies** — traversal rules, freshness controls, authority ranking
Context Cores decouple what agents know from how agents are deployed. Build once. Run in Docker locally, Kubernetes in production, or on any cloud. Pin a version. Roll back. Promote across environments. This is context engineering — and it works because knowledge is finally treated like the infrastructure it is.
## Explainability: Trust Your Agents in Production
LLMs are black boxes, and traditional RAG makes it worse. When an agent pulls flat text chunks from a vector store, you have no idea how it connected those fragments to form an answer. You cannot ship agents to production if you can't explain why they said what they said.
### How TrustGraph makes agents explainable:
- **Traceable Reasoning Paths**: Instead of guessing at connections between text chunks, TrustGraph traverses explicit relationship paths in the holonic context graph. You can inspect exactly which entities, relationships, and sub-graphs were pulled into the LLM's context window to generate a given response.
- **Fact-Level Provenance**: Every node and edge in the graph carries strict provenance. When an agent makes a claim, you can trace it back to the exact source document, the time it was ingested, and the extraction method used to derive it.
- **No Black-Box Guesses**: By grounding the LLM in a structured, symbolic graph, you eliminate the hallucinations that occur when models are forced to infer relationships from unstructured text. If a fact isn't in the graph, the agent doesn't use it.
TrustGraph doesn't just give you answers - it gives you the receipt. Every fact is traceable, every connection is visible, and every output is verifiable.
## Workspaces, Collections, and Flows
TrustGraph has a [three-level system](https://docs.trustgraph.ai/overview/workspaces) for organizing and isolating knowledge.
A `Workspace` is the outermost boundary — a fully isolated tenancy scope where all data, users, configuration, and pipelines live independently from every other workspace. Isolation is structural: enforced at the pub/sub queue, storage, and API gateway layers, not by trusting a field in a message body.
Within a workspace, a `Collection` groups related holons, graph structures, embeddings, and documents together — think of it as a dedicated shelf in a library, scoped to a specific domain, project, or customer.
A `Flow` is a running data processing pipeline that defines how raw data moves through ingestion, extraction, structuring, and storage — the assembly line that turns documents into queryable knowledge. Together, the three layers let you run multiple isolated tenants on a single deployment, separate knowledge by domain within each tenant, and process that knowledge through fully configurable pipelines — all without restarting the system or rebuilding your infrastructure.
## The Full Stack
TrustGraph is not a wrapper around a graph database. It is the complete backend for production agentic systems.
- **Holonic context graph engine**: automated entity and relationship extraction, ontology-driven graph construction, graph-grounded retrieval for explainable outputs
- **Multi-model database**: tabular/relational, key-value, document, graph, vectors, images, video, and audio — all managed in Cassandra and S3-compatible Garage
- **Out-of-the-box RAG pipelines**: DocumentRAG, GraphRAG, and OntologyRAG ready to deploy
- **Fully agentic orchestration**: single or multi-agent, ReAct, Plan-then-Execute, Supervisor patterns, and MCP integration
- **3D Knowledge Explorer**: interactive graph visualization with BFS neighborhood extraction and edge pulse animation
- **Automated data ingest**: quick ingest with semantic similarity or ontology-structured precision retrieval
- **Run anywhere**: Docker/Podman locally, Kubernetes in the cloud
All major LLMs — Anthropic, Cohere, Gemini, Mistral, OpenAI, and more via API.
vLLM, Ollama, TGI, LM Studio, and Llamafiles for fully local inferencing.
Verified cloud deployments for Alibaba Cloud, AWS, Azure, GCP, OVHcloud, and Scaleway.
## No API Keys Required
@ -62,12 +107,12 @@ Everything else is included.
- [x] Managed Multi-model storage in [Cassandra](https://cassandra.apache.org/_/index.html)
- [x] Managed Vector embedding storage in [Qdrant](https://github.com/qdrant/qdrant)
- [x] Managed File and Object storage in [Garage](https://github.com/deuxfleurs-org/garage) (S3 compatible)
- [x] Managed High-speed Pub/Sub messaging fabric with [Pulsar](https://github.com/apache/pulsar)
- [x] Managed High-speed Pub/Sub messaging fabric with [Pulsar](https://github.com/apache/pulsar) or [RabbitMQ](https://www.rabbitmq.com/)
- [x] Complete LLM inferencing stack for open LLMs with [vLLM](https://github.com/vllm-project/vllm), [TGI](https://github.com/huggingface/text-generation-inference), [Ollama](https://github.com/ollama/ollama), [LM Studio](https://github.com/lmstudio-ai), and [Llamafiles](https://github.com/mozilla-ai/llamafile)
## Quickstart
There's no need to clone this repo, unless you want to build from source. TrustGraph is a fully containerized app that deploys as a set of Docker containers. To configure TrustGraph on the command line:
No need to clone the repo unless you are building from source. TrustGraph deploys as a set of Docker containers. Configure it on the command line in one step:
```
npx @trustgraph/config
@ -78,44 +123,39 @@ The config process will generate an app config that can be run locally with Dock
- Deployment instructions as `INSTALLATION.md`
<p align="center">
<video src="https://github.com/user-attachments/assets/2978a6aa-4c9c-4d7c-ad02-8f3d01a1c602"
<video src="https://github.com/user-attachments/assets/33434c3c-f586-4610-8bb2-d7b7b586a672"
width="80%" controls></video>
</p>
For a browser based configuration, try the [Configuration Terminal](https://config-ui.demo.trustgraph.ai/).
## Watch What is a Context Graph?
## Watch What is a Holonic Context Graph?
[![What is a Context Graph?](https://img.youtube.com/vi/gZjlt5WcWB4/maxresdefault.jpg)](https://www.youtube.com/watch?v=gZjlt5WcWB4)
## Watch Context Graphs in Action
## Watch Holonic Context Graphs in Action
[![Context Graphs in Action with TrustGraph](https://img.youtube.com/vi/sWc7mkhITIo/maxresdefault.jpg)](https://www.youtube.com/watch?v=sWc7mkhITIo)
## Getting Started with TrustGraph
- [**Getting Started Guides**](https://docs.trustgraph.ai/getting-started)
- [**Using the Workbench**](#workbench)
- [**Developer APIs and CLI**](https://docs.trustgraph.ai/reference)
- [**Deployment Guides**](https://docs.trustgraph.ai/deployment)
## Workbench
## TrustGraph UI
The **Workbench** provides tools for all major features of TrustGraph. The **Workbench** is on port `8888` by default.
<img width="1389" height="961" alt="Image" src="https://github.com/user-attachments/assets/35c9250d-0f01-40cb-9294-1ee8fd9a1b56" />
- **Vector Search**: Search the installed knowledge bases
- **Agentic, GraphRAG and LLM Chat**: Chat interface for agents, GraphRAG queries, or direct to LLMs
- **Relationships**: Analyze deep relationships in the installed knowledge bases
- **Graph Visualizer**: 3D GraphViz of the installed knowledge bases
- **Library**: Staging area for installing knowledge bases
- **Flow Classes**: Workflow preset configurations
- **Flows**: Create custom workflows and adjust LLM parameters during runtime
- **Knowledge Cores**: Manage resuable knowledge bases
- **Prompts**: Manage and adjust prompts during runtime
- **Schemas**: Define custom schemas for structured data knowledge bases
- **Ontologies**: Define custom ontologies for unstructured data knowledge bases
- **Agent Tools**: Define tools with collections, knowledge cores, MCP connections, and tool groups
- **MCP Tools**: Connect to MCP servers
The UI provides tools for all major features of TrustGraph. The UI deploys on port `8888` by default.
- **Agent Console** — Query your agents directly with streaming responses and live explainability event tracking, so you can watch reasoning unfold in real time
- **GraphRAG View** — Interactive graph RAG queries with a visual explainability DAG and inline provenance display, making it easy to see exactly where answers came from
- **Context Explorer** — An interactive 3D context graph explorer with dynamic graph loading, BFS neighborhood extraction, edge pulse animation, and multiple navigation views
- **Document Ingestion** — A complete upload and submission workflow with page and chunk inspection and document structure browsing
- **Ontology Workbench** — A full ontology editor with class and property trees, OWL/XML and Turtle import/export with round-trip fidelity, circular dependency detection, and safe-delete confirmation dialogs
- **Schema Workbench** — Interactive schema management with list, create, edit, and delete operations including field and index management
- **Prompt Editor** — A dedicated prompt editing workflow
## TypeScript Library for UIs
@ -125,134 +165,6 @@ There are 3 libraries for quick UI integration of TrustGraph services.
- [@trustgraph/react-state](https://www.npmjs.com/package/@trustgraph/react-state)
- [@trustgraph/react-provider](https://www.npmjs.com/package/@trustgraph/react-provider)
## Context Cores
Context Cores are how TrustGraph treats context like code. A Context Core is a **portable, versioned bundle of context** that you can ship between projects and environments, pin in production, and reuse across agents. It packages the “stuff agents need to know” (structured knowledge + embeddings + evidence + policies) into a single artifact, so you can treat context like code: build it, test it, version it, promote it, and roll it back. TrustGraph is built to support this kind of end-to-end context engineering and orchestration workflow.
### Whats inside a Context Core
A Context Core typically includes:
- Ontology (your domain schema) and mappings
- Context Graph (entities, relationships, supporting evidence)
- Embeddings / vector indexes for fast semantic entry-point lookup
- Source manifests + provenance (where facts came from, when, and how they were derived)
- Retrieval policies (traversal rules, freshness, authority ranking)
## Tech Stack
TrustGraph provides component flexibility to optimize agent workflows.
<details>
<summary>LLM APIs</summary>
<br>
- Anthropic<br>
- AWS Bedrock<br>
- AzureAI<br>
- AzureOpenAI<br>
- Cohere<br>
- Google AI Studio<br>
- Google VertexAI<br>
- Mistral<br>
- OpenAI<br>
</details>
<details>
<summary>LLM Orchestration</summary>
<br>
- LM Studio<br>
- Llamafiles<br>
- Ollama<br>
- TGI<br>
- vLLM<br>
</details>
<details>
<summary>Multi-model storage</summary>
<br>
- Apache Cassandra<br>
</details>
<details>
<summary>VectorDB</summary>
<br>
- Qdrant<br>
</details>
<details>
<summary>File and Object Storage</summary>
<br>
- Garage<br>
</details>
<details>
<summary>Observability</summary>
<br>
- Prometheus<br>
- Grafana<br>
- Loki<br>
</details>
<details>
<summary>Data Streaming</summary>
<br>
- Apache Pulsar<br>
- RabbitMQ<br>
- Apache Kafka<br>
</details>
<details>
<summary>Clouds</summary>
<br>
- AWS<br>
- Azure<br>
- Google Cloud<br>
- OVHcloud<br>
- Scaleway<br>
</details>
## Observability & Telemetry
Once the platform is running, access the Grafana dashboard at:
```
http://localhost:3000
```
Default credentials are:
```
user: admin
password: admin
```
The default Grafana dashboard tracks the following:
<details>
<summary>Telemetry</summary>
<br>
- LLM Latency<br>
- Error Rate<br>
- Service Request Rates<br>
- Queue Backlogs<br>
- Chunking Histogram<br>
- Error Source by Service<br>
- Rate Limit Events<br>
- CPU usage by Service<br>
- Memory usage by Service<br>
- Models Deployed<br>
- Token Throughput (Tokens/second)<br>
- Cost Throughput (Cost/second)<br>
</details>
## Contributing
[Developer's Guide](https://docs.trustgraph.ai/guides/building/introduction.html)
@ -261,7 +173,7 @@ The default Grafana dashboard tracks the following:
**TrustGraph** is licensed under [Apache 2.0](https://www.apache.org/licenses/LICENSE-2.0).
Copyright 2024-2025 TrustGraph
Copyright 2024-2026 TrustGraph
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View file

@ -410,3 +410,56 @@ class TestEdgeCases:
assert hosts == ['mixed-host']
assert username is None # Stays None
assert password == 'mixed-pass'
class TestReplicationFactorParamPath:
def test_explicit_kwarg(self):
with patch.dict(os.environ, {}, clear=True):
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=3,
)
assert rf == 3
def test_kwarg_overrides_env(self):
with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=3,
)
assert rf == 3
def test_env_fallback_when_kwarg_none(self):
with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=None,
)
assert rf == 5
def test_default_when_no_kwarg_no_env(self):
with patch.dict(os.environ, {}, clear=True):
_, _, _, _, rf = resolve_cassandra_config()
assert rf == 1
def test_params_dict_path(self):
with patch.dict(os.environ, {}, clear=True):
params = {'cassandra_replication_factor': 3}
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=params.get('cassandra_replication_factor'),
)
assert rf == 3
def test_params_dict_overrides_env(self):
with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
params = {'cassandra_replication_factor': 3}
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=params.get('cassandra_replication_factor'),
)
assert rf == 3
def test_params_dict_missing_falls_to_env(self):
with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
params = {}
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=params.get('cassandra_replication_factor'),
)
assert rf == 5

View file

@ -0,0 +1,136 @@
import os
import pytest
from unittest.mock import patch
from trustgraph.base.qdrant_config import (
get_qdrant_defaults,
resolve_qdrant_config,
)
class TestGetQdrantDefaults:
def test_defaults_with_no_env_vars(self):
with patch.dict(os.environ, {}, clear=True):
defaults = get_qdrant_defaults()
assert defaults['url'] == 'http://localhost:6333'
assert defaults['api_key'] is None
assert defaults['replication_factor'] == 1
assert defaults['shard_number'] == 1
def test_defaults_from_env(self):
env = {
'QDRANT_URL': 'http://qdrant:6333',
'QDRANT_API_KEY': 'secret',
'QDRANT_REPLICATION_FACTOR': '3',
'QDRANT_SHARD_NUMBER': '5',
}
with patch.dict(os.environ, env, clear=True):
defaults = get_qdrant_defaults()
assert defaults['url'] == 'http://qdrant:6333'
assert defaults['api_key'] == 'secret'
assert defaults['replication_factor'] == 3
assert defaults['shard_number'] == 5
class TestResolveQdrantConfig:
def test_defaults(self):
with patch.dict(os.environ, {}, clear=True):
url, api_key, rf, sn = resolve_qdrant_config()
assert url == 'http://localhost:6333'
assert api_key is None
assert rf == 1
assert sn == 1
def test_explicit_kwargs(self):
with patch.dict(os.environ, {}, clear=True):
url, api_key, rf, sn = resolve_qdrant_config(
url='http://custom:6333',
api_key='key',
replication_factor=3,
shard_number=5,
)
assert url == 'http://custom:6333'
assert api_key == 'key'
assert rf == 3
assert sn == 5
def test_kwargs_override_env(self):
env = {
'QDRANT_URL': 'http://env:6333',
'QDRANT_REPLICATION_FACTOR': '10',
'QDRANT_SHARD_NUMBER': '10',
}
with patch.dict(os.environ, env, clear=True):
url, _, rf, sn = resolve_qdrant_config(
url='http://explicit:6333',
replication_factor=3,
shard_number=5,
)
assert url == 'http://explicit:6333'
assert rf == 3
assert sn == 5
def test_env_fallback_when_kwargs_none(self):
env = {
'QDRANT_URL': 'http://env:6333',
'QDRANT_REPLICATION_FACTOR': '3',
'QDRANT_SHARD_NUMBER': '5',
}
with patch.dict(os.environ, env, clear=True):
url, _, rf, sn = resolve_qdrant_config()
assert url == 'http://env:6333'
assert rf == 3
assert sn == 5
def test_params_dict_path(self):
with patch.dict(os.environ, {}, clear=True):
params = {
'store_uri': 'http://params:6333',
'api_key': 'pkey',
'qdrant_replication_factor': 3,
'qdrant_shard_number': 5,
}
url, api_key, rf, sn = resolve_qdrant_config(
url=params.get('store_uri'),
api_key=params.get('api_key'),
replication_factor=params.get('qdrant_replication_factor'),
shard_number=params.get('qdrant_shard_number'),
)
assert url == 'http://params:6333'
assert api_key == 'pkey'
assert rf == 3
assert sn == 5
def test_params_dict_overrides_env(self):
env = {
'QDRANT_REPLICATION_FACTOR': '10',
'QDRANT_SHARD_NUMBER': '10',
}
with patch.dict(os.environ, env, clear=True):
params = {
'qdrant_replication_factor': 3,
'qdrant_shard_number': 5,
}
_, _, rf, sn = resolve_qdrant_config(
replication_factor=params.get('qdrant_replication_factor'),
shard_number=params.get('qdrant_shard_number'),
)
assert rf == 3
assert sn == 5
def test_params_dict_missing_falls_to_env(self):
env = {
'QDRANT_REPLICATION_FACTOR': '3',
'QDRANT_SHARD_NUMBER': '5',
}
with patch.dict(os.environ, env, clear=True):
params = {}
_, _, rf, sn = resolve_qdrant_config(
replication_factor=params.get('qdrant_replication_factor'),
shard_number=params.get('qdrant_shard_number'),
)
assert rf == 3
assert sn == 5

View file

@ -49,7 +49,7 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase):
async def test_on_message_success(self, mock_pdf_loader_class, mock_producer, mock_consumer):
"""Test successful PDF processing"""
# Mock PDF content
pdf_content = b"fake pdf content"
pdf_content = b"%PDF-1.7\nfake pdf content"
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
# Mock PyPDFLoader
@ -88,13 +88,55 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase):
# Verify triples were sent for each page (provenance)
assert mock_triples_flow.send.call_count == 2
@patch('trustgraph.base.librarian_client.Consumer')
@patch('trustgraph.base.librarian_client.Producer')
@patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader')
@patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor)
async def test_on_message_rejects_librarian_content_that_is_not_pdf(self, mock_pdf_loader_class, mock_producer, mock_consumer):
"""Test rejecting non-PDF content before invoking the PDF loader"""
html_content = b"<html><body>Not found</body></html>"
html_base64 = base64.b64encode(html_content)
mock_metadata = Metadata(id="test-doc")
mock_document = Document(metadata=mock_metadata, document_id="doc-123")
mock_msg = MagicMock()
mock_msg.value.return_value = mock_document
mock_output_flow = AsyncMock()
mock_triples_flow = AsyncMock()
mock_flow = MagicMock(side_effect=lambda name: {
"output": mock_output_flow,
"triples": mock_triples_flow,
}.get(name))
mock_flow.librarian.fetch_document_metadata = AsyncMock(
return_value=MagicMock(kind="application/pdf")
)
mock_flow.librarian.fetch_document_content = AsyncMock(
return_value=html_base64
)
mock_flow.librarian.save_child_document = AsyncMock()
config = {
'id': 'test-pdf-decoder',
'taskgroup': AsyncMock()
}
processor = Processor(**config)
await processor.on_message(mock_msg, None, mock_flow)
mock_pdf_loader_class.assert_not_called()
mock_output_flow.send.assert_not_called()
mock_triples_flow.send.assert_not_called()
mock_flow.librarian.save_child_document.assert_not_called()
@patch('trustgraph.base.librarian_client.Consumer')
@patch('trustgraph.base.librarian_client.Producer')
@patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader')
@patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor)
async def test_on_message_empty_pdf(self, mock_pdf_loader_class, mock_producer, mock_consumer):
"""Test handling of empty PDF"""
pdf_content = b"fake pdf content"
pdf_content = b"%PDF-1.7\nfake pdf content"
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
mock_loader = MagicMock()
@ -126,7 +168,7 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase):
@patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor)
async def test_on_message_unicode_content(self, mock_pdf_loader_class, mock_producer, mock_consumer):
"""Test handling of unicode content in PDF"""
pdf_content = b"fake pdf content"
pdf_content = b"%PDF-1.7\nfake pdf content"
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
mock_loader = MagicMock()

View file

@ -333,8 +333,8 @@ class TestUnifiedTableQueries:
"""Test queries against the unified rows table"""
@pytest.mark.asyncio
@patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock)
async def test_query_with_index_match(self, mock_async_execute):
@patch('trustgraph.query.rows.cassandra.service.async_execute_paged', new_callable=AsyncMock)
async def test_query_with_index_match(self, mock_async_execute_paged):
"""Test query execution with matching index"""
processor = MagicMock()
processor.session = MagicMock()
@ -344,10 +344,10 @@ class TestUnifiedTableQueries:
processor.find_matching_index = Processor.find_matching_index.__get__(processor, Processor)
processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor)
# Mock async_execute to return test data
# Mock async_execute_paged to return test data (list of pages)
mock_row = MagicMock()
mock_row.data = {"id": "123", "name": "Test Product", "category": "electronics"}
mock_async_execute.return_value = [mock_row]
mock_async_execute_paged.return_value = [[mock_row]]
schema = RowSchema(
name="products",
@ -370,10 +370,10 @@ class TestUnifiedTableQueries:
# Verify Cassandra was connected and queried
processor.connect_cassandra.assert_called_once()
mock_async_execute.assert_called_once()
mock_async_execute_paged.assert_called_once()
# Verify query structure - should query unified rows table
call_args = mock_async_execute.call_args
call_args = mock_async_execute_paged.call_args
query = call_args[0][1]
params = call_args[0][2]
@ -394,8 +394,8 @@ class TestUnifiedTableQueries:
assert results[0]["category"] == "electronics"
@pytest.mark.asyncio
@patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock)
async def test_query_without_index_match(self, mock_async_execute):
@patch('trustgraph.query.rows.cassandra.service.async_scan', new_callable=AsyncMock)
async def test_query_without_index_match(self, mock_async_scan):
"""Test query execution without matching index (scan mode)"""
processor = MagicMock()
processor.session = MagicMock()
@ -406,12 +406,10 @@ class TestUnifiedTableQueries:
processor._matches_filters = Processor._matches_filters.__get__(processor, Processor)
processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor)
# Mock async_execute to return test data
# Mock async_scan to return filtered test data
mock_row1 = MagicMock()
mock_row1.data = {"id": "1", "name": "Product A", "price": "100"}
mock_row2 = MagicMock()
mock_row2.data = {"id": "2", "name": "Product B", "price": "200"}
mock_async_execute.return_value = [mock_row1, mock_row2]
mock_async_scan.return_value = [mock_row1]
schema = RowSchema(
name="products",
@ -432,13 +430,16 @@ class TestUnifiedTableQueries:
limit=10
)
# Query should use ALLOW FILTERING for scan
call_args = mock_async_execute.call_args
# Verify async_scan was called
mock_async_scan.assert_called_once()
# Verify query structure
call_args = mock_async_scan.call_args
query = call_args[0][1]
assert "ALLOW FILTERING" in query
# Should post-filter results
# Should return filtered results
assert len(results) == 1
assert results[0]["name"] == "Product A"

View file

@ -259,6 +259,8 @@ class TestGraphEmbeddingsNullProtection:
proc.collection_exists = MagicMock(return_value=True)
proc._cache_lock = asyncio.Lock()
proc._known_collections = set()
proc.replication_factor = 1
proc.shard_number = 1
msg = MagicMock()
msg.metadata.collection = "graphs"

View file

@ -103,35 +103,19 @@ def resolve_cassandra_config(
host: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
default_keyspace: Optional[str] = None
default_keyspace: Optional[str] = None,
replication_factor: Optional[int] = None,
) -> Tuple[List[str], Optional[str], Optional[str], Optional[str], int]:
"""
Resolve Cassandra configuration from various sources.
Can accept either argparse args object or explicit parameters.
Converts host string to list format for Cassandra driver.
Args:
args: Optional argparse namespace with cassandra_host, cassandra_username, cassandra_password, cassandra_keyspace, cassandra_replication_factor
host: Optional explicit host parameter (overrides args)
username: Optional explicit username parameter (overrides args)
password: Optional explicit password parameter (overrides args)
default_keyspace: Optional default keyspace if not specified elsewhere
Returns:
tuple: (hosts_list, username, password, keyspace, replication_factor)
"""
# If args provided, extract values
keyspace = None
replication_factor = 1
if args is not None:
host = host or getattr(args, 'cassandra_host', None)
username = username or getattr(args, 'cassandra_username', None)
password = password or getattr(args, 'cassandra_password', None)
keyspace = getattr(args, 'cassandra_keyspace', None)
replication_factor = getattr(args, 'cassandra_replication_factor', 1)
replication_factor = replication_factor or getattr(
args, 'cassandra_replication_factor', None
)
# Apply defaults if still None
defaults = get_cassandra_defaults()
host = host or defaults['host']
username = username or defaults['username']

View file

@ -0,0 +1,87 @@
import os
import argparse
from typing import Optional, Any, Tuple
def get_qdrant_defaults() -> dict:
return {
'url': os.getenv('QDRANT_URL', 'http://localhost:6333'),
'api_key': os.getenv('QDRANT_API_KEY'),
'replication_factor': int(os.getenv('QDRANT_REPLICATION_FACTOR', '1')),
'shard_number': int(os.getenv('QDRANT_SHARD_NUMBER', '1')),
}
def add_qdrant_args(parser: argparse.ArgumentParser) -> None:
defaults = get_qdrant_defaults()
url_help = f"Qdrant URL (default: {defaults['url']})"
if 'QDRANT_URL' in os.environ:
url_help += " [from QDRANT_URL]"
api_key_help = "Qdrant API key"
if defaults['api_key']:
api_key_help += " (default: <set>)"
if 'QDRANT_API_KEY' in os.environ:
api_key_help += " [from QDRANT_API_KEY]"
replication_help = f"Qdrant collection replication factor (default: {defaults['replication_factor']})"
if 'QDRANT_REPLICATION_FACTOR' in os.environ:
replication_help += " [from QDRANT_REPLICATION_FACTOR]"
shard_help = f"Qdrant collection shard number (default: {defaults['shard_number']})"
if 'QDRANT_SHARD_NUMBER' in os.environ:
shard_help += " [from QDRANT_SHARD_NUMBER]"
parser.add_argument(
'--store-uri',
default=defaults['url'],
help=url_help,
)
parser.add_argument(
'--api-key',
default=defaults['api_key'],
help=api_key_help,
)
parser.add_argument(
'--qdrant-replication-factor',
type=int,
default=defaults['replication_factor'],
help=replication_help,
)
parser.add_argument(
'--qdrant-shard-number',
type=int,
default=defaults['shard_number'],
help=shard_help,
)
def resolve_qdrant_config(
args: Optional[Any] = None,
url: Optional[str] = None,
api_key: Optional[str] = None,
replication_factor: Optional[int] = None,
shard_number: Optional[int] = None,
) -> Tuple[str, Optional[str], int, int]:
if args is not None:
url = url or getattr(args, 'store_uri', None)
api_key = api_key or getattr(args, 'api_key', None)
replication_factor = replication_factor or getattr(
args, 'qdrant_replication_factor', None
)
shard_number = shard_number or getattr(
args, 'qdrant_shard_number', None
)
defaults = get_qdrant_defaults()
url = url or defaults['url']
api_key = api_key or defaults['api_key']
replication_factor = replication_factor or defaults['replication_factor']
shard_number = shard_number or defaults['shard_number']
return url, api_key, replication_factor, shard_number

View file

@ -5,6 +5,7 @@ from ...schema import (
UserInput, UserRecord,
WorkspaceInput, WorkspaceRecord,
ApiKeyInput, ApiKeyRecord,
GroupInput, GrantInput,
)
from .base import MessageTranslator
@ -43,6 +44,25 @@ def _api_key_input_from_dict(d):
)
def _group_input_from_dict(d):
if d is None:
return None
return GroupInput(
name=d.get("name", ""),
description=d.get("description", ""),
enabled=d.get("enabled", True),
)
def _grant_input_from_dict(d):
if d is None:
return None
return GrantInput(
capability=d.get("capability", ""),
workspace=d.get("workspace", ""),
)
def _user_record_to_dict(r):
if r is None:
return None
@ -102,6 +122,15 @@ class IamRequestTranslator(MessageTranslator):
data.get("workspace_record")
),
key=_api_key_input_from_dict(data.get("key")),
group_id=data.get("group_id", ""),
member_type=data.get("member_type", ""),
member_id=data.get("member_id", ""),
group=_group_input_from_dict(data.get("group")),
grant=_grant_input_from_dict(data.get("grant")),
capability=data.get("capability", ""),
resource_json=data.get("resource_json", ""),
parameters_json=data.get("parameters_json", ""),
authorise_checks=data.get("authorise_checks", ""),
)
def encode(self, obj: IamRequest) -> Dict[str, Any]:
@ -109,6 +138,9 @@ class IamRequestTranslator(MessageTranslator):
for fname in (
"workspace", "actor", "user_id", "username", "key_id",
"api_key", "password", "new_password",
"group_id", "member_type", "member_id",
"capability", "resource_json", "parameters_json",
"authorise_checks",
):
v = getattr(obj, fname, "")
if v:
@ -135,6 +167,17 @@ class IamRequestTranslator(MessageTranslator):
"name": obj.key.name,
"expires": obj.key.expires,
}
if obj.group is not None:
result["group"] = {
"name": obj.group.name,
"description": obj.group.description,
"enabled": obj.group.enabled,
}
if obj.grant is not None:
result["grant"] = {
"capability": obj.grant.capability,
"workspace": obj.grant.workspace,
}
return result
@ -190,6 +233,23 @@ class IamResponseTranslator(MessageTranslator):
# setup, so it can't be dropped by a truthy-only filter.
result["bootstrap_available"] = bool(obj.bootstrap_available)
# authorise / authorise-many outputs.
if obj.decision_allow:
result["decision_allow"] = obj.decision_allow
if obj.decision_ttl_seconds:
result["decision_ttl_seconds"] = obj.decision_ttl_seconds
if obj.decisions_json:
result["decisions_json"] = obj.decisions_json
# Enterprise IAM outputs.
for fname in (
"group_json", "groups_json", "members_json",
"grants_json", "effective_permissions_json",
):
v = getattr(obj, fname, "")
if v:
result[fname] = v
return result
def encode_with_completion(

View file

@ -74,6 +74,21 @@ class ApiKeyRecord:
last_used: str = ""
# ---- Enterprise IAM types (additive) ----
@dataclass
class GroupInput:
name: str = ""
description: str = ""
enabled: bool = True
@dataclass
class GrantInput:
capability: str = ""
workspace: str = ""
@dataclass
class IamRequest:
operation: str = ""
@ -99,6 +114,13 @@ class IamRequest:
workspace_record: WorkspaceInput | None = None
key: ApiKeyInput | None = None
# ---- Enterprise IAM inputs (additive) ----
group_id: str = ""
member_type: str = ""
member_id: str = ""
group: GroupInput | None = None
grant: GrantInput | None = None
# ---- authorise / authorise-many inputs ----
# Capability string from the vocabulary in capabilities.md.
capability: str = ""
@ -164,6 +186,14 @@ class IamResponse:
# authorise_checks.
decisions_json: str = ""
# ---- Enterprise IAM outputs (additive) ----
# JSON-serialised payloads for enterprise group/grant operations.
group_json: str = ""
groups_json: str = ""
members_json: str = ""
grants_json: str = ""
effective_permissions_json: str = ""
error: Error | None = None

View file

@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"trustgraph-base>=2.5,<2.6",
"trustgraph-base>=2.6,<2.7",
"pulsar-client",
"prometheus-client",
"boto3",

View file

@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"trustgraph-base>=2.5,<2.6",
"trustgraph-base>=2.6,<2.7",
"requests",
"pulsar-client",
"aiohttp",

View file

@ -78,7 +78,7 @@ def load_structured_data(
logger.info("Step 1: Analyzing data to discover best matching schema...")
# Step 1: Auto-discover schema (reuse discover_schema logic)
discovered_schema = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, workspace=workspace)
discovered_schema = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, token=token, workspace=workspace)
if not discovered_schema:
logger.error("Failed to discover suitable schema automatically")
print("❌ Could not automatically determine the best schema for your data.")
@ -90,7 +90,7 @@ def load_structured_data(
# Step 2: Auto-generate descriptor
logger.info("Step 2: Generating descriptor configuration...")
auto_descriptor = _auto_generate_descriptor(api_url, input_file, discovered_schema, sample_chars, flow, logger, workspace=workspace)
auto_descriptor = _auto_generate_descriptor(api_url, input_file, discovered_schema, sample_chars, flow, logger, token=token, workspace=workspace)
if not auto_descriptor:
logger.error("Failed to generate descriptor automatically")
print("❌ Could not automatically generate descriptor configuration.")
@ -172,7 +172,7 @@ def load_structured_data(
logger.info(f"Sample chars: {sample_chars} characters")
# Use the helper function to discover schema (get raw response for display)
response = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=True, workspace=workspace)
response = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=True, token=token, workspace=workspace)
if response:
# Debug: print response type and content
@ -203,7 +203,7 @@ def load_structured_data(
# If no schema specified, discover it first
if not schema_name:
logger.info("No schema specified, auto-discovering...")
schema_name = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, workspace=workspace)
schema_name = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, token=token, workspace=workspace)
if not schema_name:
print("Error: Could not determine schema automatically.")
print("Please specify a schema using --schema-name or run --discover-schema first.")
@ -213,7 +213,7 @@ def load_structured_data(
logger.info(f"Target schema: {schema_name}")
# Generate descriptor using helper function
descriptor = _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, workspace=workspace)
descriptor = _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, token=token, workspace=workspace)
if descriptor:
# Output the generated descriptor
@ -603,7 +603,7 @@ def _send_to_trustgraph(rows, api_url, flow, batch_size=1000, token=None, worksp
# Helper functions for auto mode
def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=False, workspace="default"):
def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=False, token=None, workspace="default"):
"""Auto-discover the best matching schema for the input data
Args:
@ -626,7 +626,7 @@ def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, retur
# Import API modules
from trustgraph.api import Api
from trustgraph.api.types import ConfigKey
api = Api(api_url, workspace=workspace)
api = Api(api_url, token=token, workspace=workspace)
config_api = api.config()
# Get available schemas
@ -707,7 +707,7 @@ def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, retur
return None
def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, workspace="default"):
def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, token=None, workspace="default"):
"""Auto-generate descriptor configuration for the discovered schema"""
try:
# Read sample data
@ -717,7 +717,7 @@ def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, fl
# Import API modules
from trustgraph.api import Api
from trustgraph.api.types import ConfigKey
api = Api(api_url, workspace=workspace)
api = Api(api_url, token=token, workspace=workspace)
config_api = api.config()
# Get schema definition

View file

@ -10,8 +10,8 @@ description = "HuggingFace embeddings support for TrustGraph."
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"trustgraph-base>=2.5,<2.6",
"trustgraph-flow>=2.5,<2.6",
"trustgraph-base>=2.6,<2.7",
"trustgraph-flow>=2.6,<2.7",
"torch",
"urllib3",
"transformers",

View file

@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"trustgraph-base>=2.5,<2.6",
"trustgraph-base>=2.6,<2.7",
"aiohttp",
"anthropic",
"scylla-driver",

View file

@ -83,7 +83,8 @@ class Processor(AsyncProcessor):
host=cassandra_host,
username=cassandra_username,
password=cassandra_password,
default_keyspace="config"
default_keyspace="config",
replication_factor=params.get("cassandra_replication_factor"),
)
# Store resolved configuration

View file

@ -61,7 +61,8 @@ class Processor(WorkspaceProcessor):
host=cassandra_host,
username=cassandra_username,
password=cassandra_password,
default_keyspace="knowledge"
default_keyspace="knowledge",
replication_factor=params.get("cassandra_replication_factor"),
)
self.cassandra_host = hosts

View file

@ -32,6 +32,10 @@ logger = logging.getLogger(__name__)
default_ident = "document-decoder"
def _looks_like_pdf(content):
return content.lstrip().startswith(b"%PDF-")
class Processor(FlowProcessor):
def __init__(self, **params):
@ -94,14 +98,10 @@ class Processor(FlowProcessor):
)
return
with tempfile.NamedTemporaryFile(delete_on_close=False, suffix='.pdf') as fp:
temp_path = fp.name
# Check if we should fetch from librarian or use inline data
if v.document_id:
# Fetch from librarian via Pulsar
logger.info(f"Fetching document {v.document_id} from librarian...")
fp.close()
content = await flow.librarian.fetch_document_content(
document_id=v.document_id,
@ -113,13 +113,21 @@ class Processor(FlowProcessor):
content = content.encode('utf-8')
decoded_content = base64.b64decode(content)
with open(temp_path, 'wb') as f:
f.write(decoded_content)
logger.info(f"Fetched {len(decoded_content)} bytes from librarian")
else:
# Use inline data (backward compatibility)
fp.write(base64.b64decode(v.data))
decoded_content = base64.b64decode(v.data)
if not _looks_like_pdf(decoded_content):
logger.error(
f"Document {v.metadata.id} is not valid PDF content. "
f"Ignoring document."
)
return
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as fp:
temp_path = fp.name
fp.write(decoded_content)
fp.close()
global PyPDFLoader

View file

@ -6,7 +6,7 @@ import logging
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement, SimpleStatement
from ssl import SSLContext, PROTOCOL_TLSv1_2
import ssl
from ..tables.cassandra_async import async_execute
@ -41,13 +41,15 @@ class KnowledgeGraph:
def __init__(
self, hosts=None,
keyspace="trustgraph", username=None, password=None
keyspace="trustgraph", username=None, password=None,
replication_factor=1,
):
if hosts is None:
hosts = ["localhost"]
self.keyspace = keyspace
self.replication_factor = replication_factor
self.username = username
# 7-table schema for quads with full query pattern support
@ -68,7 +70,7 @@ class KnowledgeGraph:
self.collection_metadata_table = "collection_metadata"
if username and password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context = ssl.create_default_context()
auth_provider = PlainTextAuthProvider(username=username, password=password)
self.cluster = Cluster(hosts, auth_provider=auth_provider, ssl_context=ssl_context)
else:
@ -92,7 +94,7 @@ class KnowledgeGraph:
create keyspace if not exists {self.keyspace}
with replication = {{
'class' : 'SimpleStrategy',
'replication_factor' : 1
'replication_factor' : {self.replication_factor}
}};
""")
@ -539,13 +541,15 @@ class EntityCentricKnowledgeGraph:
def __init__(
self, hosts=None,
keyspace="trustgraph", username=None, password=None
keyspace="trustgraph", username=None, password=None,
replication_factor=1,
):
if hosts is None:
hosts = ["localhost"]
self.keyspace = keyspace
self.replication_factor = replication_factor
self.username = username
# 2-table entity-centric schema
@ -556,7 +560,7 @@ class EntityCentricKnowledgeGraph:
self.collection_metadata_table = "collection_metadata"
if username and password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context = ssl.create_default_context()
auth_provider = PlainTextAuthProvider(username=username, password=password)
self.cluster = Cluster(hosts, auth_provider=auth_provider, ssl_context=ssl_context)
else:
@ -580,7 +584,7 @@ class EntityCentricKnowledgeGraph:
create keyspace if not exists {self.keyspace}
with replication = {{
'class' : 'SimpleStrategy',
'replication_factor' : 1
'replication_factor' : {self.replication_factor}
}};
""")

View file

@ -506,18 +506,18 @@ _FLOW_SERVICES = {
"text-completion": "llm",
"prompt": "llm",
"mcp-tool": "mcp",
"graph-rag": "graph:read",
"document-rag": "documents:read",
"graph-rag": "graph-rag:read",
"document-rag": "document-rag:read",
"embeddings": "embeddings",
"graph-embeddings": "graph:read",
"document-embeddings": "documents:read",
"triples": "graph:read",
"graph-embeddings": "graph-embeddings:read",
"document-embeddings": "document-embeddings:read",
"triples": "triples:read",
"rows": "rows:read",
"nlp-query": "rows:read",
"structured-query": "rows:read",
"structured-diag": "rows:read",
"row-embeddings": "rows:read",
"sparql": "graph:read",
"nlp-query": "nlp-query:read",
"structured-query": "structured-query:read",
"structured-diag": "structured-query:read",
"row-embeddings": "row-embeddings:read",
"sparql": "sparql:read",
}
for _kind, _cap in _FLOW_SERVICES.items():
_register_flow_kind("flow-service", _kind, _cap)
@ -525,10 +525,10 @@ for _kind, _cap in _FLOW_SERVICES.items():
# Streaming import socket endpoints.
_FLOW_IMPORTS = {
"triples": "graph:write",
"graph-embeddings": "graph:write",
"document-embeddings": "documents:write",
"entity-contexts": "documents:write",
"triples": "triples:write",
"graph-embeddings": "graph-embeddings:write",
"document-embeddings": "document-embeddings:write",
"entity-contexts": "entity-contexts:write",
"rows": "rows:write",
}
for _kind, _cap in _FLOW_IMPORTS.items():
@ -537,10 +537,35 @@ for _kind, _cap in _FLOW_IMPORTS.items():
# Streaming export socket endpoints.
_FLOW_EXPORTS = {
"triples": "graph:read",
"graph-embeddings": "graph:read",
"document-embeddings": "documents:read",
"entity-contexts": "documents:read",
"triples": "triples:read",
"graph-embeddings": "graph-embeddings:read",
"document-embeddings": "document-embeddings:read",
"entity-contexts": "entity-contexts:read",
}
for _kind, _cap in _FLOW_EXPORTS.items():
_register_flow_kind("flow-export", _kind, _cap)
# ---------------------------------------------------------------------------
# Enterprise IAM operations.
#
# These are additive — they register alongside the OSS IAM operations.
# When the OSS regime receives an unknown operation it returns an error;
# when the enterprise regime is running, it handles them.
# ---------------------------------------------------------------------------
for _op in (
"create-group", "get-group", "list-groups",
"update-group", "delete-group",
"add-group-member", "remove-group-member", "list-group-members",
"add-group-grant", "remove-group-grant", "list-group-grants",
"add-user-grant", "remove-user-grant", "list-user-grants",
"resolve-effective-permissions",
):
register(Operation(
name=_op,
capability="iam:admin",
resource_level=ResourceLevel.SYSTEM,
extract_resource=_empty_resource,
extract_parameters=_no_parameters,
))

View file

@ -58,8 +58,18 @@ AUTHZ_CACHE_TTL_SECONDS = 60
_READER_CAPS = {
"agent",
"graph:read",
"triples:read",
"sparql:read",
"graph-rag:read",
"graph-embeddings:read",
"documents:read",
"document-rag:read",
"document-embeddings:read",
"entity-contexts:read",
"rows:read",
"nlp-query:read",
"structured-query:read",
"row-embeddings:read",
"llm",
"embeddings",
"mcp",
@ -73,6 +83,10 @@ _READER_CAPS = {
_WRITER_CAPS = _READER_CAPS | {
"graph:write",
"triples:write",
"graph-embeddings:write",
"document-embeddings:write",
"entity-contexts:write",
"documents:write",
"rows:write",
"collections:write",

View file

@ -101,6 +101,7 @@ class Processor(AsyncProcessor):
username=cassandra_username,
password=cassandra_password,
default_keyspace="iam",
replication_factor=params.get("cassandra_replication_factor"),
)
self.cassandra_host = hosts

View file

@ -8,6 +8,7 @@ import asyncio
import base64
import json
import logging
import os
from datetime import datetime
from .. base import WorkspaceProcessor, Consumer, Producer, Publisher, Subscriber
@ -54,6 +55,16 @@ default_object_store_access_key = "object-user"
default_object_store_secret_key = "object-password"
default_object_store_use_ssl = False
default_object_store_region = None
# Environment variables consulted as a fallback when the
# corresponding params field is not set in the processor-group YAML
# or via CLI. Intended for K8s Secret / env-var injection so
# credentials never have to live in the YAML (and thus in git).
ENV_OBJECT_STORE_ENDPOINT = "OBJECT_STORE_ENDPOINT"
ENV_OBJECT_STORE_ACCESS_KEY = "OBJECT_STORE_ACCESS_KEY"
ENV_OBJECT_STORE_SECRET_KEY = "OBJECT_STORE_SECRET_KEY"
ENV_OBJECT_STORE_USE_SSL = "OBJECT_STORE_USE_SSL"
ENV_OBJECT_STORE_REGION = "OBJECT_STORE_REGION"
default_cassandra_host = "cassandra"
default_min_chunk_size = 1 # No minimum by default (for Garage)
@ -89,22 +100,36 @@ class Processor(WorkspaceProcessor):
"config_response_queue", default_config_response_queue
)
object_store_endpoint = params.get("object_store_endpoint", default_object_store_endpoint)
object_store_access_key = params.get(
"object_store_access_key",
default_object_store_access_key
# Resolve object-store config. Precedence: explicit params
# (CLI / processor-group YAML) → environment variable →
# hardcoded default. The env-var path lets K8s Secrets feed
# credentials without them appearing in the YAML.
object_store_endpoint = (
params.get("object_store_endpoint")
or os.environ.get(ENV_OBJECT_STORE_ENDPOINT)
or default_object_store_endpoint
)
object_store_secret_key = params.get(
"object_store_secret_key",
default_object_store_secret_key
object_store_access_key = (
params.get("object_store_access_key")
or os.environ.get(ENV_OBJECT_STORE_ACCESS_KEY)
or default_object_store_access_key
)
object_store_use_ssl = params.get(
"object_store_use_ssl",
default_object_store_use_ssl
object_store_secret_key = (
params.get("object_store_secret_key")
or os.environ.get(ENV_OBJECT_STORE_SECRET_KEY)
or default_object_store_secret_key
)
object_store_region = params.get(
"object_store_region",
default_object_store_region
object_store_use_ssl = params.get("object_store_use_ssl")
if object_store_use_ssl is None:
env_ssl = os.environ.get(ENV_OBJECT_STORE_USE_SSL)
if env_ssl is not None:
object_store_use_ssl = env_ssl.lower() in ("true", "1", "yes")
else:
object_store_use_ssl = default_object_store_use_ssl
object_store_region = (
params.get("object_store_region")
or os.environ.get(ENV_OBJECT_STORE_REGION)
or default_object_store_region
)
min_chunk_size = params.get(
@ -121,7 +146,8 @@ class Processor(WorkspaceProcessor):
host=cassandra_host,
username=cassandra_username,
password=cassandra_password,
default_keyspace="librarian"
default_keyspace="librarian",
replication_factor=params.get("cassandra_replication_factor"),
)
# Store resolved configuration

View file

@ -12,31 +12,33 @@ from qdrant_client import QdrantClient
from .... schema import DocumentEmbeddingsResponse, ChunkMatch
from .... schema import Error
from .... base import DocumentEmbeddingsQueryService
from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "doc-embeddings-query"
default_store_uri = 'http://localhost:6333'
class Processor(DocumentEmbeddingsQueryService):
def __init__(self, **params):
store_uri = params.get("store_uri", default_store_uri)
store_uri = params.get("store_uri")
api_key = params.get("api_key")
#optional api key
api_key = params.get("api_key", None)
url, api_key, _, _ = resolve_qdrant_config(
url=store_uri,
api_key=api_key,
)
super(Processor, self).__init__(
**params | {
"store_uri": store_uri,
"store_uri": url,
"api_key": api_key,
}
)
self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
self.qdrant = QdrantClient(url=url, api_key=api_key)
async def query_document_embeddings(self, workspace, msg):
@ -85,18 +87,7 @@ class Processor(DocumentEmbeddingsQueryService):
def add_args(parser):
DocumentEmbeddingsQueryService.add_args(parser)
parser.add_argument(
'-t', '--store-uri',
default=default_store_uri,
help=f'Qdrant store URI (default: {default_store_uri})'
)
parser.add_argument(
'-k', '--api-key',
default=None,
help=f'API key for qdrant (default: None)'
)
add_qdrant_args(parser)
def run():

View file

@ -12,31 +12,32 @@ from qdrant_client import QdrantClient
from .... schema import GraphEmbeddingsResponse, EntityMatch
from .... schema import Error, Term, IRI, LITERAL
from .... base import GraphEmbeddingsQueryService
from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "graph-embeddings-query"
default_store_uri = 'http://localhost:6333'
class Processor(GraphEmbeddingsQueryService):
def __init__(self, **params):
store_uri = params.get("store_uri", default_store_uri)
store_uri = params.get("store_uri")
api_key = params.get("api_key")
#optional api key
api_key = params.get("api_key", None)
url, api_key, _, _ = resolve_qdrant_config(
url=store_uri, api_key=api_key,
)
super(Processor, self).__init__(
**params | {
"store_uri": store_uri,
"store_uri": url,
"api_key": api_key,
}
)
self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
self.qdrant = QdrantClient(url=url, api_key=api_key)
def create_value(self, ent):
if ent.startswith("http://") or ent.startswith("https://"):
@ -104,18 +105,7 @@ class Processor(GraphEmbeddingsQueryService):
def add_args(parser):
GraphEmbeddingsQueryService.add_args(parser)
parser.add_argument(
'-t', '--store-uri',
default=default_store_uri,
help=f'Qdrant store URI (default: {default_store_uri})'
)
parser.add_argument(
'-k', '--api-key',
default=None,
help=f'API key for qdrant (default: None)'
)
add_qdrant_args(parser)
def run():

View file

@ -116,7 +116,7 @@ class CassandraTripleStore(Store if RDFLIB_AVAILABLE else object):
# Create keyspace
self.session.execute(f"""
CREATE KEYSPACE IF NOT EXISTS {self.keyspace}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': {self.cassandra_config.get('replication_factor', 1)}}}
""")
# Create triples table optimized for SPARQL queries

View file

@ -19,12 +19,12 @@ from .... schema import (
RowIndexMatch, Error
)
from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "row-embeddings-query"
default_store_uri = 'http://localhost:6333'
default_concurrency = 10
@ -35,13 +35,17 @@ class Processor(FlowProcessor):
id = params.get("id", default_ident)
concurrency = params.get("concurrency", default_concurrency)
store_uri = params.get("store_uri", default_store_uri)
api_key = params.get("api_key", None)
store_uri = params.get("store_uri")
api_key = params.get("api_key")
url, api_key, _, _ = resolve_qdrant_config(
url=store_uri, api_key=api_key,
)
super(Processor, self).__init__(
**params | {
"id": id,
"store_uri": store_uri,
"store_uri": url,
"api_key": api_key,
}
)
@ -62,7 +66,7 @@ class Processor(FlowProcessor):
)
)
self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
self.qdrant = QdrantClient(url=url, api_key=api_key)
def sanitize_name(self, name: str) -> str:
"""Sanitize names for Qdrant collection naming"""
@ -192,21 +196,9 @@ class Processor(FlowProcessor):
@staticmethod
def add_args(parser):
"""Add command-line arguments"""
FlowProcessor.add_args(parser)
parser.add_argument(
'-t', '--store-uri',
default=default_store_uri,
help=f'Qdrant store URI (default: {default_store_uri})'
)
parser.add_argument(
'-k', '--api-key',
default=None,
help='API key for Qdrant (default: None)'
)
add_qdrant_args(parser)
parser.add_argument(
'-c', '--concurrency',

View file

@ -24,7 +24,7 @@ from .... schema import RowsQueryRequest, RowsQueryResponse, GraphQLError
from .... schema import Error, RowSchema, Field as SchemaField
from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from .... tables.cassandra_async import async_execute
from .... tables.cassandra_async import async_execute, async_execute_paged, async_scan
from ... graphql import GraphQLSchemaBuilder, SortDirection
@ -180,7 +180,7 @@ class Processor(FlowProcessor):
description=field_def.get("description", ""),
required=field_def.get("required", False),
enum_values=field_def.get("enum", []),
indexed=field_def.get("indexed", False)
indexed=field_def.get("indexed", False),
)
fields.append(field)
@ -232,6 +232,8 @@ class Processor(FlowProcessor):
for index_name in index_names:
if index_name in filters:
value = filters[index_name]
if value == "" or value is None:
continue
# Single field index -> single element list
index_value = [str(value)]
return (index_name, index_value)
@ -282,9 +284,11 @@ class Processor(FlowProcessor):
query += f" LIMIT {limit}"
try:
rows = await async_execute(self.session, query, params)
for row in rows:
# Convert data map to dict with proper field names
pages = await async_execute_paged(
self.session, query, params
)
for page in pages:
for row in page:
row_dict = dict(row.data) if row.data else {}
results.append(row_dict)
except Exception as e:
@ -308,8 +312,6 @@ class Processor(FlowProcessor):
# Query using the first index (arbitrary choice for scan)
primary_index = index_names[0]
# We need to scan all values for this index
# This requires ALLOW FILTERING or a different approach
query = f"""
SELECT data, source FROM {safe_keyspace}.rows
WHERE collection = %s
@ -320,18 +322,19 @@ class Processor(FlowProcessor):
params = [collection, schema_name, primary_index]
try:
rows = await async_execute(self.session, query, params)
for row in rows:
def row_filter(row):
row_dict = dict(row.data) if row.data else {}
return self._matches_filters(row_dict, filters, row_schema)
# Apply post-filters
if self._matches_filters(row_dict, filters, row_schema):
matched_rows = await async_scan(
self.session, query, params,
row_filter=row_filter,
limit=limit,
)
for row in matched_rows:
row_dict = dict(row.data) if row.data else {}
results.append(row_dict)
if limit and len(results) >= limit:
break
except Exception as e:
logger.error(f"Failed to scan rows: {e}", exc_info=True)
raise
@ -363,7 +366,7 @@ class Processor(FlowProcessor):
# Parse filter key for operator
if '_' in filter_key:
parts = filter_key.rsplit('_', 1)
if parts[1] in ['gt', 'gte', 'lt', 'lte', 'contains', 'in']:
if parts[1] in ['gt', 'gte', 'lt', 'lte', 'contains', 'in', 'not', 'startsWith', 'endsWith', 'not_in']:
field_name = parts[0]
operator = parts[1]
else:
@ -400,6 +403,18 @@ class Processor(FlowProcessor):
elif operator == 'in':
if str(row_value) not in [str(v) for v in filter_value]:
return False
elif operator == 'not':
if str(row_value) == str(filter_value):
return False
elif operator == 'startsWith':
if not str(row_value).startswith(str(filter_value)):
return False
elif operator == 'endsWith':
if not str(row_value).endswith(str(filter_value)):
return False
elif operator == 'not_in':
if str(row_value) in [str(v) for v in filter_value]:
return False
except (ValueError, TypeError):
return False

View file

@ -14,29 +14,36 @@ from qdrant_client.models import Distance, VectorParams
from .... base import DocumentEmbeddingsStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "doc-embeddings-write"
default_store_uri = 'http://localhost:6333'
class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
def __init__(self, **params):
store_uri = params.get("store_uri", default_store_uri)
api_key = params.get("api_key", None)
store_uri = params.get("store_uri")
api_key = params.get("api_key")
url, api_key, replication_factor, shard_number = resolve_qdrant_config(
url=store_uri, api_key=api_key,
replication_factor=params.get("qdrant_replication_factor"),
shard_number=params.get("qdrant_shard_number"),
)
super(Processor, self).__init__(
**params | {
"store_uri": store_uri,
"store_uri": url,
"api_key": api_key,
}
)
self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
self.qdrant = QdrantClient(url=url, api_key=api_key)
self.replication_factor = replication_factor
self.shard_number = shard_number
self._cache_lock = asyncio.Lock()
self._known_collections: set[str] = set()
@ -61,6 +68,8 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
vectors_config=VectorParams(
size=dim, distance=Distance.COSINE
),
replication_factor=self.replication_factor,
shard_number=self.shard_number,
)
self._known_collections.add(collection_name)
@ -109,18 +118,7 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
def add_args(parser):
DocumentEmbeddingsStoreService.add_args(parser)
parser.add_argument(
'-t', '--store-uri',
default=default_store_uri,
help=f'Qdrant URI (default: {default_store_uri})'
)
parser.add_argument(
'-k', '--api-key',
default=None,
help=f'Qdrant API key (default: None)'
)
add_qdrant_args(parser)
async def create_collection(self, workspace: str, collection: str, metadata: dict):
"""

View file

@ -14,6 +14,7 @@ from qdrant_client.models import Distance, VectorParams
from .... base import GraphEmbeddingsStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
from .... schema import IRI, LITERAL
# Module logger
@ -29,29 +30,34 @@ def get_term_value(term):
elif term.type == LITERAL:
return term.value
else:
# For blank nodes or other types, use id or value
return term.id or term.value
default_ident = "graph-embeddings-write"
default_store_uri = 'http://localhost:6333'
class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
def __init__(self, **params):
store_uri = params.get("store_uri", default_store_uri)
api_key = params.get("api_key", None)
store_uri = params.get("store_uri")
api_key = params.get("api_key")
url, api_key, replication_factor, shard_number = resolve_qdrant_config(
url=store_uri, api_key=api_key,
replication_factor=params.get("qdrant_replication_factor"),
shard_number=params.get("qdrant_shard_number"),
)
super(Processor, self).__init__(
**params | {
"store_uri": store_uri,
"store_uri": url,
"api_key": api_key,
}
)
self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
self.qdrant = QdrantClient(url=url, api_key=api_key)
self.replication_factor = replication_factor
self.shard_number = shard_number
self._cache_lock = asyncio.Lock()
self._known_collections: set[str] = set()
@ -76,6 +82,8 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
vectors_config=VectorParams(
size=dim, distance=Distance.COSINE
),
replication_factor=self.replication_factor,
shard_number=self.shard_number,
)
self._known_collections.add(collection_name)
@ -128,18 +136,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
def add_args(parser):
GraphEmbeddingsStoreService.add_args(parser)
parser.add_argument(
'-t', '--store-uri',
default=default_store_uri,
help=f'Qdrant store URI (default: {default_store_uri})'
)
parser.add_argument(
'-k', '--api-key',
default=None,
help=f'Qdrant API key'
)
add_qdrant_args(parser)
async def create_collection(self, workspace: str, collection: str, metadata: dict):
"""

View file

@ -27,7 +27,8 @@ class Processor(FlowProcessor):
host=params.get("cassandra_host"),
username=params.get("cassandra_username"),
password=params.get("cassandra_password"),
default_keyspace='knowledge'
default_keyspace='knowledge',
replication_factor=params.get("cassandra_replication_factor"),
)
super(Processor, self).__init__(

View file

@ -27,12 +27,12 @@ from qdrant_client.models import PointStruct, Distance, VectorParams
from .... schema import RowEmbeddings
from .... base import FlowProcessor, ConsumerSpec
from .... base import CollectionConfigHandler
from .... base.qdrant_config import add_qdrant_args, resolve_qdrant_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "row-embeddings-write"
default_store_uri = 'http://localhost:6333'
class Processor(CollectionConfigHandler, FlowProcessor):
@ -41,13 +41,19 @@ class Processor(CollectionConfigHandler, FlowProcessor):
id = params.get("id", default_ident)
store_uri = params.get("store_uri", default_store_uri)
api_key = params.get("api_key", None)
store_uri = params.get("store_uri")
api_key = params.get("api_key")
url, api_key, replication_factor, shard_number = resolve_qdrant_config(
url=store_uri, api_key=api_key,
replication_factor=params.get("qdrant_replication_factor"),
shard_number=params.get("qdrant_shard_number"),
)
super(Processor, self).__init__(
**params | {
"id": id,
"store_uri": store_uri,
"store_uri": url,
"api_key": api_key,
}
)
@ -63,7 +69,9 @@ class Processor(CollectionConfigHandler, FlowProcessor):
# Register config handler for collection management
self.register_config_handler(self.on_collection_config, types=["collection"])
self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
self.qdrant = QdrantClient(url=url, api_key=api_key)
self.replication_factor = replication_factor
self.shard_number = shard_number
self._cache_lock = asyncio.Lock()
self._known_collections: set[str] = set()
@ -103,6 +111,8 @@ class Processor(CollectionConfigHandler, FlowProcessor):
size=dimension,
distance=Distance.COSINE
),
replication_factor=self.replication_factor,
shard_number=self.shard_number,
)
self._known_collections.add(collection_name)
@ -249,21 +259,9 @@ class Processor(CollectionConfigHandler, FlowProcessor):
@staticmethod
def add_args(parser):
"""Add command-line arguments"""
FlowProcessor.add_args(parser)
parser.add_argument(
'-t', '--store-uri',
default=default_store_uri,
help=f'Qdrant URI (default: {default_store_uri})'
)
parser.add_argument(
'-k', '--api-key',
default=None,
help='Qdrant API key (default: None)'
)
add_qdrant_args(parser)
def run():

View file

@ -47,16 +47,18 @@ class Processor(CollectionConfigHandler, FlowProcessor):
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password, keyspace, _ = resolve_cassandra_config(
hosts, username, password, keyspace, replication_factor = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
password=cassandra_password,
replication_factor=params.get("cassandra_replication_factor"),
)
# Store resolved configuration with proper names
self.cassandra_host = hosts # Store as list
self.cassandra_username = username
self.cassandra_password = password
self.replication_factor = replication_factor
# Config key for schemas
self.config_key = params.get("config_type", "schema")
@ -170,7 +172,7 @@ class Processor(CollectionConfigHandler, FlowProcessor):
description=field_def.get("description", ""),
required=field_def.get("required", False),
enum_values=field_def.get("enum", []),
indexed=field_def.get("indexed", False)
indexed=field_def.get("indexed", False),
)
fields.append(field)
@ -232,7 +234,7 @@ class Processor(CollectionConfigHandler, FlowProcessor):
CREATE KEYSPACE IF NOT EXISTS {safe_keyspace}
WITH REPLICATION = {{
'class': 'SimpleStrategy',
'replication_factor': 1
'replication_factor': {self.replication_factor}
}}
"""

View file

@ -80,14 +80,14 @@ def _set_exception_if_pending(fut, exc):
fut.set_exception(exc)
async def async_execute_paged(session, query, parameters=None, fetch_size=100):
async def async_execute_paged(session, query, parameters=None, fetch_size=5000):
"""Execute a CQL query with page-by-page iteration.
Uses synchronous session.execute() inside run_in_executor so that
the driver's ResultSet paging works correctly without materialising
the entire result set in memory.
Yields one page of rows at a time (as a list).
Returns all pages as a list of lists.
"""
loop = asyncio.get_running_loop()
@ -111,3 +111,50 @@ async def async_execute_paged(session, query, parameters=None, fetch_size=100):
return await loop.run_in_executor(
None, _fetch_all_pages
)
async def async_scan(
session, query, parameters=None, row_filter=None,
limit=None, fetch_size=5000,
):
"""Scan a CQL query page-by-page, applying a filter and limit.
Only matching rows accumulate in memory. Each page is discarded
after processing, so peak memory is bounded by fetch_size plus
the number of matching rows (capped by limit).
Args:
session: cassandra.cluster.Session
query: CQL statement string
parameters: bind params
row_filter: callable(row) -> bool, or None to accept all
limit: max results to return, or None for unlimited
fetch_size: rows per Cassandra page fetch
Returns:
List of matching rows.
"""
loop = asyncio.get_running_loop()
if isinstance(query, str):
stmt = SimpleStatement(query, fetch_size=fetch_size)
else:
stmt = query
stmt.fetch_size = fetch_size
def _scan():
results = []
result_set = session.execute(stmt, parameters)
while True:
for row in result_set.current_rows:
if row_filter is None or row_filter(row):
results.append(row)
if limit and len(results) >= limit:
return results
if result_set.has_more_pages:
result_set.fetch_next_page()
else:
break
return results
return await loop.run_in_executor(None, _scan)

View file

@ -4,7 +4,7 @@ from .. schema import Metadata, GraphEmbeddings
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from ssl import SSLContext, PROTOCOL_TLSv1_2
import ssl
import uuid
import time
@ -33,7 +33,7 @@ class ConfigTableStore:
cassandra_host = [h.strip() for h in cassandra_host.split(',')]
if cassandra_username and cassandra_password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context = ssl.create_default_context()
auth_provider = PlainTextAuthProvider(
username=cassandra_username, password=cassandra_password
)

View file

@ -15,7 +15,7 @@ import logging
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from ssl import SSLContext, PROTOCOL_TLSv1_2
import ssl
from . cassandra_async import async_execute
@ -39,7 +39,7 @@ class IamTableStore:
cassandra_host = [h.strip() for h in cassandra_host.split(",")]
if cassandra_username and cassandra_password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context = ssl.create_default_context()
auth_provider = PlainTextAuthProvider(
username=cassandra_username, password=cassandra_password,
)

View file

@ -23,7 +23,7 @@ def tuple_to_term(value, is_uri):
else:
return Term(type=LITERAL, value=value)
from cassandra.auth import PlainTextAuthProvider
from ssl import SSLContext, PROTOCOL_TLSv1_2
import ssl
import uuid
import time
@ -50,7 +50,7 @@ class KnowledgeTableStore:
cassandra_host = [h.strip() for h in cassandra_host.split(',')]
if cassandra_username and cassandra_password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context = ssl.create_default_context()
auth_provider = PlainTextAuthProvider(
username=cassandra_username, password=cassandra_password
)

View file

@ -24,7 +24,7 @@ from .. exceptions import RequestError
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
from ssl import SSLContext, PROTOCOL_TLSv1_2
import ssl
import uuid
import time
@ -53,7 +53,7 @@ class LibraryTableStore:
cassandra_host = [h.strip() for h in cassandra_host.split(',')]
if cassandra_username and cassandra_password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context = ssl.create_default_context()
auth_provider = PlainTextAuthProvider(
username=cassandra_username, password=cassandra_password
)

File diff suppressed because it is too large Load diff

View file

@ -1,49 +1,110 @@
from dataclasses import dataclass
from websockets.asyncio.client import connect
from urllib.parse import urlencode, urlparse, urlunparse, parse_qs
import asyncio
import logging
import json
import uuid
import time
import hashlib
logger = logging.getLogger(__name__)
def _token_key(token):
"""Derive a dict key from a token without storing the raw secret."""
return hashlib.sha256(token.encode()).hexdigest()[:16]
class WebSocketManager:
"""Manages an authenticated WebSocket connection to the TrustGraph
gateway on behalf of a single caller.
def __init__(self, url, token=None):
Each caller token gets its own WebSocketManager so that gateway-side
identity, workspace, and capability scoping are preserved end-to-end.
"""
def __init__(self, url, token):
self.url = url
# ── Security boundary: token storage ──
# This is the MCP caller's Bearer token, forwarded verbatim to
# the gateway. It MUST NOT be logged, persisted, or shared
# across callers. It is held only for the lifetime of this
# connection so that re-auth (e.g. after a reconnect) is
# possible.
self.token = token
self.socket = None
# FIXME: authentication is broken. The /api/v1/socket endpoint uses
# in-band auth (first-frame protocol via the Mux dispatcher), not
# query-parameter tokens. This query-string token is silently ignored.
# Fix: after connect(), send an auth frame with the bearer token as
# the first message, matching the gateway's in-band auth protocol.
def _build_url(self):
if not self.token:
return self.url
parsed = urlparse(self.url)
params = parse_qs(parsed.query)
params["token"] = [self.token]
new_query = urlencode(params, doseq=True)
return urlunparse(parsed._replace(query=new_query))
self.identity = None
self.last_used = None
async def start(self):
self.socket = await connect(self._build_url())
"""Connect and authenticate via the gateway's in-band auth
protocol. Raises on auth failure."""
# ── Security boundary: MCP server → gateway ──
# The WebSocket connects to the gateway and authenticates using
# the caller's Bearer token via the in-band first-frame auth
# protocol. The token belongs to the MCP client — we forward
# it as-is and never interpret its contents.
self.socket = await connect(self.url)
self.pending_requests = {}
self.running = True
await self._authenticate()
self.reader_task = asyncio.create_task(self.reader())
async def _authenticate(self):
"""Send in-band auth frame and wait for auth-ok / auth-failed.
The gateway expects ``{"type": "auth", "token": "..."}`` as the
first frame on a new WebSocket. Any service frame sent before
auth-ok is rejected.
"""
await self.socket.send(json.dumps({
"type": "auth",
"token": self.token,
}))
response_text = await asyncio.wait_for(self.socket.recv(), 10)
response = json.loads(response_text)
if response.get("type") == "auth-ok":
logger.info(
"WebSocket authenticated, default workspace: %s",
response.get("workspace"),
)
return
# Auth failed — close immediately, do not leave an
# unauthenticated socket open.
await self.socket.close()
self.socket = None
if response.get("type") == "auth-failed":
raise RuntimeError(
"Gateway rejected the authentication token"
)
raise RuntimeError(
f"Unexpected auth response type: {response.get('type')}"
)
async def whoami(self):
"""Verify the token by calling the gateway's whoami endpoint.
Returns the identity dict and caches it on ``self.identity``.
"""
gen = self.request("iam", {"operation": "whoami"}, flow_id=None)
async for response in gen:
self.identity = response
return response
async def stop(self):
self.running = False
if hasattr(self, "reader_task"):
await self.reader_task
async def reader(self):
"""
Background task to read websocket responses and route to correct
request
"""
"""Background task: read WebSocket frames and route them to the
correct pending-request queue by ``id``."""
while self.running:
try:
@ -59,23 +120,21 @@ class WebSocketManager:
request_id = response.get("id")
if request_id and request_id in self.pending_requests:
# Put the response in the queue
queue = self.pending_requests[request_id]
await queue.put(response)
else:
logging.warning(
f"Response for unknown request ID: {request_id}"
logger.warning(
"Response for unknown request ID: %s", request_id
)
except Exception as e:
logging.error(f"Error in websocket reader: {e}")
logger.error("Error in websocket reader: %s", e)
# Put error in all pending queues
for queue in self.pending_requests.values():
try:
await queue.put({"error": str(e)})
except:
except Exception:
pass
self.pending_requests.clear()
@ -86,25 +145,29 @@ class WebSocketManager:
async def request(
self, service, request_data, flow_id="default",
workspace=None,
):
"""
Send a request via websocket and handle single or streaming responses
"""Send a request via WebSocket and yield responses.
Args:
service: Gateway service name (e.g. "graph-rag", "config").
request_data: Inner request payload.
flow_id: Optional flow identifier. ``None`` omits the field
(workspace-level services don't use flows).
workspace: Optional workspace override. When ``None`` the
gateway uses the caller's default workspace.
"""
# Generate unique request ID
import time
self.last_used = time.monotonic()
request_id = f"{uuid.uuid4()}"
# Determine if this service streams responses
streaming_services = {"agent"}
is_streaming = service in streaming_services
# Create a queue for all responses (streaming and single)
response_queue = asyncio.Queue()
self.pending_requests[request_id] = response_queue
try:
# Build request message
message = {
"id": request_id,
"service": service,
@ -114,7 +177,16 @@ class WebSocketManager:
if flow_id is not None:
message["flow"] = flow_id
# Send request
# ── Security boundary: workspace scoping ──
# When the caller supplies a workspace, we set it on the
# message envelope. The gateway's enforce_workspace()
# validates that the authenticated identity is permitted
# to access the target workspace — we MUST NOT skip or
# override that check. When workspace is None, the
# gateway default-fills from the identity's bound workspace.
if workspace is not None:
message["workspace"] = workspace
await self.socket.send(json.dumps(message))
while self.running:
@ -127,19 +199,17 @@ class WebSocketManager:
continue
if "error" in response:
if "message" in response["error"]:
raise RuntimeError(response["error"]["text"])
if isinstance(response["error"], dict):
raise RuntimeError(
response["error"].get("message", str(response["error"]))
)
else:
raise RuntimeError(str(response["error"]))
yield response["response"]
if "complete" in response:
if response["complete"]:
if response.get("complete"):
break
except Exception as e:
# Clean up on error
finally:
self.pending_requests.pop(request_id, None)
raise e

View file

@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"trustgraph-base>=2.5,<2.6",
"trustgraph-base>=2.6,<2.7",
"pulsar-client",
"prometheus-client",
"boto3",

View file

@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"trustgraph-base>=2.5,<2.6",
"trustgraph-base>=2.6,<2.7",
"pulsar-client",
"prometheus-client",
"python-magic",

View file

@ -10,7 +10,7 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"trustgraph-base>=2.5,<2.6",
"trustgraph-base>=2.6,<2.7",
"pulsar-client",
"google-genai",
"google-api-core",

View file

@ -10,13 +10,13 @@ description = "TrustGraph provides a means to run a pipeline of flexible AI proc
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"trustgraph-base>=2.5,<2.6",
"trustgraph-bedrock>=2.5,<2.6",
"trustgraph-cli>=2.5,<2.6",
"trustgraph-embeddings-hf>=2.5,<2.6",
"trustgraph-flow>=2.5,<2.6",
"trustgraph-unstructured>=2.5,<2.6",
"trustgraph-vertexai>=2.5,<2.6",
"trustgraph-base>=2.6,<2.7",
"trustgraph-bedrock>=2.6,<2.7",
"trustgraph-cli>=2.6,<2.7",
"trustgraph-embeddings-hf>=2.6,<2.7",
"trustgraph-flow>=2.6,<2.7",
"trustgraph-unstructured>=2.6,<2.7",
"trustgraph-vertexai>=2.6,<2.7",
]
classifiers = [
"Programming Language :: Python :: 3",