Merge branch 'release/v2.5'

This commit is contained in:
Cyber MacGeddon 2026-06-09 16:34:20 +01:00
commit fa5ebe2393
139 changed files with 4909 additions and 2375 deletions

View file

@ -7,7 +7,7 @@ FROM docker.io/fedora:42 AS base
ENV PIP_BREAK_SYSTEM_PACKAGES=1
RUN dnf install -y python3.13 libxcb mesa-libGL && \
RUN dnf install -y python3.13 libxcb mesa-libGL poppler-utils && \
alternatives --install /usr/bin/python python /usr/bin/python3.13 1 && \
python -m ensurepip --upgrade && \
pip3 install --no-cache-dir --upgrade 'pip>=26.0' 'setuptools>=78.1.1' && \

View file

@ -100,6 +100,7 @@ multi-word subsystems.
| `users:admin` | Assign / remove roles on users within the workspace |
| `keys:self` | Create / revoke / list **own** API keys |
| `keys:admin` | Create / revoke / list **any user's** API keys within the workspace |
| `workspaces:list-own` | List workspaces the caller has access to |
| `workspaces:admin` | Create / delete / disable workspaces (system-level) |
| `iam:admin` | JWT signing-key rotation, IAM-level operations |
| `metrics:read` | Prometheus metrics proxy |
@ -110,7 +111,7 @@ The open-source edition ships three roles:
| Role | Capabilities |
|---|---|
| `reader` | `agent`, `graph:read`, `documents:read`, `rows:read`, `llm`, `embeddings`, `mcp`, `collections:read`, `knowledge:read`, `flows:read`, `config:read`, `keys:self` |
| `reader` | `agent`, `graph:read`, `documents:read`, `rows:read`, `llm`, `embeddings`, `mcp`, `collections:read`, `knowledge:read`, `flows:read`, `config:read`, `keys:self`, `workspaces:list-own` |
| `writer` | everything in `reader` **+** `graph:write`, `documents:write`, `rows:write`, `collections:write`, `knowledge:write` |
| `admin` | everything in `writer` **+** `config:write`, `flows:write`, `users:read`, `users:write`, `users:admin`, `keys:admin`, `workspaces:admin`, `iam:admin`, `metrics:read` |
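The cumulative role definitions in the table above can be sketched as set unions. This is a minimal illustration only: the capability names are copied from the table (using the updated `reader` row), while the `ROLES` mapping and the `has_capability` helper are hypothetical names, not the project's actual data structures.

```python
# Capability names come from the roles table; the layout is an assumption.
READER = frozenset({
    "agent", "graph:read", "documents:read", "rows:read", "llm",
    "embeddings", "mcp", "collections:read", "knowledge:read",
    "flows:read", "config:read", "keys:self", "workspaces:list-own",
})
# "everything in reader +" is modelled as a set union.
WRITER = READER | {
    "graph:write", "documents:write", "rows:write",
    "collections:write", "knowledge:write",
}
# "everything in writer +" likewise.
ADMIN = WRITER | {
    "config:write", "flows:write", "users:read", "users:write",
    "users:admin", "keys:admin", "workspaces:admin", "iam:admin",
    "metrics:read",
}
ROLES = {"reader": READER, "writer": WRITER, "admin": ADMIN}


def has_capability(roles, capability):
    """Return True if any of the given role names grants the capability.

    Hypothetical helper; unknown role names grant nothing.
    """
    return any(capability in ROLES.get(r, frozenset()) for r in roles)
```

Because `writer` and `admin` are built on top of `reader`, adding `workspaces:list-own` to `reader` makes it available to all three roles.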

View file

@ -224,6 +224,7 @@ class ApiKeyRecord:
| `enable-user` | `user_id`, `workspace` (optional integrity check) | — | Re-enables a previously disabled user; does not restore API keys. |
| `delete-user` | `user_id`, `workspace` (optional integrity check) | — | Hard-delete; removes user record, username lookup, and all the user's API keys. |
| `create-workspace` | `workspace_record` | `workspace` | System-level. |
| `list-my-workspaces` | `actor` (gateway-injected) | `workspaces` | Returns the workspaces the calling user has access to. OSS: the user's home workspace; if the caller holds the `admin` role, returns all workspaces instead. Enterprise regimes return whatever workspaces the user has been granted access to. |
| `list-workspaces` | — | `workspaces` | System-level. |
| `get-workspace` | `workspace_record` (id only) | `workspace` | System-level. |
| `update-workspace` | `workspace_record` | `workspace` | System-level. |
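The open-source behaviour described for `list-my-workspaces` can be sketched as follows. This is an illustration under stated assumptions: the `Actor` class and the `list_my_workspaces` function are hypothetical stand-ins for the gateway-injected identity and the IAM handler, and workspaces are represented as plain identifier strings.

```python
from dataclasses import dataclass, field


@dataclass
class Actor:
    """Hypothetical stand-in for the gateway-injected caller identity."""
    user_id: str
    workspace: str                      # the caller's home workspace
    roles: list = field(default_factory=list)


def list_my_workspaces(actor, all_workspaces):
    """OSS rule from the table: admins see all, others see their home."""
    if "admin" in actor.roles:
        return list(all_workspaces)
    return [w for w in all_workspaces if w == actor.workspace]
```

An enterprise regime would replace the body with a lookup of the workspaces the user has been granted; that lookup is outside the scope of this sketch.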

View file

@ -0,0 +1,535 @@
---
layout: default
title: "Knowledge Core Completeness"
parent: "Tech Specs"
---
# Knowledge Core Completeness
## Overview
Knowledge cores are portable snapshots of extracted knowledge: triples, graph
embeddings, and document embeddings stored in Cassandra's `knowledge` keyspace.
They can be downloaded as files, transferred between TrustGraph instances, and
loaded back into vector and graph stores.
Recent additions to TrustGraph — explainability/provenance and named graphs —
were not carried through to the knowledge core system. This means that
exporting and re-importing a core loses provenance links, graph assignments,
and source material, breaking the explainability chain.
This specification addresses three gaps:
1. **Named graphs not stored** — The `g` (graph name) field on triples is
silently dropped when writing to the core store and comes back as `None`
on read.
2. **Provenance triples not captured** — Provenance triples (PROV-O) are
generated during extraction and flow to graph stores, but are not reliably
captured in the knowledge core store. Whether they reach the store at all
still needs verification, and any that do would lose their graph name.
3. **Source material not included** — Documents, text pages, and chunks in
the librarian's bucket store are not part of the core. After loading a
core on a different instance, provenance links to source material point
at nothing.
## Goals
- **Self-contained cores**: A downloaded knowledge core file contains
everything needed to reconstruct the full knowledge graph including
provenance and source attribution on a fresh instance.
- **Named graph preservation**: Round-tripping a core preserves graph
assignments on all triples.
- **Backward compatibility**: Existing core files (without graph names or
source material) can still be uploaded and loaded. New fields are optional
on import.
- **No change to core identity**: A core is still identified by its document
ID. The additional data is associated with the same core ID.
- **Minimal file format changes**: Extend the existing msgpack record format
with new record types rather than restructuring existing ones.
## Background
### Current Lifecycle
```
Extraction pipeline
  ├─ triples ──────────────────► knowledge core store (Cassandra)
  ├─ graph embeddings ─────────► knowledge core store (Cassandra)
  ├─ document embeddings ──────► knowledge core store (Cassandra)
  ├─ provenance triples ───────► graph store (only)
  └─ source documents ─────────► librarian bucket store (only)

Download: Cassandra ──► knowledge manager ──► API gateway ──► client file
Upload:   client file ──► API gateway ──► knowledge manager ──► Cassandra
Load:     Cassandra ──► knowledge manager ──► Pulsar topics ──► graph/vector stores
```
### Current Core File Format (msgpack)
A core file is a sequence of concatenated msgpack records. Each record is a
2-element tuple: `(type_tag, payload)`.
| Type tag | Payload | Description |
|----------|---------|-------------|
| `"t"` | `{"m": {id, root, collection}, "t": [triple_dicts]}` | Triple batch |
| `"ge"` | `{"m": {id, root, collection}, "e": [{entity, vector}]}` | Graph embedding batch |
### What's Missing
#### Named Graphs
The `Triple` dataclass has a `g: str | None` field (graph name IRI), used to
separate provenance graphs (`urn:graph:source`, `urn:graph:retrieval`) from
the default graph. However:
- **Cassandra schema** (`knowledge.triples` table): stores a 6-tuple per
triple `(s_val, s_is_uri, p_val, p_is_uri, o_val, o_is_uri)` — no graph
field.
- **`add_triples()`** (`tables/knowledge.py:231`): destructures only `s`,
`p`, `o`; `g` is discarded.
- **`get_triples()`** (`tables/knowledge.py:396`): reconstructs `Triple`
with `g` defaulting to `None`.
- **Core file format**: triple dicts do not include a graph field.
#### Provenance Triples
Provenance triples are generated in the extraction pipeline
(`trustgraph-base/trustgraph/provenance/triples.py`) and published to graph
store topics. They use named graphs (`urn:graph:source`,
`urn:graph:retrieval`) and PROV-O vocabulary.
The knowledge core store processor (`storage/knowledge/store.py`) listens on
`triples-input` and `graph-embeddings-input`. Whether provenance triples
arrive on the same `triples-input` topic or a separate one needs
verification. Even if they do arrive, the graph name would be lost (per
above).
#### Source Material
The librarian stores the full document hierarchy in a separate system:
- **Blob store** (S3/MinIO): original documents, text pages, chunks —
keyed by object UUID under `doc/{object_id}`.
- **Cassandra `library` keyspace**: document metadata including `id`,
`kind` (MIME type), `title`, `parent_id`, `document_type`
(`source`/`extracted`), `object_id` (blob reference).
Provenance triples link extracted facts back to chunk/page/document IDs.
Those IDs resolve through the librarian. When a core is loaded on a
different instance, the librarian has no matching documents, so the entire
provenance chain is broken.
### Key Source Files
| Component | File | Purpose |
|-----------|------|---------|
| Core Cassandra schema | `trustgraph-flow/trustgraph/tables/knowledge.py` | Table definitions, read/write |
| Core manager | `trustgraph-flow/trustgraph/cores/knowledge.py` | API operations, load-to-store |
| Core store processor | `trustgraph-flow/trustgraph/storage/knowledge/store.py` | Extraction → Cassandra |
| CLI download | `trustgraph-cli/trustgraph/cli/get_kg_core.py` | Core → msgpack file |
| CLI upload | `trustgraph-cli/trustgraph/cli/put_kg_core.py` | Msgpack file → core |
| CLI load | `trustgraph-cli/trustgraph/cli/load_kg_core.py` | Core → graph/vector stores |
| API client | `trustgraph-base/trustgraph/api/knowledge.py` | Client-side knowledge API |
| Triple schema | `trustgraph-base/trustgraph/schema/core/primitives.py` | Triple dataclass with `g` field |
| Provenance generation | `trustgraph-base/trustgraph/provenance/triples.py` | PROV-O triple creation |
| Librarian | `trustgraph-flow/trustgraph/librarian/librarian.py` | Document storage service |
| Library tables | `trustgraph-flow/trustgraph/tables/library.py` | Document metadata in Cassandra |
| Blob store | `trustgraph-flow/trustgraph/librarian/blob_store.py` | S3/MinIO object storage |
## Technical Design
### Change 1: Named Graph Field in Core Storage
#### Cassandra Schema
Extend the `triples` tuple from 6 to 7 elements, adding the graph name:
```
triples list<tuple<
    text, boolean,   -- s_val, s_is_uri
    text, boolean,   -- p_val, p_is_uri
    text, boolean,   -- o_val, o_is_uri
    text             -- graph name (empty string = default graph)
>>
```
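**Migration**: Cassandra cannot change the type of an existing column in
place, so this change needs a new table version or one of the alternatives
listed in the Migration Plan. Whichever option is chosen, existing rows with
6-element tuples must be handled gracefully on read: if the tuple has 6
elements, treat the graph as the default graph.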
#### Write Path (`add_triples`)
Change `tables/knowledge.py:add_triples()` to include `triple.g`:
```python
triples = [
    (
        *term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o),
        v.g or ""
    )
    for v in m.triples
]
```
#### Read Path (`get_triples`)
Change `tables/knowledge.py:get_triples()` to restore the graph name:
```python
Triple(
    s = tuple_to_term(elt[0], elt[1]),
    p = tuple_to_term(elt[2], elt[3]),
    o = tuple_to_term(elt[4], elt[5]),
    g = elt[6] if len(elt) > 6 and elt[6] else None,
)
```
The `len(elt) > 6` guard provides backward compatibility with existing
6-element rows.
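The write and read paths together can be exercised with a small round-trip sketch. It is illustrative only: the `Triple` class below is a simplified stand-in in which each term is already a `(value, is_uri)` pair, and `to_row` / `from_row` are hypothetical names that replace the real `term_to_tuple` / `tuple_to_term` helpers.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

Term = Tuple[str, bool]   # (value, is_uri); simplified stand-in for a term


@dataclass
class Triple:
    s: Term
    p: Term
    o: Term
    g: Optional[str] = None   # graph name; None means the default graph


def to_row(t):
    """Write path: flatten to a 7-element tuple, "" for the default graph."""
    return (*t.s, *t.p, *t.o, t.g or "")


def from_row(elt):
    """Read path: accept legacy 6-element rows and new 7-element rows."""
    return Triple(
        s=(elt[0], elt[1]),
        p=(elt[2], elt[3]),
        o=(elt[4], elt[5]),
        g=elt[6] if len(elt) > 6 and elt[6] else None,
    )
```

Note that both a missing seventh element and an empty string map to `None`, so the default graph has a single in-memory representation regardless of which row format was stored.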
#### Core File Format
Extend triple dicts in the `"t"` record to include the graph name:
```python
# In get_kg_core.py write_triple — each triple dict gains "g" key
{"s": ..., "p": ..., "o": ..., "g": "urn:graph:source"}
```
On read (`put_kg_core.py`), treat missing `"g"` key as default graph for
backward compatibility with old core files.
### Change 2: Provenance Triples in Cores
#### Investigation Required
Before implementation, verify:
1. Whether provenance triples arrive on the `triples-input` topic that the
knowledge core store processor already listens on.
2. If not, which topic they use, and whether the store processor should
subscribe to it.
#### If provenance triples already arrive at the store
The only change needed is Change 1 (named graphs) — the provenance triples
are already being stored, just without their graph name. Once graph names
are preserved, provenance triples will round-trip correctly.
#### If provenance triples do NOT arrive at the store
Two options:
**Option A — Route provenance to the existing store topic**: Configure the
flow so provenance triples are published to the same `triples-input` topic.
This is the simpler approach and keeps the store processor unchanged.
**Option B — Add a subscription**: Add a new `ConsumerSpec` in the store
processor for the provenance topic. This keeps provenance routing
independent but adds complexity.
Recommendation: Option A, unless there is a reason provenance triples are
intentionally kept off the core store topic.
### Change 3: Source Material in Cores
This is the largest change. The goal is that when a core is loaded on a
fresh instance, provenance links to source material resolve.
#### Architecture
Source material is **not stored in the knowledge core tables**. It lives in
the librarian (Cassandra `library` keyspace + S3/MinIO blob store) and is
fetched on demand via the librarian's existing service API.
The knowledge manager acts as a **client of the librarian service** — it
calls the librarian's request/response API over pub/sub to retrieve document
metadata and content. It does not access the library's Cassandra tables or
blob store directly.
#### Transport
The librarian's pub/sub API already handles chunking of large documents.
This chunking is designed to be websocket-friendly, so library content
flowing through the API gateway to external clients does not require
re-chunking. The API gateway remains a transport layer.
```
Download:
  Knowledge manager ──pub/sub──► Librarian (fetch metadata + content)
  Knowledge manager ──pub/sub──► API gateway ──websocket──► Client

Upload:
  Client ──websocket──► API gateway ──pub/sub──► Knowledge manager
  Knowledge manager ──pub/sub──► Librarian (store metadata + content)
```
#### What to Include
The provenance chain links facts → chunks → pages → documents. For the
chain to resolve, the core must include:
1. **Document metadata** — the library record for each document in the
hierarchy (id, kind, title, parent_id, document_type, etc.)
2. **Document content** — the blob data for each document (original file,
extracted text pages, text chunks)
Including the full hierarchy is necessary because:
- A user viewing provenance needs to traverse fact → chunk → page → document
- The chunk text is needed to show what text a fact was extracted from
- The page text provides broader context
- The original document is needed for full source attribution
#### Size Implications
Source material will significantly increase core file sizes. A rough model:
| Component | Typical size per document |
|-----------|-------------------------|
| Triples + embeddings (current) | 1-10 MB |
| Chunk text (all chunks) | ~same as original document |
| Page text (all pages) | ~same as original document |
| Original document (PDF, etc.) | Varies widely (KB to hundreds of MB) |
For a 10 MB PDF, the core could grow from ~5 MB to ~25 MB (original +
derived text + existing data). For large document sets, cores could become
very large.
**Decision needed**: Whether to include original documents or just derived
text (pages + chunks). Including only derived text still allows provenance
display but loses the ability to serve the original file.
#### New Core File Record Types
Add new msgpack record types for library content:
| Type tag | Payload | Description |
|----------|---------|-------------|
| `"lm"` | `{"id", "kind", "title", "parent_id", "document_type", "comments", "tags", "metadata"}` | Library document metadata |
| `"lb"` | `{"id", "data"}` | Library document blob content (chunked by pub/sub layer) |
These are emitted after the existing `"t"` and `"ge"` records during
download and processed during upload.
#### Download Path
Extend `KnowledgeManager.get_kg_core()` to:
1. Stream triples and graph embeddings from the core store (existing
behavior).
2. Use the librarian service API to retrieve documents associated with
this core ID:
a. Fetch the root document metadata and content.
b. Use `list-children` to discover child documents (pages, chunks).
c. Recursively fetch metadata and content for each child.
3. Stream each document as `"lm"` (metadata) and `"lb"` (content) records.
The knowledge manager gains the librarian service as a pub/sub dependency.
Large document content is chunked by the librarian's existing pub/sub
transport — the knowledge manager receives and forwards these chunks without
buffering the full blob in memory.
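The recursive discovery in step 2 can be sketched as an iterative depth-first walk. This is a minimal sketch, assuming a `list_children` callable that returns the child document IDs for a given ID; that callable is a hypothetical wrapper around the librarian's `list-children` operation, whose real request/response shape is not shown here.

```python
from typing import Callable, Iterable, Iterator


def walk_documents(
    root_id: str,
    list_children: Callable[[str], Iterable[str]],
) -> Iterator[str]:
    """Yield document IDs depth-first, each parent before its children."""
    stack = [root_id]
    seen = set()
    while stack:
        doc_id = stack.pop()
        if doc_id in seen:          # guard against cycles in bad metadata
            continue
        seen.add(doc_id)
        yield doc_id
        # reversed() keeps children in their listed order when popped
        stack.extend(reversed(list(list_children(doc_id))))
```

Yielding IDs lazily fits the streaming design: the caller can fetch and forward the `"lm"` and `"lb"` records for one document before the next is discovered.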
#### Upload Path
Extend `KnowledgeManager.put_kg_core()` to handle the new record types:
1. For `"lm"` records: call the librarian service API to create/update
the document metadata.
2. For `"lb"` records: call the librarian service API to store the
document content.
Parent-child relationships are preserved because `parent_id` is stored in
the metadata. Documents should be processed in hierarchy order (parent
before child) to satisfy any ordering constraints.
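If `"lm"` records can arrive in arbitrary order, one way to get parent-before-child processing is to reorder a batch before calling the librarian. The sketch below assumes the metadata records are plain dicts with `id` and `parent_id` keys, as in the `"lm"` payload; `order_parent_first` is a hypothetical helper, and buffering a whole batch is itself an assumption that trades memory for ordering.

```python
def order_parent_first(records):
    """Order metadata dicts so every parent precedes its children.

    Records whose parent is absent from the batch are treated as roots.
    """
    by_id = {r["id"]: r for r in records}
    ordered, placed = [], set()

    def place(rec, trail=()):
        # Skip records already emitted, and break parent_id cycles.
        if rec["id"] in placed or rec["id"] in trail:
            return
        parent = by_id.get(rec.get("parent_id"))
        if parent is not None:
            place(parent, trail + (rec["id"],))
        ordered.append(rec)
        placed.add(rec["id"])

    for rec in records:
        place(rec)
    return ordered
```

If the download side already emits documents in traversal order, this step is unnecessary and records can be processed as they stream in.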
#### Load Path
The load path (`_load_kg_core`) publishes triples and embeddings to Pulsar
topics for ingestion into graph/vector stores. Source material does not need
to flow through the load path — it is already in the librarian after the
upload step and can be accessed directly by services that need it.
No changes to the load path for source material.
#### CLI Changes
**`tg-get-kg-core`**: Add handling for `"lm"` and `"lb"` record types in
the file writer.
**`tg-put-kg-core`**: Add handling for `"lm"` and `"lb"` record types in
the file reader. Send library records to the knowledge manager alongside
triple/embedding records.
#### Associating Documents with Cores
The core ID is `metadata.root`, which is the root document ID from the
librarian. This provides a natural join: the core's root document and all
its children (pages, chunks) are the source material for that core.
The librarian's `list-children` API provides the child documents. A
recursive traversal from the root document collects the full hierarchy.
### API Changes
#### KnowledgeResponse Schema
Add optional fields to `KnowledgeResponse` for library data:
```python
@dataclass
class KnowledgeResponse:
    error: Error | None = None
    ids: list | None = None
    eos: bool = False
    triples: Triples | None = None
    graph_embeddings: GraphEmbeddings | None = None
    document_embeddings: DocumentEmbeddings | None = None
    library_metadata: LibraryMetadata | None = None  # new
    library_blob: LibraryBlob | None = None  # new
```
#### New Schema Types
```python
@dataclass
class LibraryMetadata:
    id: str
    kind: str | None = None
    title: str | None = None
    parent_id: str | None = None
    document_type: str | None = None
    comments: str | None = None
    tags: list[str] | None = None
    metadata: list[Triple] | None = None

@dataclass
class LibraryBlob:
    id: str
    data: bytes
```
#### Socket API
The existing streaming protocol for `get-kg-core` / `put-kg-core` carries
these new fields naturally — responses already stream multiple record types.
### Dependencies Between Changes
```
Change 1 (named graphs)
  └── Change 2 (provenance triples) depends on Change 1

Change 3 (source material) is independent
```
Change 1 is a prerequisite for Change 2 (provenance triples use named
graphs). Change 3 is independent and can be implemented in parallel.
## Security Considerations
- **Workspace isolation**: Core download/upload must respect workspace
boundaries. Source material from the librarian must only be included if
it belongs to the same workspace as the core. This is already enforced
by the existing workspace-scoped queries.
- **Large blob transfer**: Streaming large documents through the API
is handled by the librarian's existing pub/sub chunking, which is
designed to be websocket-friendly. No additional chunking layer is
needed.
- **Cross-instance trust**: When uploading a core from an external source,
the library content should be treated as untrusted input. Document
metadata and blob content should be validated before insertion.
## Performance Considerations
- **Core file size**: Including source material will significantly increase
core file sizes. Consider adding a flag to download/upload commands to
optionally exclude source material for use cases where only the knowledge
graph is needed.
- **Streaming**: All paths already use streaming (paged Cassandra queries,
msgpack record-at-a-time). Library content should follow the same pattern.
- **Cassandra schema migration**: Changing the tuple width in the `triples`
table requires careful handling. Cassandra frozen tuples cannot be altered
in place — a migration strategy is needed (see Migration Plan).
## Testing Strategy
- **Unit tests**: Triple round-trip with graph name (write → read →
verify `g` field preserved). Backward compatibility with 6-element tuples.
- **Integration tests**: Full lifecycle — extract with provenance → download
core → upload to fresh instance → load → verify provenance chain resolves.
- **File format tests**: Read old-format core files (no graph name, no
library records) and verify they load without error.
- **Library inclusion tests**: Download core with source material → upload →
verify documents accessible through librarian.
## Migration Plan
### Cassandra Schema
The `triples` table stores tuples in a `list<tuple<...>>` column. Cassandra
does not support altering the type of an existing column. Options:
**Option A — New table**: Create a `triples_v2` table with the 7-element
tuple. Migrate data from `triples` to `triples_v2`. The read path checks
both tables during a transition period, then the old table is dropped.
**Option B — Dual read**: Keep the existing table. The read path handles
both 6-element and 7-element tuples by checking length. New writes use
7-element tuples. This works if Cassandra accepts variable-length tuples in
a list — **needs verification**.
**Option C — Separate graph column**: Instead of extending the tuple, add a
parallel `graphs list<text>` column where `graphs[i]` corresponds to
`triples[i]`. This avoids tuple migration entirely but requires keeping the
two lists in sync.
Recommendation: Verify Option B first (simplest). Fall back to Option A if
Cassandra rejects mixed tuple lengths.
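For comparison, Option C's parallel-list layout can be sketched in a few lines. This is an illustration only: `split_row` and `join_row` are hypothetical helper names, rows are plain Python tuples rather than Cassandra driver values, and the treatment of a missing `graphs` column as "all default graph" is an assumption about how legacy rows would be read.

```python
def split_row(triples_with_graph):
    """Split 7-element tuples into the 6-tuple list and a graphs list."""
    triples = [t[:6] for t in triples_with_graph]
    graphs = [t[6] for t in triples_with_graph]
    return triples, graphs


def join_row(triples, graphs):
    """Recombine the two lists; legacy rows have no graphs list."""
    if not graphs:
        graphs = [""] * len(triples)      # every triple in default graph
    if len(graphs) != len(triples):
        raise ValueError("triples and graphs lists are out of sync")
    return [(*t, g) for t, g in zip(triples, graphs)]
```

The explicit length check makes the synchronisation risk of Option C visible: a mismatch is reported rather than silently pairing triples with the wrong graph.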
### Core File Format
Backward compatible by design:
- Old files lack `"g"` in triple dicts and have no `"lm"`/`"lb"` records →
handled by defaults.
- New files read by old code → currently fails. The existing `read_message`
raises on unknown record types, so it needs a small fix to skip unknown
types gracefully; readers without that fix will reject new-format files.
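Both compatibility rules can be sketched as a tolerant reader. This is a minimal sketch, not the existing `read_message`: it assumes the records have already been decoded from msgpack into `(type_tag, payload)` pairs, `read_records` is a hypothetical name, and `KNOWN_TAGS` lists only the tags named in this specification.

```python
# Tags named in this spec; a real reader may recognise others.
KNOWN_TAGS = {"t", "ge", "lm", "lb"}


def read_records(records):
    """Yield (tag, payload) for known tags, skipping unknown ones.

    `records` is any iterable of decoded (type_tag, payload) pairs.
    """
    for tag, payload in records:
        if tag not in KNOWN_TAGS:
            continue                  # tolerate records from newer writers
        if tag == "t":
            # Old files have no "g" key: None stands for the default graph.
            payload = {
                **payload,
                "t": [{**tr, "g": tr.get("g")} for tr in payload["t"]],
            }
        yield tag, payload
```

Skipping silently is the simplest policy; logging a count of skipped records would make it easier to notice when a reader is older than the file it is reading.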
## Open Questions
1. **Provenance topic routing**: Do provenance triples currently arrive at
the `triples-input` topic consumed by the knowledge core store? If not,
what topic are they on?
2. **Include original documents?**: Should cores include the original
uploaded document (e.g. PDF), or only derived text (pages + chunks)?
Including originals makes cores fully self-contained but potentially
very large. Excluding them preserves provenance text display but loses
the ability to serve the original file.
3. **Optional source material**: Should there be a flag on download/upload
to include or exclude source material? This would let users choose
between compact cores (knowledge only) and complete cores (knowledge +
sources).
4. **Cassandra tuple migration**: Can Cassandra handle mixed-length tuples
in a `list<tuple<...>>` column, or is a table migration required?
5. **Document embedding cores**: DE cores are managed alongside KG cores.
Do they need the same treatment (source material inclusion)? The
document embeddings reference chunk IDs — the same provenance chain
applies.
6. **Core versioning**: Should the core file include a version marker so
readers can distinguish old-format from new-format files without
trial-and-error parsing?
## References
- Extraction-time provenance: `docs/tech-specs/extraction-time-provenance.md`
- Query-time explainability: `docs/tech-specs/query-time-explainability.md`
- Agent explainability: `docs/tech-specs/agent-explainability.md`
- Data ownership model: `docs/tech-specs/data-ownership-model.md`

View file

@ -28,8 +28,9 @@ specs/
Location: `specs/api/openapi.yaml`
The REST API specification documents:
- **5 Global Services**: config, flow, librarian, knowledge, collection-management
- **16 Flow-Hosted Services**: agent, RAG, embeddings, queries, loading, tools
- **Global Services**: IAM (user management, authentication)
- **5 Workspace-Scoped Services**: config, flow, librarian, knowledge, collection-management
- **16 Flow-Scoped Services**: agent, RAG, embeddings, queries, loading, tools
- **Import/Export**: Bulk data operations
- **Metrics**: Prometheus monitoring

View file

@ -2,6 +2,55 @@
This directory contains the modular OpenAPI 3.1 specification for the TrustGraph REST API Gateway.
## Authentication
Clients authenticate by passing an opaque bearer token in the
`Authorization` header. The gateway resolves the token to an
authenticated identity and an associated workspace. Tokens are
obtained via the IAM service (e.g. `tg-login` or `tg-create-api-key`).
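As an illustration, a client might attach the token like this. The sketch uses only the Python standard library and builds the request without sending it; the base URL, the path, and the `build_request` helper are placeholders for illustration, not documented gateway endpoints.

```python
import json
import urllib.request


def build_request(base_url, path, token, body):
    """Build (but do not send) a POST request carrying the bearer token."""
    return urllib.request.Request(
        url=base_url.rstrip("/") + path,
        data=json.dumps(body).encode("utf-8"),
        headers={
            # The workspace is resolved from this token by the gateway,
            # so no workspace parameter is added to the body.
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Sending the request would be a call to `urllib.request.urlopen(req)` against a running gateway.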
## Service Tiers
API services are organized into three tiers based on their scoping:
### Global services
These services are not scoped to a workspace. They manage
system-wide resources.
- **IAM** — user management, authentication, API key lifecycle
### Workspace-scoped services
These services operate within the workspace associated with the
authenticated token. The workspace is resolved by the gateway from
the bearer token — it is not passed as an explicit parameter.
- **Config** — configuration management (prompts, token costs, etc.)
- **Librarian** — document library management
- **Knowledge** — knowledge graph core management
- **Collection Management** — collection metadata
- **Flow** — flow lifecycle and blueprint management
### Flow-scoped services
These services require a `flow` parameter identifying the processing
flow to use, in addition to the workspace context from the token.
- **Agent** — agentic AI interactions
- **Document RAG** — retrieval-augmented generation over documents
- **Graph RAG** — retrieval-augmented generation over knowledge graphs
- **Text Completion** — LLM text completion
- **Prompt** — prompt template expansion
- **Embeddings** — vector embedding generation
- **SPARQL Query** — SPARQL queries against the knowledge graph
- **Graph Embeddings** — knowledge graph embedding queries
- **Document Embeddings** — document embedding queries
- **Structured Query** — structured data queries
- **Row Embeddings** — structured data embedding queries
- **Rows Query** — row-level data queries
- **Triples Query** — knowledge graph triple queries
## Structure
```

View file

@ -14,7 +14,7 @@ properties:
- delete-collection
description: |
Collection operation:
- `list-collections`: List collections in workspace
- `list-collections`: List collections in the current workspace (resolved from token)
- `update-collection`: Create or update collection metadata
- `delete-collection`: Delete collection
collection:

View file

@ -0,0 +1,21 @@
type: object
description: |
  API key creation fields. Used with `create-api-key`.
properties:
  user_id:
    type: string
    description: User to create the key for.
    examples:
      - usr_abc123
  name:
    type: string
    description: Operator-facing label for the key (e.g. "laptop", "CI").
    examples:
      - laptop
  expires:
    type: string
    description: |
      Optional expiry timestamp in ISO-8601 UTC. Empty string or
      omitted means the key does not expire.
    examples:
      - "2027-01-01T00:00:00Z"

View file

@ -0,0 +1,38 @@
type: object
description: API key record returned by IAM operations.
properties:
  id:
    type: string
    description: Key identifier.
    examples:
      - key_xyz789
  user_id:
    type: string
    description: Owning user identifier.
    examples:
      - usr_abc123
  name:
    type: string
    description: Operator-facing label.
    examples:
      - laptop
  prefix:
    type: string
    description: |
      First 4 characters of the plaintext key, for identification
      in listings. Never enough to reconstruct the key.
    examples:
      - tg_a
  expires:
    type: string
    description: Expiry timestamp (ISO-8601 UTC). Empty if no expiry.
    examples:
      - "2027-01-01T00:00:00Z"
  created:
    type: string
    description: Creation timestamp (ISO-8601 UTC).
    examples:
      - "2026-01-15T10:30:00Z"
  last_used:
    type: string
    description: Last-used timestamp (ISO-8601 UTC). Empty if never used.

View file

@ -0,0 +1,106 @@
type: object
description: |
  IAM service request.

  The IAM service is a **global service**: it operates at system level,
  not scoped to a specific workspace. All operations are dispatched via
  the `operation` field.

  Some operations require admin capabilities; others (like `whoami` and
  `list-my-workspaces`) are available to any authenticated user. See
  the capability vocabulary for details.

  The `actor` field is injected by the gateway and cannot be set by
  the client. It identifies the authenticated caller.
required:
  - operation
properties:
  operation:
    type: string
    enum:
      - whoami
      - list-my-workspaces
      - create-user
      - list-users
      - get-user
      - update-user
      - disable-user
      - enable-user
      - delete-user
      - create-workspace
      - list-workspaces
      - get-workspace
      - update-workspace
      - disable-workspace
      - create-api-key
      - list-api-keys
      - revoke-api-key
      - reset-password
      - rotate-signing-key
    description: |
      Operation to perform.

      **Any authenticated user:**
      - `whoami`: Return the caller's own user record
      - `list-my-workspaces`: List workspaces the caller has access to

      **User management (requires `users:read`/`users:write`/`users:admin`):**
      - `create-user`: Create a new user in a workspace
      - `list-users`: List users (optionally filtered by workspace)
      - `get-user`: Get a specific user record
      - `update-user`: Update user fields (name, email, roles, enabled)
      - `disable-user`: Soft-disable a user and revoke their API keys
      - `enable-user`: Re-enable a previously disabled user
      - `delete-user`: Hard-delete a user and their API keys

      **Workspace management (requires `workspaces:admin`):**
      - `create-workspace`: Create a new workspace
      - `list-workspaces`: List all workspaces (admin view)
      - `get-workspace`: Get a specific workspace record
      - `update-workspace`: Update workspace name or enabled state
      - `disable-workspace`: Disable workspace and all its users

      **API key management (requires `keys:self` or `keys:admin`):**
      - `create-api-key`: Create an API key for a user
      - `list-api-keys`: List API keys for a user
      - `revoke-api-key`: Revoke (delete) an API key

      **Password management:**
      - `reset-password`: Admin-initiated password reset (requires `users:admin`)

      **System (requires `iam:admin`):**
      - `rotate-signing-key`: Rotate the JWT signing key
  workspace:
    type: string
    description: |
      Workspace scope. Required on workspace-scoped operations
      (e.g. `create-user`). Acts as an optional integrity check on
      operations that target a user or key: when supplied, the target's
      home workspace must match.

      Omitted for system-level operations (`list-workspaces`,
      `rotate-signing-key`) and for identity-resolution operations
      (`whoami`, `list-my-workspaces`).
    examples:
      - default
      - production
  user_id:
    type: string
    description: |
      Target user identifier. Required for operations that act on a
      specific user: `get-user`, `update-user`, `disable-user`,
      `enable-user`, `delete-user`, `reset-password`, `list-api-keys`.
    examples:
      - usr_abc123
  user:
    $ref: './UserInput.yaml'
  workspace_record:
    $ref: './WorkspaceInput.yaml'
  key:
    $ref: './ApiKeyInput.yaml'
  key_id:
    type: string
    description: |
      API key identifier. Required for `revoke-api-key`.
    examples:
      - key_xyz789

View file

@ -0,0 +1,51 @@
type: object
description: |
  IAM service response. Fields are populated depending on the
  operation that was invoked.
properties:
  user:
    $ref: './UserRecord.yaml'
  users:
    type: array
    description: List of user records (populated by `list-users`).
    items:
      $ref: './UserRecord.yaml'
  workspace:
    $ref: './WorkspaceRecord.yaml'
  workspaces:
    type: array
    description: |
      List of workspace records (populated by `list-workspaces` and
      `list-my-workspaces`).
    items:
      $ref: './WorkspaceRecord.yaml'
  api_key_plaintext:
    type: string
    description: |
      Plaintext API key. Returned **once** by `create-api-key`.
      Never populated on any other operation. The caller must
      capture this value; it cannot be retrieved again.
  api_key:
    $ref: './ApiKeyRecord.yaml'
  api_keys:
    type: array
    description: List of API key records (populated by `list-api-keys`).
    items:
      $ref: './ApiKeyRecord.yaml'
  temporary_password:
    type: string
    description: |
      Temporary password returned once by `reset-password`.
  error:
    type: object
    description: Error details (present on failure).
    properties:
      type:
        type: string
        description: |
          Error type. One of: `invalid-argument`, `not-found`,
          `duplicate`, `auth-failed`, `weak-password`, `disabled`,
          `operation-not-permitted`, `internal-error`.
      message:
        type: string
        description: Human-readable error description (not surfaced to end users).

View file

@ -0,0 +1,42 @@
type: object
description: |
User creation/update fields. Used with `create-user` and `update-user`.
The `password` field is only accepted on `create-user`.
properties:
username:
type: string
description: Login username. Unique within a workspace.
examples:
- alice
name:
type: string
description: Display name.
examples:
- Alice Smith
email:
type: string
description: Email address.
examples:
- alice@example.com
password:
type: string
description: |
Initial password. Only accepted on `create-user`; rejected on
`update-user`. Use `reset-password` or `change-password` to
modify passwords.
roles:
type: array
items:
type: string
description: |
Roles to assign. Open-source roles: `reader`, `writer`, `admin`.
examples:
- - reader
enabled:
type: boolean
description: Whether the user is enabled.
default: true
must_change_password:
type: boolean
description: Force password change on next login.
default: false

View file

@ -0,0 +1,46 @@
type: object
description: User record returned by IAM operations.
properties:
id:
type: string
description: Unique user identifier.
examples:
- usr_abc123
workspace:
type: string
description: User's home workspace.
examples:
- default
username:
type: string
description: Login username (unique within workspace).
examples:
- alice
name:
type: string
description: Display name.
examples:
- Alice Smith
email:
type: string
description: Email address.
examples:
- alice@example.com
roles:
type: array
items:
type: string
description: Assigned roles.
examples:
- - reader
enabled:
type: boolean
description: Whether the user is enabled.
must_change_password:
type: boolean
description: Whether the user must change password on next login.
created:
type: string
description: Creation timestamp (ISO-8601 UTC).
examples:
- "2026-01-15T10:30:00Z"

View file

@ -0,0 +1,23 @@
type: object
description: |
Workspace creation/update fields. Used with `create-workspace` and
`update-workspace`.
properties:
id:
type: string
description: |
Workspace identifier. Required for all workspace operations.
Immutable after creation.
examples:
- default
- production
name:
type: string
description: Human-readable workspace name.
examples:
- Default Workspace
- Production
enabled:
type: boolean
description: Whether the workspace is enabled.
default: true

View file

@ -0,0 +1,21 @@
type: object
description: Workspace record returned by IAM operations.
properties:
id:
type: string
description: Workspace identifier.
examples:
- default
name:
type: string
description: Human-readable workspace name.
examples:
- Default Workspace
enabled:
type: boolean
description: Whether the workspace is enabled.
created:
type: string
description: Creation timestamp (ISO-8601 UTC).
examples:
- "2026-01-01T00:00:00Z"

View file

@ -18,7 +18,7 @@ properties:
- unload-kg-core
description: |
Knowledge core operation:
- `list-kg-cores`: List knowledge cores in workspace
- `list-kg-cores`: List knowledge cores in the current workspace (resolved from token)
- `get-kg-core`: Get knowledge core by ID
- `put-kg-core`: Store triples and/or embeddings
- `delete-kg-core`: Delete knowledge core by ID

View file

@ -2,21 +2,44 @@ openapi: 3.1.0
info:
title: TrustGraph API Gateway
version: "2.2"
version: "2.4"
description: |
REST API for TrustGraph - an AI-powered knowledge graph and RAG system.
## Overview
The API provides access to:
- **Global Services**: Configuration, flow management, knowledge storage, library management
- **Flow-Hosted Services**: AI services like RAG, text completion, embeddings (require running flow)
- **Global Services**: IAM (user management, authentication)
- **Workspace-Scoped Services**: Configuration, flow management, knowledge storage, library management
- **Flow-Scoped Services**: AI services like RAG, text completion, embeddings (require running flow)
- **Import/Export**: Bulk data operations for triples, embeddings, entity contexts
- **WebSocket**: Multiplexed interface for all services
## Service Types
## Authentication
Clients authenticate by passing an opaque bearer token in the
`Authorization` header. The token is obtained via the IAM service
(e.g. `tg-login` or `tg-create-api-key`).
```
Authorization: Bearer <token>
```
The gateway resolves the token to an authenticated identity and an
associated workspace. The token is an opaque string — clients must
not make assumptions about its internal structure.
## Service Tiers
### Global Services
System-wide services with no workspace scoping:
- `iam` - User management, authentication, API key lifecycle
### Workspace-Scoped Services
Operate within the workspace associated with the authenticated
token. The workspace is resolved by the gateway — it is not
passed as an explicit parameter.
Fixed endpoints accessible via `/api/v1/{kind}`:
- `config` - Configuration management
- `flow` - Flow lifecycle and blueprints
@ -24,24 +47,17 @@ info:
- `knowledge` - Knowledge graph core management
- `collection-management` - Collection metadata
### Flow-Hosted Services
Require running flow instance, accessed via `/api/v1/flow/{flow}/service/{kind}`:
### Flow-Scoped Services
Require a `flow` parameter identifying the processing flow to use.
Workspace context comes from the authenticated token.
Accessed via `/api/v1/flow/{flow}/service/{kind}`:
- AI services: agent, text-completion, prompt, RAG (document/graph)
- Embeddings: embeddings, graph-embeddings, document-embeddings
- Query: triples, rows, nlp-query, structured-query, sparql-query, row-embeddings
- Data loading: text-load, document-load
- Utilities: mcp-tool, structured-diag
## Authentication
Bearer token authentication when `GATEWAY_SECRET` environment variable is set.
Include token in Authorization header:
```
Authorization: Bearer <token>
```
If `GATEWAY_SECRET` is not set, API runs without authentication (development mode).
## Field Naming
All JSON fields use **kebab-case**: `flow-id`, `blueprint-name`, `doc-limit`, etc.
@ -73,18 +89,20 @@ security:
- bearerAuth: []
tags:
- name: IAM
description: Identity and access management (global)
- name: Config
description: Configuration management (global service)
description: Configuration management (workspace-scoped)
- name: Flow
description: Flow lifecycle and blueprint management (global service)
description: Flow lifecycle and blueprint management (workspace-scoped)
- name: Librarian
description: Document library management (global service)
description: Document library management (workspace-scoped)
- name: Knowledge
description: Knowledge graph core management (global service)
description: Knowledge graph core management (workspace-scoped)
- name: Collection
description: Collection metadata management (global service)
description: Collection metadata management (workspace-scoped)
- name: Flow Services
description: Services hosted within flow instances
description: AI and query services hosted within flow instances (flow-scoped)
- name: Import/Export
description: Bulk data import and export
- name: WebSocket
@ -93,6 +111,11 @@ tags:
description: System metrics and monitoring
paths:
# Global services
/api/v1/iam:
$ref: './paths/iam.yaml'
# Workspace-scoped services
/api/v1/config:
$ref: './paths/config.yaml'
/api/v1/flow:

View file

@ -1,10 +1,13 @@
post:
tags:
- Collection
summary: Collection metadata management
summary: Collection metadata management (workspace-scoped)
description: |
Manage collection metadata for organizing documents and knowledge.
This is a **workspace-scoped** service. All operations apply to the
workspace associated with the authenticated bearer token.
## Collections
Collections are organizational units for grouping:

View file

@ -1,9 +1,13 @@
post:
tags:
- Config
summary: Configuration service
summary: Configuration service (workspace-scoped)
description: |
Manage TrustGraph configuration including flows, prompts, token costs, parameter types, and more.
Manage TrustGraph configuration including flows, prompts, token costs,
parameter types, and more.
This is a **workspace-scoped** service. All operations apply to the
workspace associated with the authenticated bearer token.
## Operations

View file

@ -1,10 +1,13 @@
post:
tags:
- Flow
summary: Flow lifecycle and blueprint management
summary: Flow lifecycle and blueprint management (workspace-scoped)
description: |
Manage flow instances and blueprints.
This is a **workspace-scoped** service. All operations apply to the
workspace associated with the authenticated bearer token.
## Important Distinction
The **flow service** manages *running flow instances*.

View file

@ -5,6 +5,10 @@ post:
description: |
AI agent that can understand questions, reason about them, and take actions.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Agent Overview
The agent service provides a conversational AI that:

View file

@ -5,6 +5,10 @@ post:
description: |
Query document embeddings to find similar text chunks by vector similarity.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Document Embeddings Query Overview
Find document chunks semantically similar to a query vector:

View file

@ -5,6 +5,10 @@ post:
description: |
Load binary documents (PDF, Word, etc.) into the processing pipeline.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Document Load Overview
Fire-and-forget binary document loading:

View file

@ -5,6 +5,10 @@ post:
description: |
Retrieval-Augmented Generation over document embeddings.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Document RAG Overview
Document RAG combines:

View file

@ -5,6 +5,10 @@ post:
description: |
Convert text to embedding vectors for semantic similarity search.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Embeddings Overview
Embeddings transform text into dense vector representations that:

View file

@ -5,6 +5,10 @@ post:
description: |
Query graph embeddings to find similar entities by vector similarity.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Graph Embeddings Query Overview
Find entities semantically similar to a query vector:

View file

@ -5,6 +5,10 @@ post:
description: |
Retrieval-Augmented Generation over knowledge graph.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Graph RAG Overview
Graph RAG combines:

View file

@ -5,6 +5,10 @@ post:
description: |
Execute MCP (Model Context Protocol) tools for agent capabilities.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## MCP Tool Overview
MCP tools provide agent capabilities through a standardized protocol:

View file

@ -5,6 +5,10 @@ post:
description: |
Convert natural language questions to structured GraphQL queries.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## NLP Query Overview
Transforms user questions into executable GraphQL:

View file

@ -5,6 +5,10 @@ post:
description: |
Execute stored prompt templates with variable substitution.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Prompt Service Overview
The prompt service enables:

View file

@ -4,6 +4,11 @@ post:
summary: Row Embeddings Query - semantic search on structured data
description: |
Query row embeddings to find similar rows by vector similarity on indexed fields.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
Enables fuzzy/semantic matching on structured data.
## Row Embeddings Query Overview

View file

@ -5,6 +5,10 @@ post:
description: |
Query structured data using GraphQL for row-oriented data access.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Rows Query Overview
GraphQL interface to structured data:

View file

@ -5,6 +5,10 @@ post:
description: |
Execute a SPARQL 1.1 query against the knowledge graph.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Supported Query Types
- **SELECT**: Returns variable bindings as a table of results

View file

@ -5,6 +5,10 @@ post:
description: |
Analyze and understand structured data (CSV, JSON, XML).
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Structured Diag Overview
Helps process unknown structured data:

View file

@ -5,6 +5,10 @@ post:
description: |
Ask natural language questions and get results directly.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Structured Query Overview
Combines two operations in one call:

View file

@ -5,6 +5,10 @@ post:
description: |
Direct text completion using LLM without retrieval augmentation.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Text Completion Overview
Pure LLM generation for:

View file

@ -5,6 +5,10 @@ post:
description: |
Load text documents into the processing pipeline for indexing and embedding.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Text Load Overview
Fire-and-forget document loading:

View file

@ -5,6 +5,10 @@ post:
description: |
Query knowledge graph using subject-predicate-object patterns.
This is a **flow-scoped** service. It requires a flow instance
and operates within the workspace associated with the
authenticated bearer token.
## Triples Query Overview
Query RDF triples with flexible pattern matching:

206
specs/api/paths/iam.yaml Normal file
View file

@ -0,0 +1,206 @@
post:
tags:
- IAM
summary: IAM service (global)
description: |
Identity and access management service.
This is a **global service** — it operates at system level, not
scoped to a specific workspace. The `workspace` field in the
request body is used as a scope filter or integrity check on
certain operations, not as an addressing component.
## Authentication
Most operations require a bearer token. The gateway resolves the
token to an authenticated identity and injects the `actor` field
(the caller's user ID) into the request. Clients cannot set
`actor` — the gateway overwrites it.
## Operations by Capability
### Any authenticated user
- `whoami`: Return the caller's own user record
- `list-my-workspaces`: List workspaces the caller has access to.
In the open-source edition, this returns the caller's home workspace, or
all workspaces if the caller has the `admin` role.
### User management (`users:read` / `users:write` / `users:admin`)
- `create-user`: Create a new user in a workspace
- `list-users`: List users, optionally filtered by workspace
- `get-user`: Get a user record by ID
- `update-user`: Update user fields (name, email, roles, enabled)
- `disable-user`: Soft-disable a user and revoke their API keys
- `enable-user`: Re-enable a disabled user
- `delete-user`: Hard-delete a user and their API keys
### Workspace management (`workspaces:admin`)
- `create-workspace`: Create a new workspace
- `list-workspaces`: List all workspaces (admin view)
- `get-workspace`: Get a workspace record
- `update-workspace`: Update workspace name or enabled state
- `disable-workspace`: Disable a workspace and all its users
### API key management (`keys:self` / `keys:admin`)
- `create-api-key`: Create an API key (plaintext returned once)
- `list-api-keys`: List API keys for a user
- `revoke-api-key`: Revoke (delete) an API key
### Password management (`users:admin`)
- `reset-password`: Admin-initiated password reset (returns temporary password)
### System (`iam:admin`)
- `rotate-signing-key`: Rotate the JWT signing key
operationId: iamService
security:
- bearerAuth: []
requestBody:
required: true
content:
application/json:
schema:
$ref: '../components/schemas/iam/IamRequest.yaml'
examples:
whoami:
summary: Get the caller's own user record
value:
operation: whoami
listMyWorkspaces:
summary: List workspaces the caller has access to
value:
operation: list-my-workspaces
createUser:
summary: Create a new user
value:
operation: create-user
workspace: default
user:
username: alice
name: Alice Smith
email: alice@example.com
password: changeme123
roles:
- writer
listUsers:
summary: List users in a workspace
value:
operation: list-users
workspace: default
getUser:
summary: Get a specific user
value:
operation: get-user
user_id: usr_abc123
updateUser:
summary: Update a user's roles
value:
operation: update-user
user_id: usr_abc123
user:
roles:
- admin
disableUser:
summary: Disable a user
value:
operation: disable-user
user_id: usr_abc123
createWorkspace:
summary: Create a workspace
value:
operation: create-workspace
workspace_record:
id: production
name: Production Workspace
listWorkspaces:
summary: List all workspaces (admin)
value:
operation: list-workspaces
createApiKey:
summary: Create an API key
value:
operation: create-api-key
key:
user_id: usr_abc123
name: laptop
expires: "2027-01-01T00:00:00Z"
listApiKeys:
summary: List a user's API keys
value:
operation: list-api-keys
user_id: usr_abc123
revokeApiKey:
summary: Revoke an API key
value:
operation: revoke-api-key
key_id: key_xyz789
resetPassword:
summary: Admin-initiated password reset
value:
operation: reset-password
user_id: usr_abc123
responses:
'200':
description: Successful response
content:
application/json:
schema:
$ref: '../components/schemas/iam/IamResponse.yaml'
examples:
whoami:
summary: Caller's user record
value:
user:
id: usr_abc123
workspace: default
username: alice
name: Alice Smith
email: alice@example.com
roles:
- writer
enabled: true
must_change_password: false
created: "2026-01-15T10:30:00Z"
listMyWorkspaces:
summary: Workspaces the caller can access
value:
workspaces:
- id: default
name: Default Workspace
enabled: true
created: "2026-01-01T00:00:00Z"
listUsers:
summary: Users in a workspace
value:
users:
- id: usr_abc123
workspace: default
username: alice
name: Alice Smith
roles:
- writer
enabled: true
created: "2026-01-15T10:30:00Z"
createApiKey:
summary: New API key (plaintext returned once)
value:
api_key_plaintext: tg_aBcDeFgHiJkLmNoPqRsTuVwXyZ
api_key:
id: key_xyz789
user_id: usr_abc123
name: laptop
prefix: tg_a
expires: "2027-01-01T00:00:00Z"
created: "2026-05-29T14:00:00Z"
resetPassword:
summary: Temporary password (returned once)
value:
temporary_password: tmp_xK9mQ2pL
'400':
description: Bad request (unknown operation, missing required fields)
'401':
$ref: '../components/responses/Unauthorized.yaml'
'403':
description: Access denied (insufficient capabilities)
'500':
$ref: '../components/responses/Error.yaml'
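
As a rough illustration of how a client might call this endpoint, the sketch below builds the HTTP request with the Python standard library. The helper name `iam_http_request` and the base URL `http://localhost:8088` are assumptions for the example; the path, the `Authorization: Bearer <token>` header and the request body come from the spec above.

```python
import json
import urllib.request


def iam_http_request(base_url, token, body):
    """Build (but do not send) a POST to the IAM endpoint."""
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/v1/iam",
        data=json.dumps(body).encode("utf-8"),
        headers={
            # The token is opaque; it is passed through unchanged.
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Assumed gateway address; replace <token> with a real bearer token.
req = iam_http_request(
    "http://localhost:8088", "<token>", {"operation": "whoami"}
)
# Sending is left to the caller, e.g. urllib.request.urlopen(req).
```

Keeping request construction separate from sending makes it easy to inspect the headers and body without a running gateway.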

View file

@ -1,9 +1,13 @@
post:
tags:
- Knowledge
summary: Knowledge graph core management
summary: Knowledge graph core management (workspace-scoped)
description: |
Manage knowledge graph cores - persistent storage of triples and embeddings.
Manage knowledge graph cores - persistent storage of triples and
embeddings.
This is a **workspace-scoped** service. All operations apply to the
workspace associated with the authenticated bearer token.
## Knowledge Cores

View file

@ -1,9 +1,13 @@
post:
tags:
- Librarian
summary: Document library management
summary: Document library management (workspace-scoped)
description: |
Manage document library: add, remove, list documents, and control processing.
Manage document library: add, remove, list documents, and control
processing.
This is a **workspace-scoped** service. All operations apply to the
workspace associated with the authenticated bearer token.
## Document Library

View file

@ -26,7 +26,7 @@ get:
### Request Message Format
**Global Service Request** (no flow parameter):
**Workspace-Scoped Service Request** (no flow parameter):
```json
{
"id": "req-123",
@ -38,7 +38,7 @@ get:
}
```
**Flow-Hosted Service Request** (with flow parameter):
**Flow-Scoped Service Request** (with flow parameter):
```json
{
"id": "req-456",
@ -54,7 +54,7 @@ get:
**Request Fields**:
- `id` (string, required): Client-generated unique identifier for this request within the session. Used to match responses to requests.
- `service` (string, required): Service identifier (e.g., "config", "agent", "document-rag"). Same as `{kind}` in REST URLs.
- `flow` (string, optional): Flow ID for flow-hosted services. Omit for global services.
- `flow` (string, optional): Flow ID for flow-scoped services. Omit for workspace-scoped and global services.
- `request` (object, required): Service-specific request payload. Same structure as REST API request body.
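
The request fields above can be assembled with a small helper. This is a sketch under assumptions: `build_envelope` is a hypothetical name, and treating every service outside the listed workspace-scoped and global services as flow-scoped is a simplification for illustration.

```python
import json

# Services that take no `flow` field: the global `iam` service plus
# the workspace-scoped services.
NO_FLOW_SERVICES = {
    "iam", "config", "flow", "librarian", "knowledge",
    "collection-management",
}


def build_envelope(request_id, service, request, flow=None):
    """Serialize a WebSocket request envelope to a JSON string."""
    if service in NO_FLOW_SERVICES:
        if flow is not None:
            raise ValueError(f"{service} does not take a flow")
    elif flow is None:
        # Assumption: any other service is flow-scoped.
        raise ValueError(f"{service} requires a flow")
    envelope = {"id": request_id, "service": service, "request": request}
    if flow is not None:
        envelope["flow"] = flow
    return json.dumps(envelope)
```

The `request` payload is passed through untouched because it has the same structure as the corresponding REST request body.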
### Response Message Format
@ -96,14 +96,14 @@ get:
| `POST /api/v1/config` | `{"service": "config"}` |
| `POST /api/v1/flow/{flow}/service/agent` | `{"service": "agent", "flow": "my-flow"}` |
**Global Services** (no `flow` parameter):
**Workspace-Scoped Services** (no `flow` parameter, workspace from token):
- `config` - Configuration management
- `flow` - Flow lifecycle and blueprints
- `librarian` - Document library management
- `knowledge` - Knowledge graph core management
- `collection-management` - Collection metadata
**Flow-Hosted Services** (require `flow` parameter):
**Flow-Scoped Services** (require `flow` parameter, workspace from token):
- AI services: `agent`, `text-completion`, `prompt`, `document-rag`, `graph-rag`
- Embeddings: `embeddings`, `graph-embeddings`, `document-embeddings`
- Query: `triples`, `rows`, `nlp-query`, `structured-query`
@ -146,9 +146,11 @@ get:
## Authentication
When `GATEWAY_SECRET` is set, include bearer token:
- As query parameter: `ws://localhost:8088/api/v1/socket?token=<token>`
- Or in WebSocket subprotocol header
The `/api/v1/socket` endpoint uses in-band authentication.
The WebSocket handshake is accepted unconditionally. After
connecting, the client sends a bearer token as the first frame.
The gateway resolves the token to an identity and workspace.
All subsequent requests operate within that workspace context.
## Benefits Over REST

View file

@ -3,10 +3,19 @@ scheme: bearer
description: |
Bearer token authentication.
Set via `GATEWAY_SECRET` environment variable on the gateway.
If `GATEWAY_SECRET` is not set, authentication is disabled (development mode).
Clients authenticate by passing an opaque token in the
`Authorization` header. The token is treated as an opaque string by
clients — its internal structure is a gateway implementation detail
and must not be relied upon.
The gateway resolves the token to an authenticated identity and an
associated workspace. All workspace-scoped and flow-scoped operations
then execute within that workspace context.
Tokens are obtained via the IAM service (e.g. `tg-login` or
`tg-create-api-key`).
Example:
```
Authorization: Bearer your-secret-token
Authorization: Bearer <token>
```

View file

@ -24,7 +24,7 @@ echo
# Build WebSocket API documentation
echo "Building WebSocket API documentation (AsyncAPI)..."
cd ../websocket
npx --yes -p @asyncapi/cli asyncapi generate fromTemplate asyncapi.yaml @asyncapi/html-template -o /tmp/asyncapi-build -p singleFile=true --force-write
npx --yes @asyncapi/cli generate fromTemplate asyncapi.yaml @asyncapi/html-template -o /tmp/asyncapi-build -p singleFile=true --force-write --use-new-generator
mv /tmp/asyncapi-build/index.html ../../docs/websocket.html
rm -rf /tmp/asyncapi-build
echo "✓ WebSocket API docs generated: docs/websocket.html"

View file

@ -2,7 +2,7 @@ asyncapi: 3.0.0
info:
title: TrustGraph WebSocket API
version: "2.2"
version: "2.4"
description: |
WebSocket API for TrustGraph - providing multiplexed, asynchronous access to all services.
@ -14,21 +14,35 @@ info:
- **Efficient**: Lower overhead than HTTP REST
- **Streaming**: Real-time progressive responses
## Authentication
The `/api/v1/socket` endpoint uses **in-band authentication**.
The WebSocket handshake is accepted unconditionally. The client
must authenticate by sending a bearer token as the first message
after connecting. The gateway resolves the token to an
authenticated identity and workspace.
All subsequent requests execute within the workspace context
established by the authentication frame.
## Protocol Summary
All messages are JSON with:
- `id`: Client-generated unique identifier for request/response correlation
- `service`: Service identifier (e.g., "config", "agent", "document-rag")
- `flow`: Optional flow ID for flow-hosted services
- `flow`: Optional flow ID for flow-scoped services
- `request`/`response`: Service-specific payload (identical to REST API schemas)
- `error`: Error information on failure
## Service Types
## Service Tiers
**Global Services** (no `flow` parameter):
**Global Services** (no workspace scoping):
- iam
**Workspace-Scoped Services** (workspace resolved from token):
- config, flow, librarian, knowledge, collection-management
**Flow-Hosted Services** (require `flow` parameter):
**Flow-Scoped Services** (require `flow` parameter, workspace from token):
- agent, text-completion, prompt, document-rag, graph-rag
- embeddings, graph-embeddings, document-embeddings
- triples, rows, nlp-query, structured-query, sparql-query, structured-diag, row-embeddings
@ -64,11 +78,14 @@ components:
securitySchemes:
bearerAuth:
type: httpApiKey
name: token
in: query
name: Authorization
in: header
description: |
Bearer token authentication when GATEWAY_SECRET is configured.
Include as query parameter: ws://localhost:8088/api/v1/socket?token=<token>
Bearer token authentication. The `/api/v1/socket` endpoint
uses in-band authentication: the WebSocket handshake is
accepted unconditionally and the client sends a bearer token
as the first frame after connecting. The token is an opaque
string obtained via the IAM service.
messages:
ServiceRequest:

View file

@ -3,8 +3,16 @@ description: |
Primary WebSocket channel for all TrustGraph services.
This single channel provides multiplexed access to:
- All global services (config, flow, librarian, knowledge, collection-management)
- All flow-hosted services (agent, RAG, embeddings, queries, loading, etc.)
- Global services (IAM)
- Workspace-scoped services (config, flow, librarian, knowledge, collection-management)
- Flow-scoped services (agent, RAG, embeddings, queries, loading, etc.)
## Authentication
The handshake is accepted unconditionally. The client must send a
bearer token as the first frame after connecting (in-band auth).
The gateway resolves the token to an identity and workspace. All
subsequent requests execute within that workspace context.
## Multiplexing
@ -13,16 +21,17 @@ description: |
## Message Flow
1. Client sends request with unique `id`, `service`, optional `flow`, and `request` payload
2. Server processes request asynchronously
3. Server sends response(s) with matching `id` and either `response` or `error`
4. For streaming services, multiple responses may be sent with the same `id`
1. Client connects and sends bearer token as first frame (authentication)
2. Client sends requests with unique `id`, `service`, optional `flow`, and `request` payload
3. Server processes request asynchronously
4. Server sends response(s) with matching `id` and either `response` or `error`
5. For streaming services, multiple responses may be sent with the same `id`
## Service Routing
Messages are routed to services based on:
- `service`: Service identifier (required)
- `flow`: Flow ID (required for flow-hosted services, omitted for global services)
- `flow`: Flow ID (required for flow-scoped services, omitted for workspace-scoped and global services)
messages:
request:

View file

@ -9,14 +9,17 @@ description: |
payload:
description: Service request envelope with id, service, optional flow, and service-specific request payload
oneOf:
# Global services (no flow parameter)
# Global services
- $ref: './requests/IamRequest.yaml'
# Workspace-scoped services (no flow parameter)
- $ref: './requests/ConfigRequest.yaml'
- $ref: './requests/FlowRequest.yaml'
- $ref: './requests/LibrarianRequest.yaml'
- $ref: './requests/KnowledgeRequest.yaml'
- $ref: './requests/CollectionManagementRequest.yaml'
# Flow-hosted services (require flow parameter)
# Flow-scoped services (require flow parameter)
- $ref: './requests/AgentRequest.yaml'
- $ref: './requests/DocumentRagRequest.yaml'
- $ref: './requests/GraphRagRequest.yaml'

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for agent service (flow-hosted service)
description: WebSocket request for agent service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for collection-management service (global service)
description: WebSocket request for collection-management service (workspace-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for config service (global service)
description: WebSocket request for config service (workspace-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for document-embeddings service (flow-hosted service)
description: WebSocket request for document-embeddings service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for document-load service (flow-hosted service)
description: WebSocket request for document-load service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for document-rag service (flow-hosted service)
description: WebSocket request for document-rag service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for embeddings service (flow-hosted service)
description: WebSocket request for embeddings service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for flow service (global service)
description: WebSocket request for flow service (workspace-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for graph-embeddings service (flow-hosted service)
description: WebSocket request for graph-embeddings service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for graph-rag service (flow-hosted service)
description: WebSocket request for graph-rag service (flow-scoped service)
required:
- id
- service

View file

@ -0,0 +1,25 @@
type: object
description: WebSocket request for IAM service (global service)
required:
- id
- service
- request
properties:
id:
type: string
description: Unique request identifier
service:
type: string
const: iam
description: Service identifier for IAM service
request:
$ref: '../../../../api/components/schemas/iam/IamRequest.yaml'
examples:
- id: req-1
service: iam
request:
operation: whoami
- id: req-2
service: iam
request:
operation: list-my-workspaces

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for knowledge service (global service)
description: WebSocket request for knowledge service (workspace-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for librarian service (global service)
description: WebSocket request for librarian service (workspace-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for mcp-tool service (flow-hosted service)
description: WebSocket request for mcp-tool service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for nlp-query service (flow-hosted service)
description: WebSocket request for nlp-query service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for prompt service (flow-hosted service)
description: WebSocket request for prompt service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for row-embeddings service (flow-hosted service)
description: WebSocket request for row-embeddings service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for rows service (flow-hosted service)
description: WebSocket request for rows service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for sparql-query service (flow-hosted service)
description: WebSocket request for sparql-query service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for structured-diag service (flow-hosted service)
description: WebSocket request for structured-diag service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for structured-query service (flow-hosted service)
description: WebSocket request for structured-query service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for text-completion service (flow-hosted service)
description: WebSocket request for text-completion service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for text-load service (flow-hosted service)
description: WebSocket request for text-load service (flow-scoped service)
required:
- id
- service

View file

@ -1,5 +1,5 @@
type: object
description: WebSocket request for triples service (flow-hosted service)
description: WebSocket request for triples service (flow-scoped service)
required:
- id
- service

View file

@ -23,8 +23,9 @@ properties:
description: |
Service identifier. Same as {kind} in REST API URLs.
Global services: config, flow, librarian, knowledge, collection-management
Flow-hosted services: agent, text-completion, prompt, document-rag, graph-rag,
Global services: iam
Workspace-scoped services: config, flow, librarian, knowledge, collection-management
Flow-scoped services: agent, text-completion, prompt, document-rag, graph-rag,
embeddings, graph-embeddings, document-embeddings, triples, objects,
nlp-query, structured-query, structured-diag, text-load, document-load, mcp-tool
examples:
@ -34,10 +35,12 @@ properties:
flow:
type: string
description: |
Flow ID for flow-hosted services. Required for services accessed via
Flow ID for flow-scoped services. Required for services accessed via
/api/v1/flow/{flow}/service/{kind} in REST API.
Omit this field for global services (config, flow, librarian, knowledge, collection-management).
Omit for global services (iam) and workspace-scoped services
(config, flow, librarian, knowledge, collection-management).
Workspace context is resolved from the authenticated token.
examples:
- my-flow
- production-flow
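The scoping rules described in this schema can be sketched as a small client-side helper. This is only an illustration: `build_request` and the two service sets are hypothetical names, the `embeddings` payload is a placeholder, and the client-side validation is an assumption rather than behaviour the schema mandates. The envelope fields (`id`, `service`, `request`, `flow`) and the service groupings are taken from the descriptions above.

```python
import json

# Service groupings copied from the schema description above.
GLOBAL_SERVICES = {"iam"}
WORKSPACE_SERVICES = {
    "config", "flow", "librarian", "knowledge", "collection-management",
}


def build_request(request_id, service, request, flow=None):
    """Build a WebSocket request envelope (hypothetical helper)."""
    envelope = {"id": request_id, "service": service, "request": request}
    if service in GLOBAL_SERVICES or service in WORKSPACE_SERVICES:
        # Global and workspace-scoped services take no flow; the workspace
        # is resolved server-side from the authenticated token.
        if flow is not None:
            raise ValueError(f"{service} is not flow-scoped; omit 'flow'")
    else:
        # Everything else is treated as flow-scoped in this sketch.
        if flow is None:
            raise ValueError(f"{service} is flow-scoped; 'flow' is required")
        envelope["flow"] = flow
    return envelope


# Global service: no flow field.
whoami = build_request("req-1", "iam", {"operation": "whoami"})

# Flow-scoped service: flow field present (payload is a placeholder).
embed = build_request("req-3", "embeddings", {"text": "hello"}, flow="my-flow")

print(json.dumps(whoami))
print(json.dumps(embed))
```

Rejecting a mismatched `flow` on the client is a design choice for this sketch; a real client might simply pass the field through and let the gateway decide.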

View file

@ -409,4 +409,57 @@ class TestEdgeCases:
assert hosts == ['mixed-host']
assert username is None # Stays None
assert password == 'mixed-pass'
class TestReplicationFactorParamPath:
def test_explicit_kwarg(self):
with patch.dict(os.environ, {}, clear=True):
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=3,
)
assert rf == 3
def test_kwarg_overrides_env(self):
with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=3,
)
assert rf == 3
def test_env_fallback_when_kwarg_none(self):
with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=None,
)
assert rf == 5
def test_default_when_no_kwarg_no_env(self):
with patch.dict(os.environ, {}, clear=True):
_, _, _, _, rf = resolve_cassandra_config()
assert rf == 1
def test_params_dict_path(self):
with patch.dict(os.environ, {}, clear=True):
params = {'cassandra_replication_factor': 3}
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=params.get('cassandra_replication_factor'),
)
assert rf == 3
def test_params_dict_overrides_env(self):
with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
params = {'cassandra_replication_factor': 3}
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=params.get('cassandra_replication_factor'),
)
assert rf == 3
def test_params_dict_missing_falls_to_env(self):
with patch.dict(os.environ, {'CASSANDRA_REPLICATION_FACTOR': '5'}, clear=True):
params = {}
_, _, _, _, rf = resolve_cassandra_config(
replication_factor=params.get('cassandra_replication_factor'),
)
assert rf == 5

View file

@ -0,0 +1,136 @@
import os
import pytest
from unittest.mock import patch
from trustgraph.base.qdrant_config import (
get_qdrant_defaults,
resolve_qdrant_config,
)
class TestGetQdrantDefaults:
def test_defaults_with_no_env_vars(self):
with patch.dict(os.environ, {}, clear=True):
defaults = get_qdrant_defaults()
assert defaults['url'] == 'http://localhost:6333'
assert defaults['api_key'] is None
assert defaults['replication_factor'] == 1
assert defaults['shard_number'] == 1
def test_defaults_from_env(self):
env = {
'QDRANT_URL': 'http://qdrant:6333',
'QDRANT_API_KEY': 'secret',
'QDRANT_REPLICATION_FACTOR': '3',
'QDRANT_SHARD_NUMBER': '5',
}
with patch.dict(os.environ, env, clear=True):
defaults = get_qdrant_defaults()
assert defaults['url'] == 'http://qdrant:6333'
assert defaults['api_key'] == 'secret'
assert defaults['replication_factor'] == 3
assert defaults['shard_number'] == 5
class TestResolveQdrantConfig:
def test_defaults(self):
with patch.dict(os.environ, {}, clear=True):
url, api_key, rf, sn = resolve_qdrant_config()
assert url == 'http://localhost:6333'
assert api_key is None
assert rf == 1
assert sn == 1
def test_explicit_kwargs(self):
with patch.dict(os.environ, {}, clear=True):
url, api_key, rf, sn = resolve_qdrant_config(
url='http://custom:6333',
api_key='key',
replication_factor=3,
shard_number=5,
)
assert url == 'http://custom:6333'
assert api_key == 'key'
assert rf == 3
assert sn == 5
def test_kwargs_override_env(self):
env = {
'QDRANT_URL': 'http://env:6333',
'QDRANT_REPLICATION_FACTOR': '10',
'QDRANT_SHARD_NUMBER': '10',
}
with patch.dict(os.environ, env, clear=True):
url, _, rf, sn = resolve_qdrant_config(
url='http://explicit:6333',
replication_factor=3,
shard_number=5,
)
assert url == 'http://explicit:6333'
assert rf == 3
assert sn == 5
def test_env_fallback_when_kwargs_none(self):
env = {
'QDRANT_URL': 'http://env:6333',
'QDRANT_REPLICATION_FACTOR': '3',
'QDRANT_SHARD_NUMBER': '5',
}
with patch.dict(os.environ, env, clear=True):
url, _, rf, sn = resolve_qdrant_config()
assert url == 'http://env:6333'
assert rf == 3
assert sn == 5
def test_params_dict_path(self):
with patch.dict(os.environ, {}, clear=True):
params = {
'store_uri': 'http://params:6333',
'api_key': 'pkey',
'qdrant_replication_factor': 3,
'qdrant_shard_number': 5,
}
url, api_key, rf, sn = resolve_qdrant_config(
url=params.get('store_uri'),
api_key=params.get('api_key'),
replication_factor=params.get('qdrant_replication_factor'),
shard_number=params.get('qdrant_shard_number'),
)
assert url == 'http://params:6333'
assert api_key == 'pkey'
assert rf == 3
assert sn == 5
def test_params_dict_overrides_env(self):
env = {
'QDRANT_REPLICATION_FACTOR': '10',
'QDRANT_SHARD_NUMBER': '10',
}
with patch.dict(os.environ, env, clear=True):
params = {
'qdrant_replication_factor': 3,
'qdrant_shard_number': 5,
}
_, _, rf, sn = resolve_qdrant_config(
replication_factor=params.get('qdrant_replication_factor'),
shard_number=params.get('qdrant_shard_number'),
)
assert rf == 3
assert sn == 5
def test_params_dict_missing_falls_to_env(self):
env = {
'QDRANT_REPLICATION_FACTOR': '3',
'QDRANT_SHARD_NUMBER': '5',
}
with patch.dict(os.environ, env, clear=True):
params = {}
_, _, rf, sn = resolve_qdrant_config(
replication_factor=params.get('qdrant_replication_factor'),
shard_number=params.get('qdrant_shard_number'),
)
assert rf == 3
assert sn == 5
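The precedence these tests exercise (explicit argument, then environment variable, then built-in default) can be sketched as follows. This is not the source of `trustgraph.base.qdrant_config`; the `_sketch` function names and the injectable `env` parameter are assumptions added for illustration. Only the environment variable names, the defaults, and the order of the returned tuple come from the tests above.

```python
import os


def qdrant_defaults_sketch(env=None):
    """Read defaults from the environment, falling back to built-ins."""
    env = os.environ if env is None else env
    return {
        "url": env.get("QDRANT_URL", "http://localhost:6333"),
        "api_key": env.get("QDRANT_API_KEY"),
        # Environment values arrive as strings, so convert to int.
        "replication_factor": int(env.get("QDRANT_REPLICATION_FACTOR", "1")),
        "shard_number": int(env.get("QDRANT_SHARD_NUMBER", "1")),
    }


def resolve_qdrant_config_sketch(
    url=None, api_key=None, replication_factor=None, shard_number=None,
    env=None,
):
    """Explicit arguments win; None falls through to env, then default."""
    defaults = qdrant_defaults_sketch(env)

    def pick(value, key):
        return defaults[key] if value is None else value

    return (
        pick(url, "url"),
        pick(api_key, "api_key"),
        pick(replication_factor, "replication_factor"),
        pick(shard_number, "shard_number"),
    )


# A params dict with a missing key yields None, which falls back to env.
params = {"qdrant_replication_factor": 3}
fake_env = {"QDRANT_REPLICATION_FACTOR": "10", "QDRANT_SHARD_NUMBER": "5"}
resolved = resolve_qdrant_config_sketch(
    replication_factor=params.get("qdrant_replication_factor"),
    shard_number=params.get("qdrant_shard_number"),
    env=fake_env,
)
print(resolved)
```

Comparing against `None` rather than testing truthiness keeps a legitimate falsy argument from being silently replaced by the environment value.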

View file

@ -11,7 +11,12 @@ from unittest.mock import AsyncMock, Mock, patch, MagicMock
from unittest.mock import call
from trustgraph.cores.knowledge import KnowledgeManager
from trustgraph.schema import KnowledgeResponse, Triples, GraphEmbeddings, Metadata, Triple, Term, EntityEmbeddings, IRI, LITERAL
from trustgraph.schema import (
KnowledgeResponse, Triples, GraphEmbeddings, Metadata, Triple, Term,
EntityEmbeddings, IRI, LITERAL,
LibraryMetadata, LibraryBlob,
LibrarianResponse, DocumentMetadata,
)
@pytest.fixture
@ -373,11 +378,252 @@ class TestKnowledgeManagerOtherMethods:
mock_respond = AsyncMock()
await knowledge_manager.delete_kg_core(mock_request, mock_respond, "test-user")
# Verify table store was called correctly
knowledge_manager.table_store.delete_kg_core.assert_called_once_with("test-user", "test-doc-id")
# Verify response
mock_respond.assert_called_once()
response = mock_respond.call_args[0][0]
assert response.error is None
class TestKnowledgeManagerLibraryDownload:
"""Test get_kg_core streaming of library documents."""
@pytest.fixture
def manager_with_librarian(self, mock_flow_config):
with patch('trustgraph.cores.knowledge.KnowledgeTableStore'):
mock_librarian = AsyncMock()
manager = KnowledgeManager(
cassandra_host=["localhost"],
cassandra_username="test_user",
cassandra_password="test_pass",
keyspace="test_keyspace",
flow_config=mock_flow_config,
librarian=mock_librarian,
)
manager.table_store = AsyncMock()
return manager
@pytest.mark.asyncio
async def test_get_kg_core_streams_library_docs(self, manager_with_librarian):
mock_request = Mock()
mock_request.id = "root-doc"
mock_respond = AsyncMock()
manager_with_librarian.table_store.get_triples = AsyncMock()
manager_with_librarian.table_store.get_graph_embeddings = AsyncMock()
root_meta = DocumentMetadata(
id="root-doc", kind="application/pdf", title="Test PDF",
document_type="source",
)
child_meta = DocumentMetadata(
id="chunk-1", kind="text/plain", title="Chunk 1",
parent_id="root-doc", document_type="chunk",
)
manager_with_librarian.librarian.fetch_document_metadata.return_value = root_meta
manager_with_librarian.librarian.request.return_value = LibrarianResponse(
document_metadatas=[child_meta],
)
manager_with_librarian.librarian.fetch_document_content.side_effect = [
b"cm9vdCBjb250ZW50",
b"Y2h1bmsgY29udGVudA==",
]
await manager_with_librarian.get_kg_core(
mock_request, mock_respond, "test-user"
)
responses = [c[0][0] for c in mock_respond.call_args_list]
lm_responses = [r for r in responses if r.library_metadata is not None]
lb_responses = [r for r in responses if r.library_blob is not None]
eos_responses = [r for r in responses if r.eos is True]
assert len(lm_responses) == 2
assert lm_responses[0].library_metadata.id == "root-doc"
assert lm_responses[0].library_metadata.document_type == "source"
assert lm_responses[1].library_metadata.id == "chunk-1"
assert lm_responses[1].library_metadata.parent_id == "root-doc"
assert len(lb_responses) == 2
assert lb_responses[0].library_blob.id == "root-doc"
assert lb_responses[0].library_blob.data == b"cm9vdCBjb250ZW50"
assert lb_responses[1].library_blob.id == "chunk-1"
assert len(eos_responses) == 1
@pytest.mark.asyncio
async def test_get_kg_core_no_librarian_skips_library(self, mock_flow_config):
with patch('trustgraph.cores.knowledge.KnowledgeTableStore'):
manager = KnowledgeManager(
cassandra_host=["localhost"],
cassandra_username="u", cassandra_password="p",
keyspace="ks", flow_config=mock_flow_config,
)
manager.table_store = AsyncMock()
manager.table_store.get_triples = AsyncMock()
manager.table_store.get_graph_embeddings = AsyncMock()
mock_request = Mock()
mock_request.id = "doc-1"
mock_respond = AsyncMock()
await manager.get_kg_core(mock_request, mock_respond, "w")
responses = [c[0][0] for c in mock_respond.call_args_list]
assert all(r.library_metadata is None for r in responses)
assert all(r.library_blob is None for r in responses)
@pytest.mark.asyncio
async def test_get_kg_core_librarian_metadata_failure_is_graceful(
self, manager_with_librarian,
):
mock_request = Mock()
mock_request.id = "missing-doc"
mock_respond = AsyncMock()
manager_with_librarian.table_store.get_triples = AsyncMock()
manager_with_librarian.table_store.get_graph_embeddings = AsyncMock()
manager_with_librarian.librarian.fetch_document_metadata.side_effect = (
RuntimeError("not found")
)
await manager_with_librarian.get_kg_core(
mock_request, mock_respond, "test-user"
)
responses = [c[0][0] for c in mock_respond.call_args_list]
assert all(r.library_metadata is None for r in responses)
assert any(r.eos for r in responses)
class TestKnowledgeManagerLibraryUpload:
"""Test put_kg_core handling of library metadata and blob records."""
@pytest.fixture
def manager_with_librarian(self, mock_flow_config):
with patch('trustgraph.cores.knowledge.KnowledgeTableStore'):
mock_librarian = AsyncMock()
manager = KnowledgeManager(
cassandra_host=["localhost"],
cassandra_username="u", cassandra_password="p",
keyspace="ks", flow_config=mock_flow_config,
librarian=mock_librarian,
)
manager.table_store = AsyncMock()
return manager
@pytest.mark.asyncio
async def test_put_metadata_then_blob_calls_librarian(
self, manager_with_librarian,
):
mock_respond = AsyncMock()
manager_with_librarian.librarian.request.return_value = LibrarianResponse()
# First call: metadata
req_meta = Mock()
req_meta.triples = None
req_meta.graph_embeddings = None
req_meta.library_metadata = LibraryMetadata(
id="doc-1", kind="application/pdf", title="Test",
document_type="source",
)
req_meta.library_blob = None
await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws")
# Metadata is buffered, librarian not called yet
manager_with_librarian.librarian.request.assert_not_called()
# Second call: blob
req_blob = Mock()
req_blob.triples = None
req_blob.graph_embeddings = None
req_blob.library_metadata = None
req_blob.library_blob = LibraryBlob(
id="doc-1", data=b"dGVzdA==",
)
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")
# Now librarian should have been called with add-document
manager_with_librarian.librarian.request.assert_called_once()
call_args = manager_with_librarian.librarian.request.call_args[0][0]
assert call_args.operation == "add-document"
assert call_args.document_metadata.id == "doc-1"
assert call_args.document_metadata.kind == "application/pdf"
assert call_args.content == b"dGVzdA=="
@pytest.mark.asyncio
async def test_put_child_document_uses_add_child_operation(
self, manager_with_librarian,
):
mock_respond = AsyncMock()
manager_with_librarian.librarian.request.return_value = LibrarianResponse()
req_meta = Mock()
req_meta.triples = None
req_meta.graph_embeddings = None
req_meta.library_metadata = LibraryMetadata(
id="chunk-1", kind="text/plain", title="Chunk",
parent_id="doc-1", document_type="chunk",
)
req_meta.library_blob = None
await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws")
req_blob = Mock()
req_blob.triples = None
req_blob.graph_embeddings = None
req_blob.library_metadata = None
req_blob.library_blob = LibraryBlob(id="chunk-1", data=b"Y2h1bms=")
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")
call_args = manager_with_librarian.librarian.request.call_args[0][0]
assert call_args.operation == "add-child-document"
assert call_args.document_metadata.parent_id == "doc-1"
@pytest.mark.asyncio
async def test_put_blob_without_metadata_logs_warning(
self, manager_with_librarian,
):
mock_respond = AsyncMock()
req_blob = Mock()
req_blob.triples = None
req_blob.graph_embeddings = None
req_blob.library_metadata = None
req_blob.library_blob = LibraryBlob(id="orphan", data=b"data")
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")
# Librarian should not be called for orphan blob
manager_with_librarian.librarian.request.assert_not_called()
@pytest.mark.asyncio
async def test_put_existing_document_is_graceful(
self, manager_with_librarian,
):
mock_respond = AsyncMock()
manager_with_librarian.librarian.request.side_effect = RuntimeError(
"Document already exists"
)
req_meta = Mock()
req_meta.triples = None
req_meta.graph_embeddings = None
req_meta.library_metadata = LibraryMetadata(
id="doc-1", kind="application/pdf", title="Test",
document_type="source",
)
req_meta.library_blob = None
await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws")
req_blob = Mock()
req_blob.triples = None
req_blob.graph_embeddings = None
req_blob.library_metadata = None
req_blob.library_blob = LibraryBlob(id="doc-1", data=b"data")
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")
# Should not raise — "already exists" is handled gracefully
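The upload behaviour covered by these tests (metadata is buffered, the blob triggers the librarian call, child documents use a different operation, orphan blobs are skipped) can be sketched with a small stand-alone class. `LibraryUploadBuffer` and its plain-dict records are hypothetical and are not the `KnowledgeManager` implementation. Keying the buffer by document id is an assumption; the operation names are taken from the assertions above.

```python
class LibraryUploadBuffer:
    """Pair each library blob with previously received metadata (sketch)."""

    def __init__(self):
        # Assumption: pending metadata is keyed by document id.
        self._pending = {}

    def on_metadata(self, metadata):
        """Buffer metadata; nothing is sent until the blob arrives."""
        self._pending[metadata["id"]] = metadata
        return None

    def on_blob(self, doc_id, data):
        """Return a librarian request dict, or None for an orphan blob."""
        metadata = self._pending.pop(doc_id, None)
        if metadata is None:
            # Blob without metadata: the tests expect no librarian call.
            return None
        # A non-empty parent_id marks a child document.
        if metadata.get("parent_id"):
            operation = "add-child-document"
        else:
            operation = "add-document"
        return {
            "operation": operation,
            "document_metadata": metadata,
            "content": data,
        }


buf = LibraryUploadBuffer()
buf.on_metadata({"id": "doc-1", "kind": "application/pdf", "parent_id": ""})
buf.on_metadata({"id": "chunk-1", "kind": "text/plain", "parent_id": "doc-1"})

root_req = buf.on_blob("doc-1", b"dGVzdA==")
child_req = buf.on_blob("chunk-1", b"Y2h1bms=")
orphan_req = buf.on_blob("orphan", b"data")
```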

View file

@ -18,7 +18,7 @@ from trustgraph.embeddings.hf.hf import Processor
class TestHuggingFaceDynamicModelLoading(IsolatedAsyncioTestCase):
"""Test HuggingFace dynamic model loading and caching"""
@patch('trustgraph.embeddings.hf.hf.HuggingFaceEmbeddings')
@patch('langchain_huggingface.HuggingFaceEmbeddings')
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
@patch('trustgraph.base.embeddings_service.EmbeddingsService.__init__')
async def test_default_model_loaded_on_init(self, mock_embeddings_init, mock_async_init, mock_hf_class):
@ -39,7 +39,7 @@ class TestHuggingFaceDynamicModelLoading(IsolatedAsyncioTestCase):
assert processor.cached_model_name == "test-model"
assert processor.embeddings is not None
@patch('trustgraph.embeddings.hf.hf.HuggingFaceEmbeddings')
@patch('langchain_huggingface.HuggingFaceEmbeddings')
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
@patch('trustgraph.base.embeddings_service.EmbeddingsService.__init__')
async def test_model_caching_avoids_reload(self, mock_embeddings_init, mock_async_init, mock_hf_class):
@ -63,7 +63,7 @@ class TestHuggingFaceDynamicModelLoading(IsolatedAsyncioTestCase):
mock_hf_class.assert_not_called()
assert processor.cached_model_name == "test-model"
@patch('trustgraph.embeddings.hf.hf.HuggingFaceEmbeddings')
@patch('langchain_huggingface.HuggingFaceEmbeddings')
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
@patch('trustgraph.base.embeddings_service.EmbeddingsService.__init__')
async def test_model_reload_on_name_change(self, mock_embeddings_init, mock_async_init, mock_hf_class):
@ -84,7 +84,7 @@ class TestHuggingFaceDynamicModelLoading(IsolatedAsyncioTestCase):
mock_hf_class.assert_called_once_with(model_name="different-model")
assert processor.cached_model_name == "different-model"
@patch('trustgraph.embeddings.hf.hf.HuggingFaceEmbeddings')
@patch('langchain_huggingface.HuggingFaceEmbeddings')
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
@patch('trustgraph.base.embeddings_service.EmbeddingsService.__init__')
async def test_on_embeddings_uses_default_model(self, mock_embeddings_init, mock_async_init, mock_hf_class):
@ -107,7 +107,7 @@ class TestHuggingFaceDynamicModelLoading(IsolatedAsyncioTestCase):
assert processor.cached_model_name == "test-model" # Still using default
assert result == [[0.1, 0.2, 0.3, 0.4, 0.5]]
@patch('trustgraph.embeddings.hf.hf.HuggingFaceEmbeddings')
@patch('langchain_huggingface.HuggingFaceEmbeddings')
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
@patch('trustgraph.base.embeddings_service.EmbeddingsService.__init__')
async def test_on_embeddings_uses_specified_model(self, mock_embeddings_init, mock_async_init, mock_hf_class):
@ -130,7 +130,7 @@ class TestHuggingFaceDynamicModelLoading(IsolatedAsyncioTestCase):
assert processor.cached_model_name == "custom-model"
mock_hf_instance.embed_documents.assert_called_once_with(["test text"])
@patch('trustgraph.embeddings.hf.hf.HuggingFaceEmbeddings')
@patch('langchain_huggingface.HuggingFaceEmbeddings')
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
@patch('trustgraph.base.embeddings_service.EmbeddingsService.__init__')
async def test_multiple_model_switches(self, mock_embeddings_init, mock_async_init, mock_hf_class):
@ -164,7 +164,7 @@ class TestHuggingFaceDynamicModelLoading(IsolatedAsyncioTestCase):
assert call_count_after_b == initial_call_count + 2 # Reload for model-b
assert call_count_after_a_again == initial_call_count + 3 # Reload back to model-a
@patch('trustgraph.embeddings.hf.hf.HuggingFaceEmbeddings')
@patch('langchain_huggingface.HuggingFaceEmbeddings')
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
@patch('trustgraph.base.embeddings_service.EmbeddingsService.__init__')
async def test_none_model_uses_default(self, mock_embeddings_init, mock_async_init, mock_hf_class):
@ -187,7 +187,7 @@ class TestHuggingFaceDynamicModelLoading(IsolatedAsyncioTestCase):
assert mock_hf_class.call_count == initial_count
assert processor.cached_model_name == "test-model"
@patch('trustgraph.embeddings.hf.hf.HuggingFaceEmbeddings')
@patch('langchain_huggingface.HuggingFaceEmbeddings')
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
@patch('trustgraph.base.embeddings_service.EmbeddingsService.__init__')
async def test_initialization_without_model_uses_default(self, mock_embeddings_init, mock_async_init, mock_hf_class):

View file

@ -333,8 +333,8 @@ class TestUnifiedTableQueries:
"""Test queries against the unified rows table"""
@pytest.mark.asyncio
@patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock)
async def test_query_with_index_match(self, mock_async_execute):
@patch('trustgraph.query.rows.cassandra.service.async_execute_paged', new_callable=AsyncMock)
async def test_query_with_index_match(self, mock_async_execute_paged):
"""Test query execution with matching index"""
processor = MagicMock()
processor.session = MagicMock()
@ -344,10 +344,10 @@ class TestUnifiedTableQueries:
processor.find_matching_index = Processor.find_matching_index.__get__(processor, Processor)
processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor)
# Mock async_execute to return test data
# Mock async_execute_paged to return test data (list of pages)
mock_row = MagicMock()
mock_row.data = {"id": "123", "name": "Test Product", "category": "electronics"}
mock_async_execute.return_value = [mock_row]
mock_async_execute_paged.return_value = [[mock_row]]
schema = RowSchema(
name="products",
@ -370,10 +370,10 @@ class TestUnifiedTableQueries:
# Verify Cassandra was connected and queried
processor.connect_cassandra.assert_called_once()
mock_async_execute.assert_called_once()
mock_async_execute_paged.assert_called_once()
# Verify query structure - should query unified rows table
call_args = mock_async_execute.call_args
call_args = mock_async_execute_paged.call_args
query = call_args[0][1]
params = call_args[0][2]
@ -394,8 +394,8 @@ class TestUnifiedTableQueries:
assert results[0]["category"] == "electronics"
@pytest.mark.asyncio
@patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock)
async def test_query_without_index_match(self, mock_async_execute):
@patch('trustgraph.query.rows.cassandra.service.async_scan', new_callable=AsyncMock)
async def test_query_without_index_match(self, mock_async_scan):
"""Test query execution without matching index (scan mode)"""
processor = MagicMock()
processor.session = MagicMock()
@ -406,12 +406,10 @@ class TestUnifiedTableQueries:
processor._matches_filters = Processor._matches_filters.__get__(processor, Processor)
processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor)
# Mock async_execute to return test data
# Mock async_scan to return filtered test data
mock_row1 = MagicMock()
mock_row1.data = {"id": "1", "name": "Product A", "price": "100"}
mock_row2 = MagicMock()
mock_row2.data = {"id": "2", "name": "Product B", "price": "200"}
mock_async_execute.return_value = [mock_row1, mock_row2]
mock_async_scan.return_value = [mock_row1]
schema = RowSchema(
name="products",
@ -432,13 +430,16 @@ class TestUnifiedTableQueries:
limit=10
)
# Query should use ALLOW FILTERING for scan
call_args = mock_async_execute.call_args
# Verify async_scan was called
mock_async_scan.assert_called_once()
# Verify query structure
call_args = mock_async_scan.call_args
query = call_args[0][1]
assert "ALLOW FILTERING" in query
# Should post-filter results
# Should return filtered results
assert len(results) == 1
assert results[0]["name"] == "Product A"

View file

@ -259,6 +259,8 @@ class TestGraphEmbeddingsNullProtection:
proc.collection_exists = MagicMock(return_value=True)
proc._cache_lock = asyncio.Lock()
proc._known_collections = set()
proc.replication_factor = 1
proc.shard_number = 1
msg = MagicMock()
msg.metadata.collection = "graphs"

View file

@ -35,9 +35,9 @@ def _make_store():
class TestGetGraphEmbeddings:
@pytest.mark.asyncio
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
@patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock)
async def test_row_converts_to_entity_embeddings_with_singular_vector(
self, mock_async_execute
self, mock_async_execute_paged
):
"""
Cassandra rows return entities as a list of [entity_tuple, vector]
@ -57,7 +57,7 @@ class TestGetGraphEmbeddings:
store = _make_store()
store.cassandra = Mock()
store.get_graph_embeddings_stmt = Mock()
mock_async_execute.return_value = [fake_row]
mock_async_execute_paged.return_value = [[fake_row]]
received = []
@ -66,7 +66,7 @@ class TestGetGraphEmbeddings:
await store.get_graph_embeddings("alice", "doc-1", receiver)
mock_async_execute.assert_called_once_with(
mock_async_execute_paged.assert_called_once_with(
store.cassandra,
store.get_graph_embeddings_stmt,
("alice", "doc-1"),
@ -96,8 +96,8 @@ class TestGetGraphEmbeddings:
assert ge.entities[2].entity.value == "a literal entity"
@pytest.mark.asyncio
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
async def test_empty_entities_blob_yields_empty_list(self, mock_async_execute):
@patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock)
async def test_empty_entities_blob_yields_empty_list(self, mock_async_execute_paged):
"""row[3] being None / empty must produce a GraphEmbeddings with
no entities, not raise."""
fake_row = (None, None, None, None)
@ -105,7 +105,7 @@ class TestGetGraphEmbeddings:
store = _make_store()
store.cassandra = Mock()
store.get_graph_embeddings_stmt = Mock()
mock_async_execute.return_value = [fake_row]
mock_async_execute_paged.return_value = [[fake_row]]
received = []
@ -118,8 +118,8 @@ class TestGetGraphEmbeddings:
assert received[0].entities == []
@pytest.mark.asyncio
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
async def test_multiple_rows_each_emit_one_message(self, mock_async_execute):
@patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock)
async def test_multiple_rows_each_emit_one_message(self, mock_async_execute_paged):
fake_rows = [
(None, None, None, [
(("http://example.org/a", True), [1.0]),
@ -132,7 +132,7 @@ class TestGetGraphEmbeddings:
store = _make_store()
store.cassandra = Mock()
store.get_graph_embeddings_stmt = Mock()
mock_async_execute.return_value = fake_rows
mock_async_execute_paged.return_value = [fake_rows]
received = []
@ -153,9 +153,9 @@ class TestGetTriples:
the same Metadata construction. Cover it for parity."""
@pytest.mark.asyncio
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
async def test_row_converts_to_triples(self, mock_async_execute):
# row[3] is a list of (s_val, s_uri, p_val, p_uri, o_val, o_uri)
@patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock)
async def test_row_converts_to_triples(self, mock_async_execute_paged):
# row[3] is a list of (s_val, s_uri, p_val, p_uri, o_val, o_uri, graph)
fake_row = (
None, None, None,
[
@ -163,6 +163,7 @@ class TestGetTriples:
"http://example.org/alice", True,
"http://example.org/knows", True,
"http://example.org/bob", True,
"urn:graph:source",
),
],
)
@ -170,7 +171,7 @@ class TestGetTriples:
store = _make_store()
store.cassandra = Mock()
store.get_triples_stmt = Mock()
mock_async_execute.return_value = [fake_row]
mock_async_execute_paged.return_value = [[fake_row]]
received = []
@ -191,3 +192,33 @@ class TestGetTriples:
assert t.s.iri == "http://example.org/alice"
assert t.p.iri == "http://example.org/knows"
assert t.o.iri == "http://example.org/bob"
assert t.g == "urn:graph:source"
@pytest.mark.asyncio
@patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock)
async def test_empty_graph_name_becomes_none(self, mock_async_execute_paged):
fake_row = (
None, None, None,
[
(
"http://example.org/alice", True,
"http://example.org/knows", True,
"http://example.org/bob", True,
"",
),
],
)
store = _make_store()
store.cassandra = Mock()
store.get_triples_stmt = Mock()
mock_async_execute_paged.return_value = [[fake_row]]
received = []
async def receiver(msg):
received.append(msg)
await store.get_triples("w", "d", receiver)
assert received[0].triples[0].g is None
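The mocks in these tests return a list of pages, each page being a list of rows, and expect one message per row with an empty graph name mapped to `None`. A minimal sketch of that consumption pattern is shown below. `row_to_triples` and `emit_pages` are hypothetical helpers that use plain dicts instead of the real `Triple` and `Term` types, and treating a missing `row[3]` as an empty list is an assumption borrowed from the graph-embeddings test.

```python
import asyncio


def row_to_triples(row):
    """Convert row[3], a list of 7-tuples, into simple triple dicts."""
    triples = []
    for s_val, s_uri, p_val, p_uri, o_val, o_uri, graph in (row[3] or []):
        triples.append({
            "s": s_val,
            "p": p_val,
            "o": o_val,
            # An empty graph name is normalised to None.
            "g": graph or None,
        })
    return triples


async def emit_pages(pages, receiver):
    """Walk every page and hand one message per row to the receiver."""
    for page in pages:
        for row in page:
            await receiver(row_to_triples(row))


received = []


async def receiver(msg):
    received.append(msg)


pages = [[
    (None, None, None, [
        ("http://example.org/alice", True,
         "http://example.org/knows", True,
         "http://example.org/bob", True,
         ""),
    ]),
    (None, None, None, None),
]]

asyncio.run(emit_pages(pages, receiver))
```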

View file

@ -1,5 +1,6 @@
"""
Round-trip unit tests for KnowledgeRequestTranslator.
Round-trip unit tests for KnowledgeRequestTranslator and
KnowledgeResponseTranslator.
Regression coverage: a previous version of the decode side constructed
EntityEmbeddings(vectors=...), but the schema field is `vector` (singular),
@ -15,9 +16,13 @@ Triples breaks the test.
import pytest
from trustgraph.messaging.translators.knowledge import KnowledgeRequestTranslator
from trustgraph.messaging.translators.knowledge import (
KnowledgeRequestTranslator,
KnowledgeResponseTranslator,
)
from trustgraph.schema import (
KnowledgeRequest,
KnowledgeResponse,
GraphEmbeddings,
EntityEmbeddings,
Triples,
@ -25,6 +30,8 @@ from trustgraph.schema import (
Metadata,
Term,
IRI,
LibraryMetadata,
LibraryBlob,
)
@ -145,3 +152,161 @@ class TestKnowledgeRequestTranslatorTriples:
assert t.s.iri == "http://example.org/alice"
assert t.p.iri == "http://example.org/knows"
assert t.o.iri == "http://example.org/bob"
class TestKnowledgeRequestTranslatorLibrary:
def test_roundtrip_preserves_library_metadata(self, translator):
request = KnowledgeRequest(
operation="put-kg-core",
id="doc-1",
library_metadata=LibraryMetadata(
id="doc-1",
kind="application/pdf",
title="Test Document",
parent_id="",
document_type="source",
comments="test comments",
tags=["tag1", "tag2"],
),
)
encoded = translator.encode(request)
assert "library-metadata" in encoded
lm = encoded["library-metadata"]
assert lm["id"] == "doc-1"
assert lm["kind"] == "application/pdf"
assert lm["title"] == "Test Document"
assert lm["parent-id"] == ""
assert lm["document-type"] == "source"
assert lm["comments"] == "test comments"
assert lm["tags"] == ["tag1", "tag2"]
decoded = translator.decode(encoded)
assert decoded.library_metadata is not None
assert decoded.library_metadata.id == "doc-1"
assert decoded.library_metadata.kind == "application/pdf"
assert decoded.library_metadata.title == "Test Document"
assert decoded.library_metadata.parent_id == ""
assert decoded.library_metadata.document_type == "source"
assert decoded.library_metadata.comments == "test comments"
assert decoded.library_metadata.tags == ["tag1", "tag2"]
def test_roundtrip_preserves_child_document_metadata(self, translator):
request = KnowledgeRequest(
operation="put-kg-core",
id="doc-1",
library_metadata=LibraryMetadata(
id="chunk-1",
kind="text/plain",
title="Chunk 1",
parent_id="doc-1",
document_type="chunk",
),
)
encoded = translator.encode(request)
decoded = translator.decode(encoded)
assert decoded.library_metadata.parent_id == "doc-1"
assert decoded.library_metadata.document_type == "chunk"
def test_roundtrip_preserves_library_blob(self, translator):
request = KnowledgeRequest(
operation="put-kg-core",
id="doc-1",
library_blob=LibraryBlob(
id="doc-1",
data=b"SGVsbG8gV29ybGQ=",
),
)
encoded = translator.encode(request)
assert "library-blob" in encoded
assert encoded["library-blob"]["id"] == "doc-1"
assert encoded["library-blob"]["data"] == "SGVsbG8gV29ybGQ="
decoded = translator.decode(encoded)
assert decoded.library_blob is not None
assert decoded.library_blob.id == "doc-1"
assert decoded.library_blob.data == "SGVsbG8gV29ybGQ="
def test_absent_library_fields_decode_as_none(self, translator):
decoded = translator.decode({
"operation": "get-kg-core",
"id": "doc-1",
})
assert decoded.library_metadata is None
assert decoded.library_blob is None
class TestKnowledgeResponseTranslatorLibrary:
@pytest.fixture
def response_translator(self):
return KnowledgeResponseTranslator()
def test_encode_library_metadata(self, response_translator):
response = KnowledgeResponse(
ids=None,
library_metadata=LibraryMetadata(
id="doc-1",
kind="application/pdf",
title="Test",
parent_id="",
document_type="source",
comments="",
tags=[],
),
)
encoded = response_translator.encode(response)
assert "library-metadata" in encoded
assert encoded["library-metadata"]["id"] == "doc-1"
assert encoded["library-metadata"]["kind"] == "application/pdf"
assert encoded["library-metadata"]["document-type"] == "source"
def test_encode_library_blob_bytes_to_string(self, response_translator):
response = KnowledgeResponse(
ids=None,
library_blob=LibraryBlob(
id="doc-1",
data=b"dGVzdCBkYXRh",
),
)
encoded = response_translator.encode(response)
assert "library-blob" in encoded
assert encoded["library-blob"]["id"] == "doc-1"
assert encoded["library-blob"]["data"] == "dGVzdCBkYXRh"
assert isinstance(encoded["library-blob"]["data"], str)
def test_encode_library_blob_string_passthrough(self, response_translator):
response = KnowledgeResponse(
ids=None,
library_blob=LibraryBlob(
id="doc-1",
data="already-a-string",
),
)
encoded = response_translator.encode(response)
assert encoded["library-blob"]["data"] == "already-a-string"
def test_library_metadata_is_not_final(self, response_translator):
response = KnowledgeResponse(
ids=None,
library_metadata=LibraryMetadata(id="doc-1"),
)
_, is_final = response_translator.encode_with_completion(response)
assert is_final is False
def test_library_blob_is_not_final(self, response_translator):
response = KnowledgeResponse(
ids=None,
library_blob=LibraryBlob(id="doc-1", data=b"data"),
)
_, is_final = response_translator.encode_with_completion(response)
assert is_final is False
def test_eos_is_final(self, response_translator):
response = KnowledgeResponse(eos=True)
_, is_final = response_translator.encode_with_completion(response)
assert is_final is True
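
The round-trip tests above rely on snake_case dataclass fields mapping to kebab-case wire keys and back. A minimal standalone sketch of that mapping follows. The `LibraryMetadata` class here is a stand-in that mirrors the schema fields, and `encode_library_metadata` / `decode_library_metadata` are hypothetical helper names, not part of the translator API.

```python
from dataclasses import dataclass, field


@dataclass
class LibraryMetadata:
    # Stand-in mirroring the schema fields exercised by the tests.
    id: str = ""
    kind: str = ""
    title: str = ""
    parent_id: str = ""
    document_type: str = ""
    comments: str = ""
    tags: list[str] = field(default_factory=list)


def encode_library_metadata(lm: LibraryMetadata) -> dict:
    """Hypothetical helper: dataclass -> wire dict with kebab-case keys."""
    return {
        "id": lm.id,
        "kind": lm.kind,
        "title": lm.title,
        "parent-id": lm.parent_id,
        "document-type": lm.document_type,
        "comments": lm.comments,
        "tags": lm.tags,
    }


def decode_library_metadata(data: dict) -> LibraryMetadata:
    """Hypothetical helper: wire dict -> dataclass, missing keys get defaults."""
    return LibraryMetadata(
        id=data.get("id", ""),
        kind=data.get("kind", ""),
        title=data.get("title", ""),
        parent_id=data.get("parent-id", ""),
        document_type=data.get("document-type", ""),
        comments=data.get("comments", ""),
        tags=data.get("tags", []),
    )
```

Because both directions default every field, a child document that omits `comments` and `tags` still compares equal after a round trip.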

View file

@ -337,7 +337,7 @@ class Api:
from . bulk_client import BulkClient
# Extract base URL (remove api/v1/ suffix)
base_url = self.url.rsplit("api/v1/", 1)[0].rstrip("/")
self._bulk_client = BulkClient(base_url, self.timeout, self.token)
self._bulk_client = BulkClient(base_url, self.timeout, self.token, workspace=self.workspace)
return self._bulk_client
def metrics(self):
@ -462,7 +462,7 @@ class Api:
from . async_bulk_client import AsyncBulkClient
# Extract base URL (remove api/v1/ suffix)
base_url = self.url.rsplit("api/v1/", 1)[0].rstrip("/")
self._async_bulk_client = AsyncBulkClient(base_url, self.timeout, self.token)
self._async_bulk_client = AsyncBulkClient(base_url, self.timeout, self.token, workspace=self.workspace)
return self._async_bulk_client
def async_metrics(self):

View file

@ -9,10 +9,11 @@ from . types import Triple
class AsyncBulkClient:
"""Asynchronous bulk operations client"""
def __init__(self, url: str, timeout: int, token: Optional[str]) -> None:
def __init__(self, url: str, timeout: int, token: Optional[str], workspace: str = "default") -> None:
self.url: str = self._convert_to_ws_url(url)
self.timeout: int = timeout
self.token: Optional[str] = token
self.workspace: str = workspace
def _convert_to_ws_url(self, url: str) -> str:
"""Convert HTTP URL to WebSocket URL"""
@ -25,11 +26,21 @@ class AsyncBulkClient:
else:
return f"ws://{url}"
def _build_ws_url(self, path: str) -> str:
"""Build a WebSocket URL with token and workspace query params."""
ws_url = f"{self.url}{path}"
params = []
if self.token:
params.append(f"token={self.token}")
if self.workspace:
params.append(f"workspace={self.workspace}")
if params:
ws_url = f"{ws_url}?{'&'.join(params)}"
return ws_url
async def import_triples(self, flow: str, triples: AsyncIterator[Triple], **kwargs: Any) -> None:
"""Bulk import triples via WebSocket"""
ws_url = f"{self.url}/api/v1/flow/{flow}/import/triples"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/import/triples")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for triple in triples:
@ -42,9 +53,7 @@ class AsyncBulkClient:
async def export_triples(self, flow: str, **kwargs: Any) -> AsyncIterator[Triple]:
"""Bulk export triples via WebSocket"""
ws_url = f"{self.url}/api/v1/flow/{flow}/export/triples"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/export/triples")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for raw_message in websocket:
@ -57,9 +66,7 @@ class AsyncBulkClient:
async def import_graph_embeddings(self, flow: str, embeddings: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None:
"""Bulk import graph embeddings via WebSocket"""
ws_url = f"{self.url}/api/v1/flow/{flow}/import/graph-embeddings"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/import/graph-embeddings")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for embedding in embeddings:
@ -67,9 +74,7 @@ class AsyncBulkClient:
async def export_graph_embeddings(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]:
"""Bulk export graph embeddings via WebSocket"""
ws_url = f"{self.url}/api/v1/flow/{flow}/export/graph-embeddings"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/export/graph-embeddings")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for raw_message in websocket:
@ -77,9 +82,7 @@ class AsyncBulkClient:
async def import_document_embeddings(self, flow: str, embeddings: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None:
"""Bulk import document embeddings via WebSocket"""
ws_url = f"{self.url}/api/v1/flow/{flow}/import/document-embeddings"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/import/document-embeddings")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for embedding in embeddings:
@ -87,9 +90,7 @@ class AsyncBulkClient:
async def export_document_embeddings(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]:
"""Bulk export document embeddings via WebSocket"""
ws_url = f"{self.url}/api/v1/flow/{flow}/export/document-embeddings"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/export/document-embeddings")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for raw_message in websocket:
@ -97,9 +98,7 @@ class AsyncBulkClient:
async def import_entity_contexts(self, flow: str, contexts: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None:
"""Bulk import entity contexts via WebSocket"""
ws_url = f"{self.url}/api/v1/flow/{flow}/import/entity-contexts"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/import/entity-contexts")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for context in contexts:
@ -107,9 +106,7 @@ class AsyncBulkClient:
async def export_entity_contexts(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]:
"""Bulk export entity contexts via WebSocket"""
ws_url = f"{self.url}/api/v1/flow/{flow}/export/entity-contexts"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/export/entity-contexts")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for raw_message in websocket:
@ -117,9 +114,7 @@ class AsyncBulkClient:
async def import_rows(self, flow: str, rows: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None:
"""Bulk import rows via WebSocket"""
ws_url = f"{self.url}/api/v1/flow/{flow}/import/rows"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/import/rows")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for row in rows:
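
`_build_ws_url` interpolates the token and workspace into the query string as-is. If either value could contain reserved characters such as `&` or a space, a percent-encoding variant may be safer. The sketch below is a standalone function with a hypothetical name, and whether the gateway expects encoded values is an assumption this diff does not confirm.

```python
from typing import Optional
from urllib.parse import urlencode


def build_ws_url(
    base: str, path: str, token: Optional[str], workspace: Optional[str]
) -> str:
    """Hypothetical variant of _build_ws_url that percent-encodes values."""
    params = {}
    if token:
        params["token"] = token
    if workspace:
        params["workspace"] = workspace
    url = f"{base}{path}"
    if params:
        # urlencode escapes reserved characters and joins pairs with '&'.
        url = f"{url}?{urlencode(params)}"
    return url
```

For plain alphanumeric tokens and workspace names the output is identical to the string-joining version in the diff.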

View file

@ -30,6 +30,7 @@ class AsyncSocketClient:
self.timeout = timeout
self.token = token
self.workspace = workspace
self._workspace_explicit = workspace != "default"
self._request_counter = 0
self._socket = None
self._connect_cm = None
@ -92,7 +93,8 @@ class AsyncSocketClient:
)
if resp.get("type") == "auth-ok":
self.workspace = resp.get("workspace", self.workspace)
if not self._workspace_explicit:
self.workspace = resp.get("workspace", self.workspace)
elif resp.get("type") == "auth-failed":
await self._socket.close()
raise ProtocolException(
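
The intent of `_workspace_explicit` can be summarised as: a workspace chosen by the caller wins over the one returned in the `auth-ok` reply. A minimal sketch of that rule, using a hypothetical free function rather than the client class:

```python
def resolve_workspace(requested: str, auth_response: dict) -> str:
    """Hypothetical helper: choose the workspace after an auth-ok reply."""
    # Mirrors the diff: any value other than "default" counts as explicit.
    explicit = requested != "default"
    if explicit:
        return requested
    # Otherwise adopt the server's workspace, falling back to the request.
    return auth_response.get("workspace", requested)
```

One consequence of this rule is that a caller who deliberately passes `"default"` cannot be told apart from one who passed nothing, so the server-side workspace is adopted in both cases.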

View file

@ -34,7 +34,7 @@ class BulkClient:
Note: For true async support, use AsyncBulkClient instead.
"""
def __init__(self, url: str, timeout: int, token: Optional[str]) -> None:
def __init__(self, url: str, timeout: int, token: Optional[str], workspace: str = "default") -> None:
"""
Initialize synchronous bulk client.
@ -42,10 +42,12 @@ class BulkClient:
url: Base URL for TrustGraph API (HTTP/HTTPS will be converted to WS/WSS)
timeout: WebSocket timeout in seconds
token: Optional bearer token for authentication
workspace: Workspace for data isolation
"""
self.url: str = self._convert_to_ws_url(url)
self.timeout: int = timeout
self.token: Optional[str] = token
self.workspace: str = workspace
def _convert_to_ws_url(self, url: str) -> str:
"""Convert HTTP URL to WebSocket URL"""
@ -58,6 +60,18 @@ class BulkClient:
else:
return f"ws://{url}"
def _build_ws_url(self, path: str) -> str:
"""Build a WebSocket URL with token and workspace query params."""
ws_url = f"{self.url}{path}"
params = []
if self.token:
params.append(f"token={self.token}")
if self.workspace:
params.append(f"workspace={self.workspace}")
if params:
ws_url = f"{ws_url}?{'&'.join(params)}"
return ws_url
def _run_async(self, coro: Coroutine[Any, Any, Any]) -> Any:
"""Run async coroutine synchronously"""
try:
@ -116,9 +130,7 @@ class BulkClient:
metadata: Optional[Dict[str, Any]], batch_size: int
) -> None:
"""Async implementation of triple import"""
ws_url = f"{self.url}/api/v1/flow/{flow}/import/triples"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/import/triples")
if metadata is None:
metadata = {"id": "", "metadata": [], "collection": "default"}
@ -194,9 +206,7 @@ class BulkClient:
async def _export_triples_async(self, flow: str) -> Iterator[Triple]:
"""Async implementation of triple export"""
ws_url = f"{self.url}/api/v1/flow/{flow}/export/triples"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/export/triples")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for raw_message in websocket:
@ -238,9 +248,7 @@ class BulkClient:
async def _import_graph_embeddings_async(self, flow: str, embeddings: Iterator[Dict[str, Any]]) -> None:
"""Async implementation of graph embeddings import"""
ws_url = f"{self.url}/api/v1/flow/{flow}/import/graph-embeddings"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/import/graph-embeddings")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
for embedding in embeddings:
@ -296,9 +304,7 @@ class BulkClient:
async def _export_graph_embeddings_async(self, flow: str) -> Iterator[Dict[str, Any]]:
"""Async implementation of graph embeddings export"""
ws_url = f"{self.url}/api/v1/flow/{flow}/export/graph-embeddings"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/export/graph-embeddings")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for raw_message in websocket:
@ -336,9 +342,7 @@ class BulkClient:
async def _import_document_embeddings_async(self, flow: str, embeddings: Iterator[Dict[str, Any]]) -> None:
"""Async implementation of document embeddings import"""
ws_url = f"{self.url}/api/v1/flow/{flow}/import/document-embeddings"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/import/document-embeddings")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
for embedding in embeddings:
@ -394,9 +398,7 @@ class BulkClient:
async def _export_document_embeddings_async(self, flow: str) -> Iterator[Dict[str, Any]]:
"""Async implementation of document embeddings export"""
ws_url = f"{self.url}/api/v1/flow/{flow}/export/document-embeddings"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/export/document-embeddings")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for raw_message in websocket:
@ -446,9 +448,7 @@ class BulkClient:
metadata: Optional[Dict[str, Any]], batch_size: int
) -> None:
"""Async implementation of entity contexts import"""
ws_url = f"{self.url}/api/v1/flow/{flow}/import/entity-contexts"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/import/entity-contexts")
if metadata is None:
metadata = {"id": "", "metadata": [], "collection": "default"}
@ -522,9 +522,7 @@ class BulkClient:
async def _export_entity_contexts_async(self, flow: str) -> Iterator[Dict[str, Any]]:
"""Async implementation of entity contexts export"""
ws_url = f"{self.url}/api/v1/flow/{flow}/export/entity-contexts"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/export/entity-contexts")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
async for raw_message in websocket:
@ -562,9 +560,7 @@ class BulkClient:
async def _import_rows_async(self, flow: str, rows: Iterator[Dict[str, Any]]) -> None:
"""Async implementation of rows import"""
ws_url = f"{self.url}/api/v1/flow/{flow}/import/rows"
if self.token:
ws_url = f"{ws_url}?token={self.token}"
ws_url = self._build_ws_url(f"/api/v1/flow/{flow}/import/rows")
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
for row in rows:

View file

@ -167,7 +167,8 @@ class SocketClient:
)
if resp.get("type") == "auth-ok":
self.workspace = resp.get("workspace", self.workspace)
if self.workspace == "default":
self.workspace = resp.get("workspace", self.workspace)
elif resp.get("type") == "auth-failed":
await self._socket.close()
raise ProtocolException(
@ -501,6 +502,7 @@ class SocketClient:
def put_kg_core(
self, id: str, triples=None, graph_embeddings=None,
library_metadata=None, library_blob=None,
) -> Dict[str, Any]:
request = {
"operation": "put-kg-core",
@ -511,6 +513,10 @@ class SocketClient:
request["triples"] = triples
if graph_embeddings is not None:
request["graph-embeddings"] = graph_embeddings
if library_metadata is not None:
request["library-metadata"] = library_metadata
if library_blob is not None:
request["library-blob"] = library_blob
return self._send_request_sync("knowledge", None, request)
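
The optional-field pattern in `put_kg_core` keeps absent sections out of the request entirely instead of sending nulls. A standalone sketch of the request assembly follows. The function name is hypothetical, and the `"id"` key is an assumption, since the hunk above elides part of the request dict.

```python
from typing import Any, Dict, Optional


def build_put_kg_core_request(
    id: str,
    triples: Optional[Any] = None,
    graph_embeddings: Optional[Any] = None,
    library_metadata: Optional[Dict[str, Any]] = None,
    library_blob: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: assemble a put-kg-core request dict."""
    # "id" is assumed here; the diff does not show the full base dict.
    request: Dict[str, Any] = {"operation": "put-kg-core", "id": id}
    optional_sections = {
        "triples": triples,
        "graph-embeddings": graph_embeddings,
        "library-metadata": library_metadata,
        "library-blob": library_blob,
    }
    for key, value in optional_sections.items():
        # Only sections the caller supplied end up on the wire.
        if value is not None:
            request[key] = value
    return request
```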
def get_de_core(self, id: str) -> Iterator[Dict[str, Any]]:

View file

@ -103,35 +103,19 @@ def resolve_cassandra_config(
host: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
default_keyspace: Optional[str] = None
default_keyspace: Optional[str] = None,
replication_factor: Optional[int] = None,
) -> Tuple[List[str], Optional[str], Optional[str], Optional[str], int]:
"""
Resolve Cassandra configuration from various sources.
Can accept either argparse args object or explicit parameters.
Converts host string to list format for Cassandra driver.
Args:
args: Optional argparse namespace with cassandra_host, cassandra_username, cassandra_password, cassandra_keyspace, cassandra_replication_factor
host: Optional explicit host parameter (overrides args)
username: Optional explicit username parameter (overrides args)
password: Optional explicit password parameter (overrides args)
default_keyspace: Optional default keyspace if not specified elsewhere
Returns:
tuple: (hosts_list, username, password, keyspace, replication_factor)
"""
# If args provided, extract values
keyspace = None
replication_factor = 1
if args is not None:
host = host or getattr(args, 'cassandra_host', None)
username = username or getattr(args, 'cassandra_username', None)
password = password or getattr(args, 'cassandra_password', None)
keyspace = getattr(args, 'cassandra_keyspace', None)
replication_factor = getattr(args, 'cassandra_replication_factor', 1)
replication_factor = replication_factor or getattr(
args, 'cassandra_replication_factor', None
)
# Apply defaults if still None
defaults = get_cassandra_defaults()
host = host or defaults['host']
username = username or defaults['username']

View file

@ -300,6 +300,14 @@ class IamClient(RequestResponse):
)
return resp.workspace
async def list_my_workspaces(self, actor="", timeout=IAM_TIMEOUT):
resp = await self._request(
operation="list-my-workspaces",
actor=actor,
timeout=timeout,
)
return list(resp.workspaces)
async def list_workspaces(self, actor="", timeout=IAM_TIMEOUT):
resp = await self._request(
operation="list-workspaces",

View file

@ -11,6 +11,7 @@ Supports dual output to console and Loki for centralized log aggregation.
import contextvars
import logging
import logging.handlers
import uuid
from argparse import ArgumentParser
from queue import Queue
from typing import Any
@ -132,14 +133,12 @@ def setup_logging(args: dict[str, Any]) -> None:
try:
from logging_loki import LokiHandler
# Create Loki handler with optional authentication. The
# processor label is NOT baked in here — it's stamped onto
# each record by _ProcessorIdFilter reading the task-local
# contextvar, and logging_loki's emitter reads record.tags
# to build per-record Loki labels.
instance_id = str(uuid.uuid4())[:8]
loki_handler_kwargs = {
'url': loki_url,
'version': "1",
'tags': {'instance': instance_id},
}
if loki_username and loki_password:
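
The handler-level `instance` tag gives each process a short random label. A minimal sketch of how such kwargs could be assembled is shown here without importing `logging_loki`, so it runs standalone. `build_loki_handler_kwargs` is a hypothetical name, and the dict is only what would later be passed to the handler.

```python
import uuid


def build_loki_handler_kwargs(loki_url: str) -> dict:
    """Hypothetical helper: handler kwargs with a per-process instance tag."""
    # The first 8 characters of a UUID4 string are hex digits (before the
    # first dash), which is short enough for a label yet unlikely to collide.
    instance_id = str(uuid.uuid4())[:8]
    return {
        "url": loki_url,
        "version": "1",
        "tags": {"instance": instance_id},
    }
```

Since the ID is generated at logging setup, it changes on every restart, so it identifies a process run and not a stable host or service.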

View file

@ -0,0 +1,87 @@
import os
import argparse
from typing import Optional, Any, Tuple
def get_qdrant_defaults() -> dict:
return {
'url': os.getenv('QDRANT_URL', 'http://localhost:6333'),
'api_key': os.getenv('QDRANT_API_KEY'),
'replication_factor': int(os.getenv('QDRANT_REPLICATION_FACTOR', '1')),
'shard_number': int(os.getenv('QDRANT_SHARD_NUMBER', '1')),
}
def add_qdrant_args(parser: argparse.ArgumentParser) -> None:
defaults = get_qdrant_defaults()
url_help = f"Qdrant URL (default: {defaults['url']})"
if 'QDRANT_URL' in os.environ:
url_help += " [from QDRANT_URL]"
api_key_help = "Qdrant API key"
if defaults['api_key']:
api_key_help += " (default: <set>)"
if 'QDRANT_API_KEY' in os.environ:
api_key_help += " [from QDRANT_API_KEY]"
replication_help = f"Qdrant collection replication factor (default: {defaults['replication_factor']})"
if 'QDRANT_REPLICATION_FACTOR' in os.environ:
replication_help += " [from QDRANT_REPLICATION_FACTOR]"
shard_help = f"Qdrant collection shard number (default: {defaults['shard_number']})"
if 'QDRANT_SHARD_NUMBER' in os.environ:
shard_help += " [from QDRANT_SHARD_NUMBER]"
parser.add_argument(
'--store-uri',
default=defaults['url'],
help=url_help,
)
parser.add_argument(
'--api-key',
default=defaults['api_key'],
help=api_key_help,
)
parser.add_argument(
'--qdrant-replication-factor',
type=int,
default=defaults['replication_factor'],
help=replication_help,
)
parser.add_argument(
'--qdrant-shard-number',
type=int,
default=defaults['shard_number'],
help=shard_help,
)
def resolve_qdrant_config(
args: Optional[Any] = None,
url: Optional[str] = None,
api_key: Optional[str] = None,
replication_factor: Optional[int] = None,
shard_number: Optional[int] = None,
) -> Tuple[str, Optional[str], int, int]:
if args is not None:
url = url or getattr(args, 'store_uri', None)
api_key = api_key or getattr(args, 'api_key', None)
replication_factor = replication_factor or getattr(
args, 'qdrant_replication_factor', None
)
shard_number = shard_number or getattr(
args, 'qdrant_shard_number', None
)
defaults = get_qdrant_defaults()
url = url or defaults['url']
api_key = api_key or defaults['api_key']
replication_factor = replication_factor or defaults['replication_factor']
shard_number = shard_number or defaults['shard_number']
return url, api_key, replication_factor, shard_number
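
`resolve_qdrant_config` applies a three-level precedence: explicit parameter, then the parsed args attribute, then the environment-backed default. The sketch below shows that chain for a single integer setting. The helper name and the `EXAMPLE_REPLICATION_FACTOR` variable are hypothetical, and it uses `is None` checks instead of the diff's `or`, which is a deliberate deviation so that a falsy value such as `0` is not silently skipped.

```python
import os
from types import SimpleNamespace
from typing import Any, Optional


def resolve_int_setting(
    explicit: Optional[int],
    args: Optional[Any],
    attr: str,
    env_var: str,
    fallback: int,
) -> int:
    """Hypothetical helper: explicit value, then args attribute, then env default."""
    value = explicit
    if value is None and args is not None:
        value = getattr(args, attr, None)
    if value is None:
        # Environment variable wins over the hard-coded fallback.
        value = int(os.getenv(env_var, str(fallback)))
    return value


# Usage with a stand-in for an argparse namespace.
args = SimpleNamespace(qdrant_replication_factor=2)
from_explicit = resolve_int_setting(
    3, args, "qdrant_replication_factor", "EXAMPLE_REPLICATION_FACTOR", 1
)
from_args = resolve_int_setting(
    None, args, "qdrant_replication_factor", "EXAMPLE_REPLICATION_FACTOR", 1
)
```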

View file

@ -2,7 +2,8 @@ from typing import Dict, Any, Tuple, Optional
from ...schema import (
KnowledgeRequest, KnowledgeResponse, Triples, GraphEmbeddings,
DocumentEmbeddings, ChunkEmbeddings,
Metadata, EntityEmbeddings
Metadata, EntityEmbeddings,
LibraryMetadata, LibraryBlob,
)
from .base import MessageTranslator
from .primitives import ValueTranslator, SubgraphTranslator
@ -61,6 +62,27 @@ class KnowledgeRequestTranslator(MessageTranslator):
]
)
library_metadata = None
if "library-metadata" in data:
lm = data["library-metadata"]
library_metadata = LibraryMetadata(
id=lm.get("id", ""),
kind=lm.get("kind", ""),
title=lm.get("title", ""),
parent_id=lm.get("parent-id", ""),
document_type=lm.get("document-type", ""),
comments=lm.get("comments", ""),
tags=lm.get("tags", []),
)
library_blob = None
if "library-blob" in data:
lb = data["library-blob"]
library_blob = LibraryBlob(
id=lb.get("id", ""),
data=lb.get("data", b""),
)
return KnowledgeRequest(
operation=data.get("operation"),
id=data.get("id"),
@ -69,6 +91,8 @@ class KnowledgeRequestTranslator(MessageTranslator):
triples=triples,
graph_embeddings=graph_embeddings,
document_embeddings=document_embeddings,
library_metadata=library_metadata,
library_blob=library_blob,
)
def encode(self, obj: KnowledgeRequest) -> Dict[str, Any]:
@ -125,6 +149,26 @@ class KnowledgeRequestTranslator(MessageTranslator):
],
}
if obj.library_metadata:
result["library-metadata"] = {
"id": obj.library_metadata.id,
"kind": obj.library_metadata.kind,
"title": obj.library_metadata.title,
"parent-id": obj.library_metadata.parent_id,
"document-type": obj.library_metadata.document_type,
"comments": obj.library_metadata.comments,
"tags": obj.library_metadata.tags,
}
if obj.library_blob:
data = obj.library_blob.data
if isinstance(data, bytes):
data = data.decode("utf-8")
result["library-blob"] = {
"id": obj.library_blob.id,
"data": data,
}
return result
@ -194,6 +238,32 @@ class KnowledgeResponseTranslator(MessageTranslator):
}
}
# Streaming library metadata response
if obj.library_metadata:
return {
"library-metadata": {
"id": obj.library_metadata.id,
"kind": obj.library_metadata.kind,
"title": obj.library_metadata.title,
"parent-id": obj.library_metadata.parent_id,
"document-type": obj.library_metadata.document_type,
"comments": obj.library_metadata.comments,
"tags": obj.library_metadata.tags,
}
}
# Streaming library blob response
if obj.library_blob:
data = obj.library_blob.data
if isinstance(data, bytes):
data = data.decode("utf-8")
return {
"library-blob": {
"id": obj.library_blob.id,
"data": data,
}
}
# End of stream marker
if obj.eos is True:
return {"eos": True}
@ -209,7 +279,9 @@ class KnowledgeResponseTranslator(MessageTranslator):
is_final = (
obj.ids is not None or # List response
obj.eos is True or # End of stream
(not obj.triples and not obj.graph_embeddings and not obj.document_embeddings) # Empty response
(not obj.triples and not obj.graph_embeddings
and not obj.document_embeddings
and not obj.library_metadata and not obj.library_blob) # Empty response
)
return response, is_final
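
The completion rule now treats library metadata and library blob messages as streaming payloads, so they no longer fall into the "empty response" case. A minimal sketch of the same predicate over a stand-in dataclass follows. `StreamResponse` and `is_final` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class StreamResponse:
    # Stand-in with only the fields the completion rule inspects.
    ids: Optional[list] = None
    eos: bool = False
    triples: Any = None
    graph_embeddings: Any = None
    document_embeddings: Any = None
    library_metadata: Any = None
    library_blob: Any = None


def is_final(obj: StreamResponse) -> bool:
    """Return True when the message ends the response stream."""
    has_payload = any([
        obj.triples,
        obj.graph_embeddings,
        obj.document_embeddings,
        obj.library_metadata,
        obj.library_blob,
    ])
    # A list response, an explicit end-of-stream marker, or a message
    # carrying no payload at all terminates the stream.
    return obj.ids is not None or obj.eos is True or not has_payload
```

Without the two new checks, a `library-metadata` message would have been classed as empty and ended the stream before any blob arrived.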

View file

@ -21,6 +21,21 @@ from .embeddings import GraphEmbeddings, DocumentEmbeddings
# <- ()
# <- (error)
@dataclass
class LibraryMetadata:
id: str = ""
kind: str = ""
title: str = ""
parent_id: str = ""
document_type: str = ""
comments: str = ""
tags: list[str] = field(default_factory=list)
@dataclass
class LibraryBlob:
id: str = ""
data: bytes = b""
@dataclass
class KnowledgeRequest:
# get-kg-core, delete-kg-core, list-kg-cores, put-kg-core
@ -44,6 +59,10 @@ class KnowledgeRequest:
# put-de-core
document_embeddings: DocumentEmbeddings | None = None
# put-kg-core (source material)
library_metadata: LibraryMetadata | None = None
library_blob: LibraryBlob | None = None
@dataclass
class KnowledgeResponse:
error: Error | None = None
@ -52,6 +71,8 @@ class KnowledgeResponse:
triples: Triples | None = None
graph_embeddings: GraphEmbeddings | None = None
document_embeddings: DocumentEmbeddings | None = None
library_metadata: LibraryMetadata | None = None
library_blob: LibraryBlob | None = None
knowledge_request_queue = queue('knowledge', cls='request')
knowledge_response_queue = queue('knowledge', cls='response')
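
`LibraryBlob.data` is typed as `bytes`, and the translators call `.decode("utf-8")` on it when encoding, which would fail on arbitrary binary content. The test fixtures use values that look like base64 text, so one plausible reading is that blobs are base64-encoded before they reach the schema. The sketch below follows that assumption, which the diff does not state outright, and both helper names are hypothetical.

```python
import base64
from typing import Union


def to_blob_data(raw: bytes) -> bytes:
    """Hypothetical helper: base64-encode raw content into ASCII-safe bytes."""
    return base64.b64encode(raw)


def blob_data_to_wire(data: Union[bytes, str]) -> str:
    """Mirror of the translator step: bytes become str, str passes through."""
    if isinstance(data, bytes):
        return data.decode("utf-8")
    return data


# Raw bytes that are not valid UTF-8 survive once base64-encoded.
wire_value = blob_data_to_wire(to_blob_data(b"\xff\xfe binary"))
restored = base64.b64decode(wire_value)
```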

View file

@ -56,6 +56,7 @@ tg-create-api-key = "trustgraph.cli.create_api_key:main"
tg-list-api-keys = "trustgraph.cli.list_api_keys:main"
tg-revoke-api-key = "trustgraph.cli.revoke_api_key:main"
tg-list-workspaces = "trustgraph.cli.list_workspaces:main"
tg-list-my-workspaces = "trustgraph.cli.list_my_workspaces:main"
tg-create-workspace = "trustgraph.cli.create_workspace:main"
tg-invoke-agent = "trustgraph.cli.invoke_agent:main"
tg-invoke-document-rag = "trustgraph.cli.invoke_document_rag:main"

View file

@ -5,7 +5,7 @@ Gets document content from the library by document ID.
import argparse
import os
import sys
from trustgraph.api import Api
import requests
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_token = os.getenv("TRUSTGRAPH_TOKEN", None)
@ -13,15 +13,29 @@ default_workspace = os.getenv("TRUSTGRAPH_WORKSPACE", "default")
def get_content(url, document_id, output_file, token=None, workspace="default"):
api = Api(url, token=token, workspace=workspace).library()
stream_url = url.rstrip("/") + "/api/v1/document-stream"
content = api.get_document_content(id=document_id)
params = {
"document-id": document_id,
"workspace": workspace,
}
headers = {}
if token:
headers["Authorization"] = f"Bearer {token}"
resp = requests.get(stream_url, params=params, headers=headers, stream=True)
resp.raise_for_status()
if output_file:
total = 0
with open(output_file, 'wb') as f:
f.write(content)
print(f"Written {len(content)} bytes to {output_file}")
for chunk in resp.iter_content(chunk_size=65536):
f.write(chunk)
total += len(chunk)
print(f"Written {total} bytes to {output_file}")
else:
content = resp.content
try:
text = content.decode('utf-8')
print(text)
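
The file branch of `get_content` now writes the response chunk by chunk instead of holding the whole document in memory. The core loop can be sketched independently of the HTTP layer. `write_chunks` is a hypothetical helper that accepts any iterable of byte chunks, such as the one produced by `resp.iter_content(chunk_size=65536)`.

```python
from typing import Iterable


def write_chunks(chunks: Iterable[bytes], output_file: str) -> int:
    """Write byte chunks to a file and return the total number of bytes."""
    total = 0
    with open(output_file, "wb") as f:
        for chunk in chunks:
            f.write(chunk)
            total += len(chunk)
    return total
```

Memory use stays bounded by the chunk size on this path. The stdout branch still reads `resp.content` in full, since it needs the complete payload to attempt a UTF-8 decode.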

View file

@ -47,6 +47,31 @@ def write_ge(f, data):
)
f.write(msgpack.packb(msg, use_bin_type=True))
def write_library_metadata(f, data):
msg = (
"lm",
{
"i": data["id"],
"k": data.get("kind", ""),
"t": data.get("title", ""),
"p": data.get("parent-id", ""),
"d": data.get("document-type", ""),
"c": data.get("comments", ""),
"g": data.get("tags", []),
}
)
f.write(msgpack.packb(msg, use_bin_type=True))
def write_library_blob(f, data):
msg = (
"lb",
{
"i": data["id"],
"d": data.get("data", b""),
}
)
f.write(msgpack.packb(msg, use_bin_type=True))
def fetch(url, workspace, id, output, token=None):
api = Api(url=url, token=token, workspace=workspace)
@ -55,6 +80,8 @@ def fetch(url, workspace, id, output, token=None):
try:
ge = 0
t = 0
lm = 0
lb = 0
with open(output, "wb") as f:
@ -68,7 +95,15 @@ def fetch(url, workspace, id, output, token=None):
ge += 1
write_ge(f, response["graph-embeddings"])
print(f"Got: {t} triple, {ge} GE messages.")
if "library-metadata" in response:
lm += 1
write_library_metadata(f, response["library-metadata"])
if "library-blob" in response:
lb += 1
write_library_blob(f, response["library-blob"])
print(f"Got: {t} triple, {ge} GE, {lm} library metadata, {lb} library blob messages.")
finally:
socket.close()

View file

@ -0,0 +1,53 @@
"""
List workspaces the current user has access to.
"""
import argparse
import tabulate
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_iam, run_main
def do_list_my_workspaces(args):
resp = call_iam(
args.api_url, args.token, {"operation": "list-my-workspaces"},
)
workspaces = resp.get("workspaces", [])
if not workspaces:
print("No workspaces.")
return
rows = [
[
w.get("id", ""),
w.get("name", ""),
"yes" if w.get("enabled") else "no",
w.get("created", ""),
]
for w in workspaces
]
print(tabulate.tabulate(
rows,
headers=["id", "name", "enabled", "created"],
tablefmt="pretty",
stralign="left",
))
def main():
parser = argparse.ArgumentParser(
prog="tg-list-my-workspaces", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
run_main(do_list_my_workspaces, parser)
if __name__ == "__main__":
main()

View file

@ -78,7 +78,7 @@ def load_structured_data(
logger.info("Step 1: Analyzing data to discover best matching schema...")
# Step 1: Auto-discover schema (reuse discover_schema logic)
discovered_schema = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, workspace=workspace)
discovered_schema = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, token=token, workspace=workspace)
if not discovered_schema:
logger.error("Failed to discover suitable schema automatically")
print("❌ Could not automatically determine the best schema for your data.")
@ -90,7 +90,7 @@ def load_structured_data(
# Step 2: Auto-generate descriptor
logger.info("Step 2: Generating descriptor configuration...")
auto_descriptor = _auto_generate_descriptor(api_url, input_file, discovered_schema, sample_chars, flow, logger, workspace=workspace)
auto_descriptor = _auto_generate_descriptor(api_url, input_file, discovered_schema, sample_chars, flow, logger, token=token, workspace=workspace)
if not auto_descriptor:
logger.error("Failed to generate descriptor automatically")
print("❌ Could not automatically generate descriptor configuration.")
@ -172,7 +172,7 @@ def load_structured_data(
logger.info(f"Sample chars: {sample_chars} characters")
# Use the helper function to discover schema (get raw response for display)
response = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=True, workspace=workspace)
response = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=True, token=token, workspace=workspace)
if response:
# Debug: print response type and content
@ -203,7 +203,7 @@ def load_structured_data(
# If no schema specified, discover it first
if not schema_name:
logger.info("No schema specified, auto-discovering...")
schema_name = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, workspace=workspace)
schema_name = _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, token=token, workspace=workspace)
if not schema_name:
print("Error: Could not determine schema automatically.")
print("Please specify a schema using --schema-name or run --discover-schema first.")
@ -213,7 +213,7 @@ def load_structured_data(
logger.info(f"Target schema: {schema_name}")
# Generate descriptor using helper function
descriptor = _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, workspace=workspace)
descriptor = _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, token=token, workspace=workspace)
if descriptor:
# Output the generated descriptor
@ -293,7 +293,7 @@ def load_structured_data(
# Send to TrustGraph
print(f"🚀 Importing {len(output_records)} records to TrustGraph...")
imported_count = _send_to_trustgraph(output_records, api_url, flow, batch_size, token=token)
imported_count = _send_to_trustgraph(output_records, api_url, flow, batch_size, token=token, workspace=workspace)
# Get summary info from descriptor
format_info = descriptor.get('format', {})
@ -603,7 +603,7 @@ def _send_to_trustgraph(rows, api_url, flow, batch_size=1000, token=None, worksp
# Helper functions for auto mode
def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=False, workspace="default"):
def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, return_raw_response=False, token=None, workspace="default"):
"""Auto-discover the best matching schema for the input data
Args:
@ -626,7 +626,7 @@ def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, retur
# Import API modules
from trustgraph.api import Api
from trustgraph.api.types import ConfigKey
api = Api(api_url, workspace=workspace)
api = Api(api_url, token=token, workspace=workspace)
config_api = api.config()
# Get available schemas
@ -707,7 +707,7 @@ def _auto_discover_schema(api_url, input_file, sample_chars, flow, logger, retur
return None
def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, workspace="default"):
def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, flow, logger, token=None, workspace="default"):
"""Auto-generate descriptor configuration for the discovered schema"""
try:
# Read sample data
@ -717,7 +717,7 @@ def _auto_generate_descriptor(api_url, input_file, schema_name, sample_chars, fl
# Import API modules
from trustgraph.api import Api
from trustgraph.api.types import ConfigKey
api = Api(api_url, workspace=workspace)
api = Api(api_url, token=token, workspace=workspace)
config_api = api.config()
# Get schema definition

View file

@ -40,6 +40,23 @@ def read_message(unpacked, id):
},
"triples": msg["t"],
}
elif unpacked[0] == "lm":
msg = unpacked[1]
return "lm", {
"id": msg["i"],
"kind": msg.get("k", ""),
"title": msg.get("t", ""),
"parent-id": msg.get("p", ""),
"document-type": msg.get("d", ""),
"comments": msg.get("c", ""),
"tags": msg.get("g", []),
}
elif unpacked[0] == "lb":
msg = unpacked[1]
return "lb", {
"id": msg["i"],
"data": msg.get("d", b""),
}
else:
raise RuntimeError("Unpacked unexpected message type", unpacked[0])
@ -51,6 +68,8 @@ def put(url, workspace, id, input, token=None):
try:
ge = 0
t = 0
lm = 0
lb = 0
with open(input, "rb") as f:
@ -73,10 +92,18 @@ def put(url, workspace, id, input, token=None):
t += 1
socket.put_kg_core(id, triples=msg)
elif kind == "lm":
lm += 1
socket.put_kg_core(id, library_metadata=msg)
elif kind == "lb":
lb += 1
socket.put_kg_core(id, library_blob=msg)
else:
raise RuntimeError("Unexpected message kind", kind)
print(f"Put: {t} triple, {ge} GE messages.")
print(f"Put: {t} triple, {ge} GE, {lm} library metadata, {lb} library blob messages.")
finally:
socket.close()
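
Review note: the two new branches in `read_message` expand short wire keys into long-form field names. The mapping can be exercised on its own with the minimal sketch below. It assumes the unpacked message is a two-element sequence of `(kind, payload)`, as the hunk suggests. The function name `decode_library_message` and the sample payloads are hypothetical and not part of the commit.

```python
def decode_library_message(unpacked):
    """Expand a short-keyed ("lm" | "lb", payload) pair into long-form keys.

    Mirrors the key mapping shown in the hunk above; only "i" is required,
    every other field falls back to an empty default.
    """
    kind, msg = unpacked[0], unpacked[1]

    if kind == "lm":
        # Library metadata: descriptive fields for a stored document
        return "lm", {
            "id": msg["i"],
            "kind": msg.get("k", ""),
            "title": msg.get("t", ""),
            "parent-id": msg.get("p", ""),
            "document-type": msg.get("d", ""),
            "comments": msg.get("c", ""),
            "tags": msg.get("g", []),
        }

    if kind == "lb":
        # Library blob: raw document bytes keyed by the same id
        return "lb", {
            "id": msg["i"],
            "data": msg.get("d", b""),
        }

    raise RuntimeError("Unpacked unexpected message type", kind)


# Hypothetical sample payloads, for illustration only
sample_meta = ("lm", {"i": "doc-1", "t": "Example", "g": ["demo"]})
sample_blob = ("lb", {"i": "doc-1", "d": b"\x00\x01"})

print(decode_library_message(sample_meta))
print(decode_library_message(sample_blob))
```

Note that `"d"` carries the document type in an `lm` message but the blob bytes in an `lb` message, so the message kind has to be checked before the payload is interpreted.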

Some files were not shown because too many files have changed in this diff