mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-06-10 15:25:14 +02:00
feat: complete knowledge core storage — named graphs, provenance, source material
Implements all three changes from the knowledge-core-completeness tech spec:

1. Named graph field preserved through Cassandra storage (7-element tuple), enabling provenance triples to retain their graph URIs on round-trip.
2. Provenance triples already arrive on triples-input, so no routing change is needed; Change 1 was sufficient.
3. Source material (library documents) streamed alongside triples and embeddings during core download/upload. The knowledge manager fetches the document hierarchy from the librarian on download and recreates it on upload, preserving the full provenance chain across instances.
parent aa158e1ba3
commit 93e31d2b84
14 changed files with 1347 additions and 15 deletions
535
docs/tech-specs/knowledge-core-completeness.md
Normal file
@ -0,0 +1,535 @@
---
layout: default
title: "Knowledge Core Completeness"
parent: "Tech Specs"
---

# Knowledge Core Completeness

## Overview

Knowledge cores are portable snapshots of extracted knowledge: triples, graph
embeddings, and document embeddings stored in Cassandra's `knowledge` keyspace.
They can be downloaded as files, transferred between TrustGraph instances, and
loaded back into vector and graph stores.

Recent additions to TrustGraph (explainability/provenance and named graphs)
were not carried through to the knowledge core system. As a result,
exporting and re-importing a core loses provenance links, graph assignments,
and source material, breaking the explainability chain.

This specification addresses three gaps:

1. **Named graphs not stored**: The `g` (graph name) field on triples is
   silently dropped when writing to the core store and comes back as `None`
   on read.
2. **Provenance triples not captured**: Provenance triples (PROV-O) are
   generated during extraction and flow to graph stores, but are not known
   to reach the knowledge core store. Whether they arrive at the store at
   all, and in the correct form, needs verification.
3. **Source material not included**: Documents, text pages, and chunks in
   the librarian's bucket store are not part of the core. After loading a
   core on a different instance, provenance links to source material point
   at nothing.

## Goals

- **Self-contained cores**: A downloaded knowledge core file contains
  everything needed to reconstruct the full knowledge graph, including
  provenance and source attribution, on a fresh instance.
- **Named graph preservation**: Round-tripping a core preserves graph
  assignments on all triples.
- **Backward compatibility**: Existing core files (without graph names or
  source material) can still be uploaded and loaded. New fields are optional
  on import.
- **No change to core identity**: A core is still identified by its document
  ID. The additional data is associated with the same core ID.
- **Minimal file format changes**: Extend the existing msgpack record format
  with new record types rather than restructuring existing ones.

## Background

### Current Lifecycle

```
Extraction pipeline
  │
  ├─ triples ──────────────────► knowledge core store (Cassandra)
  ├─ graph embeddings ─────────► knowledge core store (Cassandra)
  ├─ document embeddings ──────► knowledge core store (Cassandra)
  ├─ provenance triples ───────► graph store (only)
  └─ source documents ─────────► librarian bucket store (only)

Download:  Cassandra ──► knowledge manager ──► API gateway ──► client file
Upload:    client file ──► API gateway ──► knowledge manager ──► Cassandra
Load:      Cassandra ──► knowledge manager ──► Pulsar topics ──► graph/vector stores
```

### Current Core File Format (msgpack)

A core file is a sequence of concatenated msgpack records. Each record is a
2-element tuple: `(type_tag, payload)`.

| Type tag | Payload | Description |
|----------|---------|-------------|
| `"t"` | `{"m": {id, root, collection}, "t": [triple_dicts]}` | Triple batch |
| `"ge"` | `{"m": {id, root, collection}, "e": [{entity, vector}]}` | Graph embedding batch |

### What's Missing

#### Named Graphs

The `Triple` dataclass has a `g: str | None` field (graph name IRI), used to
separate provenance graphs (`urn:graph:source`, `urn:graph:retrieval`) from
the default graph. However:

- **Cassandra schema** (`knowledge.triples` table): stores a 6-tuple per
  triple `(s_val, s_is_uri, p_val, p_is_uri, o_val, o_is_uri)`, with no graph
  field.
- **`add_triples()`** (`tables/knowledge.py:231`): destructures only `s`,
  `p`, `o`; `g` is discarded.
- **`get_triples()`** (`tables/knowledge.py:396`): reconstructs `Triple`
  with `g` defaulting to `None`.
- **Core file format**: triple dicts do not include a graph field.

#### Provenance Triples

Provenance triples are generated in the extraction pipeline
(`trustgraph-base/trustgraph/provenance/triples.py`) and published to graph
store topics. They use named graphs (`urn:graph:source`,
`urn:graph:retrieval`) and PROV-O vocabulary.

The knowledge core store processor (`storage/knowledge/store.py`) listens on
`triples-input` and `graph-embeddings-input`. Whether provenance triples
arrive on the same `triples-input` topic or a separate one needs
verification. Even if they do arrive, the graph name would be lost (per
above).

#### Source Material

The librarian stores the full document hierarchy in a separate system:

- **Blob store** (S3/MinIO): original documents, text pages, and chunks,
  keyed by object UUID under `doc/{object_id}`.
- **Cassandra `library` keyspace**: document metadata including `id`,
  `kind` (MIME type), `title`, `parent_id`, `document_type`
  (`source`/`extracted`), `object_id` (blob reference).

Provenance triples link extracted facts back to chunk/page/document IDs.
Those IDs resolve through the librarian. When a core is loaded on a
different instance, the librarian has no matching documents, so the entire
provenance chain is broken.

### Key Source Files

| Component | File | Purpose |
|-----------|------|---------|
| Core Cassandra schema | `trustgraph-flow/trustgraph/tables/knowledge.py` | Table definitions, read/write |
| Core manager | `trustgraph-flow/trustgraph/cores/knowledge.py` | API operations, load-to-store |
| Core store processor | `trustgraph-flow/trustgraph/storage/knowledge/store.py` | Extraction → Cassandra |
| CLI download | `trustgraph-cli/trustgraph/cli/get_kg_core.py` | Core → msgpack file |
| CLI upload | `trustgraph-cli/trustgraph/cli/put_kg_core.py` | Msgpack file → core |
| CLI load | `trustgraph-cli/trustgraph/cli/load_kg_core.py` | Core → graph/vector stores |
| API client | `trustgraph-base/trustgraph/api/knowledge.py` | Client-side knowledge API |
| Triple schema | `trustgraph-base/trustgraph/schema/core/primitives.py` | Triple dataclass with `g` field |
| Provenance generation | `trustgraph-base/trustgraph/provenance/triples.py` | PROV-O triple creation |
| Librarian | `trustgraph-flow/trustgraph/librarian/librarian.py` | Document storage service |
| Library tables | `trustgraph-flow/trustgraph/tables/library.py` | Document metadata in Cassandra |
| Blob store | `trustgraph-flow/trustgraph/librarian/blob_store.py` | S3/MinIO object storage |

## Technical Design

### Change 1: Named Graph Field in Core Storage

#### Cassandra Schema

Extend the `triples` tuple from 6 to 7 elements, adding the graph name:

```
triples list<tuple<
    text, boolean,  -- s_val, s_is_uri
    text, boolean,  -- p_val, p_is_uri
    text, boolean,  -- o_val, o_is_uri
    text            -- graph name (empty string = default graph)
>>
```

**Migration**: Cassandra cannot alter a frozen tuple type in place, so this
change needs a migration strategy (see Migration Plan). Whichever strategy is
chosen, existing rows with 6-element tuples must be handled gracefully on
read: if the tuple has 6 elements, treat the graph as the default graph.

#### Write Path (`add_triples`)

Change `tables/knowledge.py:add_triples()` to include `triple.g`:

```python
triples = [
    (
        *term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o),
        v.g or ""
    )
    for v in m.triples
]
```

#### Read Path (`get_triples`)

Change `tables/knowledge.py:get_triples()` to restore the graph name:

```python
Triple(
    s = tuple_to_term(elt[0], elt[1]),
    p = tuple_to_term(elt[2], elt[3]),
    o = tuple_to_term(elt[4], elt[5]),
    g = elt[6] if len(elt) > 6 and elt[6] else None,
)
```

The `len(elt) > 6` guard provides backward compatibility with existing
6-element rows.

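The length guard can be exercised in isolation. The sketch below is a simplified, self-contained model of the read path, not the real implementation: `Term`, `Triple`, and `tuple_to_term` are stand-ins written for this example rather than the TrustGraph schema classes, and `row_to_triple` is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Term:
    # Stand-in for the real schema Term: a value plus a URI flag.
    value: str
    is_uri: bool


@dataclass
class Triple:
    s: Term
    p: Term
    o: Term
    g: Optional[str] = None


def tuple_to_term(value: str, is_uri: bool) -> Term:
    return Term(value=value, is_uri=is_uri)


def row_to_triple(elt: tuple) -> Triple:
    """Rebuild a Triple from a 6- or 7-element storage tuple."""
    return Triple(
        s=tuple_to_term(elt[0], elt[1]),
        p=tuple_to_term(elt[2], elt[3]),
        o=tuple_to_term(elt[4], elt[5]),
        # Old rows have no 7th element; "" also means the default graph.
        g=elt[6] if len(elt) > 6 and elt[6] else None,
    )


old_row = ("urn:a", True, "urn:p", True, "hello", False)
new_row = old_row + ("urn:graph:source",)
default_row = old_row + ("",)
```

Both the legacy 6-element row and a 7-element row carrying an empty string map to `g = None`, so callers see a single representation of the default graph.
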
#### Core File Format

Extend triple dicts in the `"t"` record to include the graph name:

```python
# In get_kg_core.py write_triple: each triple dict gains a "g" key
{"s": ..., "p": ..., "o": ..., "g": "urn:graph:source"}
```

On read (`put_kg_core.py`), treat missing `"g"` key as default graph for
backward compatibility with old core files.

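A minimal sketch of the file-format side of this rule follows. The helper names `encode_triple` and `decode_graph` are hypothetical and do not come from the CLI code. Omitting the `"g"` key for default-graph triples is an assumption made here for illustration; the spec only requires that a missing key be read as the default graph.

```python
from typing import Any, Optional


def encode_triple(s: Any, p: Any, o: Any, g: Optional[str]) -> dict:
    """Build a core-file triple dict; "g" is written only for named graphs."""
    record = {"s": s, "p": p, "o": o}
    if g:
        record["g"] = g
    return record


def decode_graph(record: dict) -> Optional[str]:
    """Return the graph name, or None (default graph) for old-format dicts."""
    return record.get("g") or None


named = encode_triple("urn:a", "urn:p", "hello", "urn:graph:source")
legacy = {"s": "urn:a", "p": "urn:p", "o": "hello"}  # written by an old exporter
```

Using `record.get("g") or None` treats a missing key, `None`, and an empty string the same way, which matches the empty-string convention used for the default graph in the Cassandra tuple.
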
### Change 2: Provenance Triples in Cores

#### Investigation Required

Before implementation, verify:

1. Whether provenance triples arrive on the `triples-input` topic that the
   knowledge core store processor already listens on.
2. If not, which topic they use, and whether the store processor should
   subscribe to it.

#### If provenance triples already arrive at the store

The only change needed is Change 1 (named graphs): the provenance triples
are already being stored, just without their graph name. Once graph names
are preserved, provenance triples will round-trip correctly.

#### If provenance triples do NOT arrive at the store

Two options:

**Option A (route provenance to the existing store topic)**: Configure the
flow so provenance triples are published to the same `triples-input` topic.
This is the simpler approach and keeps the store processor unchanged.

**Option B (add a subscription)**: Add a new `ConsumerSpec` in the store
processor for the provenance topic. This keeps provenance routing
independent but adds complexity.

Recommendation: Option A, unless there is a reason provenance triples are
intentionally kept off the core store topic.

### Change 3: Source Material in Cores

This is the largest change. The goal is that when a core is loaded on a
fresh instance, provenance links to source material resolve.

#### Architecture

Source material is **not stored in the knowledge core tables**. It lives in
the librarian (Cassandra `library` keyspace + S3/MinIO blob store) and is
fetched on demand via the librarian's existing service API.

The knowledge manager acts as a **client of the librarian service**: it
calls the librarian's request/response API over pub/sub to retrieve document
metadata and content. It does not access the library's Cassandra tables or
blob store directly.

#### Transport

The librarian's pub/sub API already handles chunking of large documents.
This chunking is designed to be websocket-friendly, so library content
flowing through the API gateway to external clients does not require
re-chunking. The API gateway remains a transport layer.

```
Download:
  Knowledge manager ──pub/sub──► Librarian (fetch metadata + content)
  Knowledge manager ──pub/sub──► API gateway ──websocket──► Client

Upload:
  Client ──websocket──► API gateway ──pub/sub──► Knowledge manager
  Knowledge manager ──pub/sub──► Librarian (store metadata + content)
```

#### What to Include

The provenance chain links facts → chunks → pages → documents. For the
chain to resolve, the core must include:

1. **Document metadata**: the library record for each document in the
   hierarchy (id, kind, title, parent_id, document_type, etc.)
2. **Document content**: the blob data for each document (original file,
   extracted text pages, text chunks)

Including the full hierarchy is necessary because:
- A user viewing provenance needs to traverse fact → chunk → page → document
- The chunk text is needed to show what text a fact was extracted from
- The page text provides broader context
- The original document is needed for full source attribution

#### Size Implications

Source material will significantly increase core file sizes. A rough model:

| Component | Typical size per document |
|-----------|-------------------------|
| Triples + embeddings (current) | 1-10 MB |
| Chunk text (all chunks) | ~same as original document |
| Page text (all pages) | ~same as original document |
| Original document (PDF, etc.) | Varies widely (KB to hundreds of MB) |

For a 10 MB PDF, the core could grow from ~5 MB to ~25 MB (original +
derived text + existing data). For large document sets, cores could become
very large.

**Decision needed**: Whether to include original documents or just derived
text (pages + chunks). Including only derived text still allows provenance
display but loses the ability to serve the original file.

#### New Core File Record Types

Add new msgpack record types for library content:

| Type tag | Payload | Description |
|----------|---------|-------------|
| `"lm"` | `{"id", "kind", "title", "parent_id", "document_type", "comments", "tags", "metadata"}` | Library document metadata |
| `"lb"` | `{"id", "data"}` | Library document blob content (chunked by pub/sub layer) |

These are emitted after the existing `"t"` and `"ge"` records during
download and processed during upload.

#### Download Path

Extend `KnowledgeManager.get_kg_core()` to:

1. Stream triples and graph embeddings from the core store (existing
   behavior).
2. Use the librarian service API to retrieve documents associated with
   this core ID:
   a. Fetch the root document metadata and content.
   b. Use `list-children` to discover child documents (pages, chunks).
   c. Recursively fetch metadata and content for each child.
3. Stream each document as `"lm"` (metadata) and `"lb"` (content) records.

The knowledge manager gains the librarian service as a pub/sub dependency.
Large document content is chunked by the librarian's existing pub/sub
transport; the knowledge manager receives and forwards these chunks without
buffering the full blob in memory.

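The traversal in step 2 can be sketched against an in-memory stand-in for the librarian. Everything here is illustrative: `StubLibrarian` and its method names (`fetch_metadata`, `fetch_content`, `list_children`) are simplified assumptions, not the real service client, and the sketch fetches each blob whole rather than forwarding transport chunks. It uses an explicit stack, which visits documents in the same order as the recursive description.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Tuple


class StubLibrarian:
    """In-memory stand-in for the librarian service (hypothetical interface)."""

    def __init__(self, docs: Dict[str, dict], blobs: Dict[str, bytes]):
        self.docs = docs
        self.blobs = blobs

    async def fetch_metadata(self, doc_id: str) -> dict:
        return self.docs[doc_id]

    async def fetch_content(self, doc_id: str) -> bytes:
        return self.blobs[doc_id]

    async def list_children(self, doc_id: str) -> List[str]:
        return [d["id"] for d in self.docs.values() if d.get("parent_id") == doc_id]


async def walk_documents(librarian, root_id: str) -> AsyncIterator[Tuple[str, dict]]:
    """Yield ("lm", metadata) then ("lb", blob) records, parents before children."""
    stack = [root_id]
    while stack:
        doc_id = stack.pop()
        meta = await librarian.fetch_metadata(doc_id)
        yield "lm", meta
        yield "lb", {"id": doc_id, "data": await librarian.fetch_content(doc_id)}
        # Reverse so children are visited in listing order.
        stack.extend(reversed(await librarian.list_children(doc_id)))


async def collect(librarian, root_id: str) -> list:
    return [rec async for rec in walk_documents(librarian, root_id)]


docs = {
    "root": {"id": "root", "parent_id": None},
    "page-1": {"id": "page-1", "parent_id": "root"},
    "chunk-1": {"id": "chunk-1", "parent_id": "page-1"},
}
blobs = {"root": b"pdf", "page-1": b"page text", "chunk-1": b"chunk text"}
records = asyncio.run(collect(StubLibrarian(docs, blobs), "root"))
```

Because each document's records are emitted before its children are visited, the resulting stream is already in parent-before-child order, which is the order the upload path wants.
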
#### Upload Path

Extend `KnowledgeManager.put_kg_core()` to handle the new record types:

1. For `"lm"` records: call the librarian service API to create/update
   the document metadata.
2. For `"lb"` records: call the librarian service API to store the
   document content.

Parent-child relationships are preserved because `parent_id` is stored in
the metadata. Documents should be processed in hierarchy order (parent
before child) to satisfy any ordering constraints.

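If an uploader cannot rely on the file already being in hierarchy order, the metadata records could be reordered before they are sent. The following is a sketch under that assumption; `order_parents_first` is a hypothetical helper, and records whose parent is absent from the batch are simply treated as roots.

```python
from typing import Dict, List


def order_parents_first(records: List[dict]) -> List[dict]:
    """Order metadata records so every parent precedes its children."""
    by_id: Dict[str, dict] = {r["id"]: r for r in records}
    ordered: List[dict] = []
    placed = set()

    def place(rec: dict) -> None:
        if rec["id"] in placed:
            return
        # Mark first, so a malformed parent cycle cannot recurse forever.
        placed.add(rec["id"])
        parent = by_id.get(rec.get("parent_id"))
        if parent is not None:
            place(parent)
        ordered.append(rec)

    for rec in records:
        place(rec)
    return ordered


shuffled = [
    {"id": "chunk-1", "parent_id": "page-1"},
    {"id": "page-1", "parent_id": "root"},
    {"id": "root", "parent_id": None},
]
ordered_ids = [r["id"] for r in order_parents_first(shuffled)]
```

The recursion depth equals the depth of the document hierarchy (document, page, chunk), so it stays shallow for the structure described in this spec.
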
#### Load Path

The load path (`_load_kg_core`) publishes triples and embeddings to Pulsar
topics for ingestion into graph/vector stores. Source material does not need
to flow through the load path because it is already in the librarian after
the upload step and can be accessed directly by services that need it.

No changes to the load path for source material.

#### CLI Changes

**`tg-get-kg-core`**: Add handling for `"lm"` and `"lb"` record types in
the file writer.

**`tg-put-kg-core`**: Add handling for `"lm"` and `"lb"` record types in
the file reader. Send library records to the knowledge manager alongside
triple/embedding records.

#### Associating Documents with Cores

The core ID is `metadata.root`, which is the root document ID from the
librarian. This provides a natural join: the core's root document and all
its children (pages, chunks) are the source material for that core.

The librarian's `list-children` API provides the child documents. A
recursive traversal from the root document collects the full hierarchy.

### API Changes

#### KnowledgeResponse Schema

Add optional fields to `KnowledgeResponse` for library data:

```python
@dataclass
class KnowledgeResponse:
    error: Error | None = None
    ids: list | None = None
    eos: bool = False
    triples: Triples | None = None
    graph_embeddings: GraphEmbeddings | None = None
    document_embeddings: DocumentEmbeddings | None = None
    library_metadata: LibraryMetadata | None = None  # new
    library_blob: LibraryBlob | None = None  # new
```

#### New Schema Types

```python
@dataclass
class LibraryMetadata:
    id: str
    kind: str | None = None
    title: str | None = None
    parent_id: str | None = None
    document_type: str | None = None
    comments: str | None = None
    tags: list[str] | None = None
    metadata: list[Triple] | None = None

@dataclass
class LibraryBlob:
    id: str
    data: bytes
```

#### Socket API

The existing streaming protocol for `get-kg-core` / `put-kg-core` carries
these new fields naturally, since responses already stream multiple record
types.

### Dependencies Between Changes

```
Change 1 (named graphs)
  │
  └── Change 2 (provenance triples) depends on Change 1

Change 3 (source material) is independent
```

Change 1 is a prerequisite for Change 2 (provenance triples use named
graphs). Change 3 is independent and can be implemented in parallel.

## Security Considerations

- **Workspace isolation**: Core download/upload must respect workspace
  boundaries. Source material from the librarian must only be included if
  it belongs to the same workspace as the core. This is already enforced
  by the existing workspace-scoped queries.
- **Large blob transfer**: Streaming large documents through the API
  is handled by the librarian's existing pub/sub chunking, which is
  designed to be websocket-friendly. No additional chunking layer is
  needed.
- **Cross-instance trust**: When uploading a core from an external source,
  the library content should be treated as untrusted input. Document
  metadata and blob content should be validated before insertion.

## Performance Considerations

- **Core file size**: Including source material will significantly increase
  core file sizes. Consider adding a flag to download/upload commands to
  optionally exclude source material for use cases where only the knowledge
  graph is needed.
- **Streaming**: All paths already use streaming (paged Cassandra queries,
  msgpack record-at-a-time). Library content should follow the same pattern.
- **Cassandra schema migration**: Changing the tuple width in the `triples`
  table requires careful handling. Cassandra frozen tuples cannot be altered
  in place, so a migration strategy is needed (see Migration Plan).

## Testing Strategy

- **Unit tests**: Triple round-trip with graph name (write → read →
  verify `g` field preserved). Backward compatibility with 6-element tuples.
- **Integration tests**: Full lifecycle: extract with provenance → download
  core → upload to fresh instance → load → verify provenance chain resolves.
- **File format tests**: Read old-format core files (no graph name, no
  library records) and verify they load without error.
- **Library inclusion tests**: Download core with source material → upload →
  verify documents accessible through librarian.

## Migration Plan

### Cassandra Schema

The `triples` table stores tuples in a `list<tuple<...>>` column. Cassandra
does not support altering the type of an existing column. Options:

**Option A (new table)**: Create a `triples_v2` table with the 7-element
tuple. Migrate data from `triples` to `triples_v2`. The read path checks
both tables during a transition period, then the old table is dropped.

**Option B (dual read)**: Keep the existing table. The read path handles
both 6-element and 7-element tuples by checking length. New writes use
7-element tuples. This works only if Cassandra accepts variable-length
tuples in a list, which **needs verification**.

**Option C (separate graph column)**: Instead of extending the tuple, add a
parallel `graphs list<text>` column where `graphs[i]` corresponds to
`triples[i]`. This avoids tuple migration entirely but requires keeping the
two lists in sync.

Recommendation: Verify Option B first (simplest). Fall back to Option A if
Cassandra rejects mixed tuple lengths.

### Core File Format

Old files are backward compatible by design; new files need one reader fix:
- Old files lack `"g"` in triple dicts and have no `"lm"`/`"lb"` records →
  handled by defaults.
- New files read by old code → currently fails: the existing `read_message`
  raises on unknown record types, so it needs a small fix to skip unknown
  types gracefully.

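One way a reader could skip unknown record types is a tag-to-handler lookup that falls through instead of raising. This is a sketch only: `dispatch_records` is a hypothetical function operating on already-decoded `(type_tag, payload)` pairs, and it does not reproduce the actual `read_message` code.

```python
from typing import Callable, Dict, Iterable, List, Tuple


def dispatch_records(
    records: Iterable[Tuple[str, dict]],
    handlers: Dict[str, Callable[[dict], None]],
) -> List[str]:
    """Call the handler for each known type tag; return the tags that were skipped."""
    skipped: List[str] = []
    for type_tag, payload in records:
        handler = handlers.get(type_tag)
        if handler is None:
            skipped.append(type_tag)  # unknown type: skip instead of raising
            continue
        handler(payload)
    return skipped


seen = []
handlers = {
    "t": lambda p: seen.append(("t", p)),
    "ge": lambda p: seen.append(("ge", p)),
}
stream = [
    ("t", {"t": []}),
    ("lm", {"id": "doc-1"}),
    ("ge", {"e": []}),
    ("lb", {"id": "doc-1"}),
]
skipped = dispatch_records(stream, handlers)
```

Returning the skipped tags lets a caller log or count them, so a reader that predates `"lm"`/`"lb"` can still report that it ignored part of the file.
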
## Open Questions

1. **Provenance topic routing**: Do provenance triples currently arrive at
   the `triples-input` topic consumed by the knowledge core store? If not,
   what topic are they on?

2. **Include original documents?**: Should cores include the original
   uploaded document (e.g. PDF), or only derived text (pages + chunks)?
   Including originals makes cores fully self-contained but potentially
   very large. Excluding them preserves provenance text display but loses
   the ability to serve the original file.

3. **Optional source material**: Should there be a flag on download/upload
   to include or exclude source material? This would let users choose
   between compact cores (knowledge only) and complete cores (knowledge +
   sources).

4. **Cassandra tuple migration**: Can Cassandra handle mixed-length tuples
   in a `list<tuple<...>>` column, or is a table migration required?

5. **Document embedding cores**: DE cores are managed alongside KG cores.
   Do they need the same treatment (source material inclusion)? The
   document embeddings reference chunk IDs, so the same provenance chain
   applies.

6. **Core versioning**: Should the core file include a version marker so
   readers can distinguish old-format from new-format files without
   trial-and-error parsing?

## References

- Extraction-time provenance: `docs/tech-specs/extraction-time-provenance.md`
- Query-time explainability: `docs/tech-specs/query-time-explainability.md`
- Agent explainability: `docs/tech-specs/agent-explainability.md`
- Data ownership model: `docs/tech-specs/data-ownership-model.md`

@ -11,7 +11,12 @@ from unittest.mock import AsyncMock, Mock, patch, MagicMock
from unittest.mock import call

from trustgraph.cores.knowledge import KnowledgeManager
from trustgraph.schema import KnowledgeResponse, Triples, GraphEmbeddings, Metadata, Triple, Term, EntityEmbeddings, IRI, LITERAL
from trustgraph.schema import (
    KnowledgeResponse, Triples, GraphEmbeddings, Metadata, Triple, Term,
    EntityEmbeddings, IRI, LITERAL,
    LibraryMetadata, LibraryBlob,
    LibrarianResponse, DocumentMetadata,
)


@pytest.fixture

@ -373,11 +378,252 @@ class TestKnowledgeManagerOtherMethods:
        mock_respond = AsyncMock()

        await knowledge_manager.delete_kg_core(mock_request, mock_respond, "test-user")

        # Verify table store was called correctly
        knowledge_manager.table_store.delete_kg_core.assert_called_once_with("test-user", "test-doc-id")

        # Verify response
        mock_respond.assert_called_once()
        response = mock_respond.call_args[0][0]
        assert response.error is None


class TestKnowledgeManagerLibraryDownload:
    """Test get_kg_core streaming of library documents."""

    @pytest.fixture
    def manager_with_librarian(self, mock_flow_config):
        with patch('trustgraph.cores.knowledge.KnowledgeTableStore'):
            mock_librarian = AsyncMock()
            manager = KnowledgeManager(
                cassandra_host=["localhost"],
                cassandra_username="test_user",
                cassandra_password="test_pass",
                keyspace="test_keyspace",
                flow_config=mock_flow_config,
                librarian=mock_librarian,
            )
            manager.table_store = AsyncMock()
            return manager

    @pytest.mark.asyncio
    async def test_get_kg_core_streams_library_docs(self, manager_with_librarian):
        mock_request = Mock()
        mock_request.id = "root-doc"
        mock_respond = AsyncMock()

        manager_with_librarian.table_store.get_triples = AsyncMock()
        manager_with_librarian.table_store.get_graph_embeddings = AsyncMock()

        root_meta = DocumentMetadata(
            id="root-doc", kind="application/pdf", title="Test PDF",
            document_type="source",
        )
        child_meta = DocumentMetadata(
            id="chunk-1", kind="text/plain", title="Chunk 1",
            parent_id="root-doc", document_type="chunk",
        )

        manager_with_librarian.librarian.fetch_document_metadata.return_value = root_meta
        manager_with_librarian.librarian.request.return_value = LibrarianResponse(
            document_metadatas=[child_meta],
        )
        manager_with_librarian.librarian.fetch_document_content.side_effect = [
            b"cm9vdCBjb250ZW50",
            b"Y2h1bmsgY29udGVudA==",
        ]

        await manager_with_librarian.get_kg_core(
            mock_request, mock_respond, "test-user"
        )

        responses = [c[0][0] for c in mock_respond.call_args_list]

        lm_responses = [r for r in responses if r.library_metadata is not None]
        lb_responses = [r for r in responses if r.library_blob is not None]
        eos_responses = [r for r in responses if r.eos is True]

        assert len(lm_responses) == 2
        assert lm_responses[0].library_metadata.id == "root-doc"
        assert lm_responses[0].library_metadata.document_type == "source"
        assert lm_responses[1].library_metadata.id == "chunk-1"
        assert lm_responses[1].library_metadata.parent_id == "root-doc"

        assert len(lb_responses) == 2
        assert lb_responses[0].library_blob.id == "root-doc"
        assert lb_responses[0].library_blob.data == b"cm9vdCBjb250ZW50"
        assert lb_responses[1].library_blob.id == "chunk-1"

        assert len(eos_responses) == 1

    @pytest.mark.asyncio
    async def test_get_kg_core_no_librarian_skips_library(self, mock_flow_config):
        with patch('trustgraph.cores.knowledge.KnowledgeTableStore'):
            manager = KnowledgeManager(
                cassandra_host=["localhost"],
                cassandra_username="u", cassandra_password="p",
                keyspace="ks", flow_config=mock_flow_config,
            )
            manager.table_store = AsyncMock()
            manager.table_store.get_triples = AsyncMock()
            manager.table_store.get_graph_embeddings = AsyncMock()

        mock_request = Mock()
        mock_request.id = "doc-1"
        mock_respond = AsyncMock()

        await manager.get_kg_core(mock_request, mock_respond, "w")

        responses = [c[0][0] for c in mock_respond.call_args_list]
        assert all(r.library_metadata is None for r in responses)
        assert all(r.library_blob is None for r in responses)

    @pytest.mark.asyncio
    async def test_get_kg_core_librarian_metadata_failure_is_graceful(
        self, manager_with_librarian,
    ):
        mock_request = Mock()
        mock_request.id = "missing-doc"
        mock_respond = AsyncMock()

        manager_with_librarian.table_store.get_triples = AsyncMock()
        manager_with_librarian.table_store.get_graph_embeddings = AsyncMock()
        manager_with_librarian.librarian.fetch_document_metadata.side_effect = (
            RuntimeError("not found")
        )

        await manager_with_librarian.get_kg_core(
            mock_request, mock_respond, "test-user"
        )

        responses = [c[0][0] for c in mock_respond.call_args_list]
        assert all(r.library_metadata is None for r in responses)
        assert any(r.eos for r in responses)


class TestKnowledgeManagerLibraryUpload:
    """Test put_kg_core handling of library metadata and blob records."""

    @pytest.fixture
    def manager_with_librarian(self, mock_flow_config):
        with patch('trustgraph.cores.knowledge.KnowledgeTableStore'):
            mock_librarian = AsyncMock()
            manager = KnowledgeManager(
                cassandra_host=["localhost"],
                cassandra_username="u", cassandra_password="p",
                keyspace="ks", flow_config=mock_flow_config,
                librarian=mock_librarian,
            )
            manager.table_store = AsyncMock()
            return manager

    @pytest.mark.asyncio
    async def test_put_metadata_then_blob_calls_librarian(
        self, manager_with_librarian,
    ):
        mock_respond = AsyncMock()
        manager_with_librarian.librarian.request.return_value = LibrarianResponse()

        # First call: metadata
        req_meta = Mock()
        req_meta.triples = None
        req_meta.graph_embeddings = None
        req_meta.library_metadata = LibraryMetadata(
            id="doc-1", kind="application/pdf", title="Test",
            document_type="source",
        )
        req_meta.library_blob = None
        await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws")

        # Metadata is buffered, librarian not called yet
        manager_with_librarian.librarian.request.assert_not_called()

        # Second call: blob
        req_blob = Mock()
        req_blob.triples = None
        req_blob.graph_embeddings = None
        req_blob.library_metadata = None
        req_blob.library_blob = LibraryBlob(
            id="doc-1", data=b"dGVzdA==",
        )
        await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")

        # Now librarian should have been called with add-document
        manager_with_librarian.librarian.request.assert_called_once()
        call_args = manager_with_librarian.librarian.request.call_args[0][0]
        assert call_args.operation == "add-document"
        assert call_args.document_metadata.id == "doc-1"
        assert call_args.document_metadata.kind == "application/pdf"
        assert call_args.content == b"dGVzdA=="

@pytest.mark.asyncio
|
||||
async def test_put_child_document_uses_add_child_operation(
|
||||
self, manager_with_librarian,
|
||||
):
|
||||
mock_respond = AsyncMock()
|
||||
manager_with_librarian.librarian.request.return_value = LibrarianResponse()
|
||||
|
||||
req_meta = Mock()
|
||||
req_meta.triples = None
|
||||
req_meta.graph_embeddings = None
|
||||
req_meta.library_metadata = LibraryMetadata(
|
||||
id="chunk-1", kind="text/plain", title="Chunk",
|
||||
parent_id="doc-1", document_type="chunk",
|
||||
)
|
||||
req_meta.library_blob = None
|
||||
await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws")
|
||||
|
||||
req_blob = Mock()
|
||||
req_blob.triples = None
|
||||
req_blob.graph_embeddings = None
|
||||
req_blob.library_metadata = None
|
||||
req_blob.library_blob = LibraryBlob(id="chunk-1", data=b"Y2h1bms=")
|
||||
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")
|
||||
|
||||
call_args = manager_with_librarian.librarian.request.call_args[0][0]
|
||||
assert call_args.operation == "add-child-document"
|
||||
assert call_args.document_metadata.parent_id == "doc-1"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_put_blob_without_metadata_logs_warning(
|
||||
self, manager_with_librarian,
|
||||
):
|
||||
mock_respond = AsyncMock()
|
||||
|
||||
req_blob = Mock()
|
||||
req_blob.triples = None
|
||||
req_blob.graph_embeddings = None
|
||||
req_blob.library_metadata = None
|
||||
req_blob.library_blob = LibraryBlob(id="orphan", data=b"data")
|
||||
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")
|
||||
|
||||
# Librarian should not be called for orphan blob
|
||||
manager_with_librarian.librarian.request.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_put_existing_document_is_graceful(
|
||||
self, manager_with_librarian,
|
||||
):
|
||||
mock_respond = AsyncMock()
|
||||
manager_with_librarian.librarian.request.side_effect = RuntimeError(
|
||||
"Document already exists"
|
||||
)
|
||||
|
||||
req_meta = Mock()
|
||||
req_meta.triples = None
|
||||
req_meta.graph_embeddings = None
|
||||
req_meta.library_metadata = LibraryMetadata(
|
||||
id="doc-1", kind="application/pdf", title="Test",
|
||||
document_type="source",
|
||||
)
|
||||
req_meta.library_blob = None
|
||||
await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws")
|
||||
|
||||
req_blob = Mock()
|
||||
req_blob.triples = None
|
||||
req_blob.graph_embeddings = None
|
||||
req_blob.library_metadata = None
|
||||
req_blob.library_blob = LibraryBlob(id="doc-1", data=b"data")
|
||||
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")
|
||||
|
||||
# Should not raise — "already exists" is handled gracefully
|
||||
|
|
@ -155,7 +155,7 @@ class TestGetTriples:
|
|||
@pytest.mark.asyncio
|
||||
@patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock)
|
||||
async def test_row_converts_to_triples(self, mock_async_execute_paged):
|
||||
- # row[3] is a list of (s_val, s_uri, p_val, p_uri, o_val, o_uri)
|
||||
# row[3] is a list of (s_val, s_uri, p_val, p_uri, o_val, o_uri, graph)
|
||||
fake_row = (
|
||||
None, None, None,
|
||||
[
|
||||
|
|
@ -163,6 +163,7 @@ class TestGetTriples:
|
|||
"http://example.org/alice", True,
|
||||
"http://example.org/knows", True,
|
||||
"http://example.org/bob", True,
|
||||
"urn:graph:source",
|
||||
),
|
||||
],
|
||||
)
|
||||
|
|
@ -191,3 +192,33 @@ class TestGetTriples:
|
|||
assert t.s.iri == "http://example.org/alice"
|
||||
assert t.p.iri == "http://example.org/knows"
|
||||
assert t.o.iri == "http://example.org/bob"
|
||||
assert t.g == "urn:graph:source"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock)
|
||||
async def test_empty_graph_name_becomes_none(self, mock_async_execute_paged):
|
||||
fake_row = (
|
||||
None, None, None,
|
||||
[
|
||||
(
|
||||
"http://example.org/alice", True,
|
||||
"http://example.org/knows", True,
|
||||
"http://example.org/bob", True,
|
||||
"",
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
store = _make_store()
|
||||
store.cassandra = Mock()
|
||||
store.get_triples_stmt = Mock()
|
||||
mock_async_execute_paged.return_value = [[fake_row]]
|
||||
|
||||
received = []
|
||||
|
||||
async def receiver(msg):
|
||||
received.append(msg)
|
||||
|
||||
await store.get_triples("w", "d", receiver)
|
||||
|
||||
assert received[0].triples[0].g is None
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
"""
|
||||
- Round-trip unit tests for KnowledgeRequestTranslator.
|
||||
Round-trip unit tests for KnowledgeRequestTranslator and
|
||||
KnowledgeResponseTranslator.
|
||||
|
||||
Regression coverage: a previous version of the decode side constructed
|
||||
EntityEmbeddings(vectors=...) — the schema field is `vector` (singular),
|
||||
|
|
@ -15,9 +16,13 @@ Triples breaks the test.
|
|||
|
||||
import pytest
|
||||
|
||||
- from trustgraph.messaging.translators.knowledge import KnowledgeRequestTranslator
|
||||
from trustgraph.messaging.translators.knowledge import (
|
||||
KnowledgeRequestTranslator,
|
||||
KnowledgeResponseTranslator,
|
||||
)
|
||||
from trustgraph.schema import (
|
||||
KnowledgeRequest,
|
||||
KnowledgeResponse,
|
||||
GraphEmbeddings,
|
||||
EntityEmbeddings,
|
||||
Triples,
|
||||
|
|
@ -25,6 +30,8 @@ from trustgraph.schema import (
|
|||
Metadata,
|
||||
Term,
|
||||
IRI,
|
||||
LibraryMetadata,
|
||||
LibraryBlob,
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -145,3 +152,161 @@ class TestKnowledgeRequestTranslatorTriples:
|
|||
assert t.s.iri == "http://example.org/alice"
|
||||
assert t.p.iri == "http://example.org/knows"
|
||||
assert t.o.iri == "http://example.org/bob"
|
||||
|
||||
|
||||
class TestKnowledgeRequestTranslatorLibrary:
|
||||
|
||||
def test_roundtrip_preserves_library_metadata(self, translator):
|
||||
request = KnowledgeRequest(
|
||||
operation="put-kg-core",
|
||||
id="doc-1",
|
||||
library_metadata=LibraryMetadata(
|
||||
id="doc-1",
|
||||
kind="application/pdf",
|
||||
title="Test Document",
|
||||
parent_id="",
|
||||
document_type="source",
|
||||
comments="test comments",
|
||||
tags=["tag1", "tag2"],
|
||||
),
|
||||
)
|
||||
|
||||
encoded = translator.encode(request)
|
||||
assert "library-metadata" in encoded
|
||||
lm = encoded["library-metadata"]
|
||||
assert lm["id"] == "doc-1"
|
||||
assert lm["kind"] == "application/pdf"
|
||||
assert lm["title"] == "Test Document"
|
||||
assert lm["parent-id"] == ""
|
||||
assert lm["document-type"] == "source"
|
||||
assert lm["comments"] == "test comments"
|
||||
assert lm["tags"] == ["tag1", "tag2"]
|
||||
|
||||
decoded = translator.decode(encoded)
|
||||
assert decoded.library_metadata is not None
|
||||
assert decoded.library_metadata.id == "doc-1"
|
||||
assert decoded.library_metadata.kind == "application/pdf"
|
||||
assert decoded.library_metadata.title == "Test Document"
|
||||
assert decoded.library_metadata.parent_id == ""
|
||||
assert decoded.library_metadata.document_type == "source"
|
||||
assert decoded.library_metadata.comments == "test comments"
|
||||
assert decoded.library_metadata.tags == ["tag1", "tag2"]
|
||||
|
||||
def test_roundtrip_preserves_child_document_metadata(self, translator):
|
||||
request = KnowledgeRequest(
|
||||
operation="put-kg-core",
|
||||
id="doc-1",
|
||||
library_metadata=LibraryMetadata(
|
||||
id="chunk-1",
|
||||
kind="text/plain",
|
||||
title="Chunk 1",
|
||||
parent_id="doc-1",
|
||||
document_type="chunk",
|
||||
),
|
||||
)
|
||||
|
||||
encoded = translator.encode(request)
|
||||
decoded = translator.decode(encoded)
|
||||
|
||||
assert decoded.library_metadata.parent_id == "doc-1"
|
||||
assert decoded.library_metadata.document_type == "chunk"
|
||||
|
||||
def test_roundtrip_preserves_library_blob(self, translator):
|
||||
request = KnowledgeRequest(
|
||||
operation="put-kg-core",
|
||||
id="doc-1",
|
||||
library_blob=LibraryBlob(
|
||||
id="doc-1",
|
||||
data=b"SGVsbG8gV29ybGQ=",
|
||||
),
|
||||
)
|
||||
|
||||
encoded = translator.encode(request)
|
||||
assert "library-blob" in encoded
|
||||
assert encoded["library-blob"]["id"] == "doc-1"
|
||||
assert encoded["library-blob"]["data"] == "SGVsbG8gV29ybGQ="
|
||||
|
||||
decoded = translator.decode(encoded)
|
||||
assert decoded.library_blob is not None
|
||||
assert decoded.library_blob.id == "doc-1"
|
||||
assert decoded.library_blob.data == "SGVsbG8gV29ybGQ="
|
||||
|
||||
def test_absent_library_fields_decode_as_none(self, translator):
|
||||
decoded = translator.decode({
|
||||
"operation": "get-kg-core",
|
||||
"id": "doc-1",
|
||||
})
|
||||
assert decoded.library_metadata is None
|
||||
assert decoded.library_blob is None
|
||||
|
||||
|
||||
class TestKnowledgeResponseTranslatorLibrary:
|
||||
|
||||
@pytest.fixture
|
||||
def response_translator(self):
|
||||
return KnowledgeResponseTranslator()
|
||||
|
||||
def test_encode_library_metadata(self, response_translator):
|
||||
response = KnowledgeResponse(
|
||||
ids=None,
|
||||
library_metadata=LibraryMetadata(
|
||||
id="doc-1",
|
||||
kind="application/pdf",
|
||||
title="Test",
|
||||
parent_id="",
|
||||
document_type="source",
|
||||
comments="",
|
||||
tags=[],
|
||||
),
|
||||
)
|
||||
encoded = response_translator.encode(response)
|
||||
assert "library-metadata" in encoded
|
||||
assert encoded["library-metadata"]["id"] == "doc-1"
|
||||
assert encoded["library-metadata"]["kind"] == "application/pdf"
|
||||
assert encoded["library-metadata"]["document-type"] == "source"
|
||||
|
||||
def test_encode_library_blob_bytes_to_string(self, response_translator):
|
||||
response = KnowledgeResponse(
|
||||
ids=None,
|
||||
library_blob=LibraryBlob(
|
||||
id="doc-1",
|
||||
data=b"dGVzdCBkYXRh",
|
||||
),
|
||||
)
|
||||
encoded = response_translator.encode(response)
|
||||
assert "library-blob" in encoded
|
||||
assert encoded["library-blob"]["id"] == "doc-1"
|
||||
assert encoded["library-blob"]["data"] == "dGVzdCBkYXRh"
|
||||
assert isinstance(encoded["library-blob"]["data"], str)
|
||||
|
||||
def test_encode_library_blob_string_passthrough(self, response_translator):
|
||||
response = KnowledgeResponse(
|
||||
ids=None,
|
||||
library_blob=LibraryBlob(
|
||||
id="doc-1",
|
||||
data="already-a-string",
|
||||
),
|
||||
)
|
||||
encoded = response_translator.encode(response)
|
||||
assert encoded["library-blob"]["data"] == "already-a-string"
|
||||
|
||||
def test_library_metadata_is_not_final(self, response_translator):
|
||||
response = KnowledgeResponse(
|
||||
ids=None,
|
||||
library_metadata=LibraryMetadata(id="doc-1"),
|
||||
)
|
||||
_, is_final = response_translator.encode_with_completion(response)
|
||||
assert is_final is False
|
||||
|
||||
def test_library_blob_is_not_final(self, response_translator):
|
||||
response = KnowledgeResponse(
|
||||
ids=None,
|
||||
library_blob=LibraryBlob(id="doc-1", data=b"data"),
|
||||
)
|
||||
_, is_final = response_translator.encode_with_completion(response)
|
||||
assert is_final is False
|
||||
|
||||
def test_eos_is_final(self, response_translator):
|
||||
response = KnowledgeResponse(eos=True)
|
||||
_, is_final = response_translator.encode_with_completion(response)
|
||||
assert is_final is True
|
||||
|
|
|
|||
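The blob tests above pass payloads that look like base64 text (for example `b"SGVsbG8gV29ybGQ="`) and expect the encoder to emit a plain `str`. The following is a minimal sketch of why a UTF-8 decode is safe for such payloads. It assumes the blob content has already been base64-encoded before it reaches the translator, which this diff does not show. The helper names `blob_to_wire` and `wire_to_raw` are hypothetical and are not part of TrustGraph.

```python
import base64
import json


def blob_to_wire(data):
    """Mirror the translator's rule: bytes become str, str passes through."""
    if isinstance(data, bytes):
        return data.decode("utf-8")
    return data


def wire_to_raw(text):
    """Hypothetical consumer-side step: recover the original raw bytes."""
    return base64.b64decode(text)


raw = b"Hello World"
b64_bytes = base64.b64encode(raw)   # ASCII-only bytes, so UTF-8 decoding is safe
wire = blob_to_wire(b64_bytes)

# The string form survives JSON serialisation, which raw bytes would not.
message = json.dumps({"library-blob": {"id": "doc-1", "data": wire}})
restored = json.loads(message)["library-blob"]["data"]
```

If the content were arbitrary binary rather than base64 text, the `decode("utf-8")` step could raise `UnicodeDecodeError`, so the sketch only holds under the stated assumption.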
|
|
@ -502,6 +502,7 @@ class SocketClient:
|
|||
|
||||
def put_kg_core(
|
||||
self, id: str, triples=None, graph_embeddings=None,
|
||||
library_metadata=None, library_blob=None,
|
||||
) -> Dict[str, Any]:
|
||||
request = {
|
||||
"operation": "put-kg-core",
|
||||
|
|
@ -512,6 +513,10 @@ class SocketClient:
|
|||
request["triples"] = triples
|
||||
if graph_embeddings is not None:
|
||||
request["graph-embeddings"] = graph_embeddings
|
||||
if library_metadata is not None:
|
||||
request["library-metadata"] = library_metadata
|
||||
if library_blob is not None:
|
||||
request["library-blob"] = library_blob
|
||||
return self._send_request_sync("knowledge", None, request)
|
||||
|
||||
def get_de_core(self, id: str) -> Iterator[Dict[str, Any]]:
|
||||
|
|
|
|||
|
|
@ -2,7 +2,8 @@ from typing import Dict, Any, Tuple, Optional
|
|||
from ...schema import (
|
||||
KnowledgeRequest, KnowledgeResponse, Triples, GraphEmbeddings,
|
||||
DocumentEmbeddings, ChunkEmbeddings,
|
||||
- Metadata, EntityEmbeddings
|
||||
Metadata, EntityEmbeddings,
|
||||
LibraryMetadata, LibraryBlob,
|
||||
)
|
||||
from .base import MessageTranslator
|
||||
from .primitives import ValueTranslator, SubgraphTranslator
|
||||
|
|
@ -61,6 +62,27 @@ class KnowledgeRequestTranslator(MessageTranslator):
|
|||
]
|
||||
)
|
||||
|
||||
library_metadata = None
|
||||
if "library-metadata" in data:
|
||||
lm = data["library-metadata"]
|
||||
library_metadata = LibraryMetadata(
|
||||
id=lm.get("id", ""),
|
||||
kind=lm.get("kind", ""),
|
||||
title=lm.get("title", ""),
|
||||
parent_id=lm.get("parent-id", ""),
|
||||
document_type=lm.get("document-type", ""),
|
||||
comments=lm.get("comments", ""),
|
||||
tags=lm.get("tags", []),
|
||||
)
|
||||
|
||||
library_blob = None
|
||||
if "library-blob" in data:
|
||||
lb = data["library-blob"]
|
||||
library_blob = LibraryBlob(
|
||||
id=lb.get("id", ""),
|
||||
data=lb.get("data", b""),
|
||||
)
|
||||
|
||||
return KnowledgeRequest(
|
||||
operation=data.get("operation"),
|
||||
id=data.get("id"),
|
||||
|
|
@ -69,6 +91,8 @@ class KnowledgeRequestTranslator(MessageTranslator):
|
|||
triples=triples,
|
||||
graph_embeddings=graph_embeddings,
|
||||
document_embeddings=document_embeddings,
|
||||
library_metadata=library_metadata,
|
||||
library_blob=library_blob,
|
||||
)
|
||||
|
||||
def encode(self, obj: KnowledgeRequest) -> Dict[str, Any]:
|
||||
|
|
@ -125,6 +149,26 @@ class KnowledgeRequestTranslator(MessageTranslator):
|
|||
],
|
||||
}
|
||||
|
||||
if obj.library_metadata:
|
||||
result["library-metadata"] = {
|
||||
"id": obj.library_metadata.id,
|
||||
"kind": obj.library_metadata.kind,
|
||||
"title": obj.library_metadata.title,
|
||||
"parent-id": obj.library_metadata.parent_id,
|
||||
"document-type": obj.library_metadata.document_type,
|
||||
"comments": obj.library_metadata.comments,
|
||||
"tags": obj.library_metadata.tags,
|
||||
}
|
||||
|
||||
if obj.library_blob:
|
||||
data = obj.library_blob.data
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode("utf-8")
|
||||
result["library-blob"] = {
|
||||
"id": obj.library_blob.id,
|
||||
"data": data,
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
|
||||
|
|
@ -194,6 +238,32 @@ class KnowledgeResponseTranslator(MessageTranslator):
|
|||
}
|
||||
}
|
||||
|
||||
# Streaming library metadata response
|
||||
if obj.library_metadata:
|
||||
return {
|
||||
"library-metadata": {
|
||||
"id": obj.library_metadata.id,
|
||||
"kind": obj.library_metadata.kind,
|
||||
"title": obj.library_metadata.title,
|
||||
"parent-id": obj.library_metadata.parent_id,
|
||||
"document-type": obj.library_metadata.document_type,
|
||||
"comments": obj.library_metadata.comments,
|
||||
"tags": obj.library_metadata.tags,
|
||||
}
|
||||
}
|
||||
|
||||
# Streaming library blob response
|
||||
if obj.library_blob:
|
||||
data = obj.library_blob.data
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode("utf-8")
|
||||
return {
|
||||
"library-blob": {
|
||||
"id": obj.library_blob.id,
|
||||
"data": data,
|
||||
}
|
||||
}
|
||||
|
||||
# End of stream marker
|
||||
if obj.eos is True:
|
||||
return {"eos": True}
|
||||
|
|
@ -209,7 +279,9 @@ class KnowledgeResponseTranslator(MessageTranslator):
|
|||
is_final = (
|
||||
obj.ids is not None or # List response
|
||||
obj.eos is True or # End of stream
|
||||
- (not obj.triples and not obj.graph_embeddings and not obj.document_embeddings) # Empty response
|
||||
(not obj.triples and not obj.graph_embeddings
|
||||
and not obj.document_embeddings
|
||||
and not obj.library_metadata and not obj.library_blob) # Empty response
|
||||
)
|
||||
|
||||
return response, is_final
|
||||
|
|
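The completion rule in the hunk above can be read as "a response is final if it is a list response, an end-of-stream marker, or carries no payload at all". The sketch below restates that rule in isolation. `FakeResponse` is a hypothetical stand-in for `KnowledgeResponse` with only the fields the predicate inspects, and `is_final` is an illustrative helper rather than the translator's actual method.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeResponse:
    # Hypothetical stand-in: only the fields the completion check looks at.
    ids: Optional[list] = None
    eos: bool = False
    triples: Optional[object] = None
    graph_embeddings: Optional[object] = None
    document_embeddings: Optional[object] = None
    library_metadata: Optional[object] = None
    library_blob: Optional[object] = None


def is_final(obj):
    """True when the stream should be treated as complete."""
    has_payload = any((
        obj.triples, obj.graph_embeddings, obj.document_embeddings,
        obj.library_metadata, obj.library_blob,
    ))
    return obj.ids is not None or obj.eos is True or not has_payload


metadata_msg = FakeResponse(library_metadata={"id": "doc-1"})
eos_msg = FakeResponse(eos=True)
empty_msg = FakeResponse()
```

Without the two library fields in the payload check, a metadata or blob message would count as an empty response and end the stream early, which is what the `test_library_metadata_is_not_final` and `test_library_blob_is_not_final` tests guard against.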
@ -21,6 +21,21 @@ from .embeddings import GraphEmbeddings, DocumentEmbeddings
|
|||
# <- ()
|
||||
# <- (error)
|
||||
|
||||
@dataclass
|
||||
class LibraryMetadata:
|
||||
id: str = ""
|
||||
kind: str = ""
|
||||
title: str = ""
|
||||
parent_id: str = ""
|
||||
document_type: str = ""
|
||||
comments: str = ""
|
||||
tags: list[str] = field(default_factory=list)
|
||||
|
||||
@dataclass
|
||||
class LibraryBlob:
|
||||
id: str = ""
|
||||
data: bytes = b""
|
||||
|
||||
@dataclass
|
||||
class KnowledgeRequest:
|
||||
# get-kg-core, delete-kg-core, list-kg-cores, put-kg-core
|
||||
|
|
@ -44,6 +59,10 @@ class KnowledgeRequest:
|
|||
# put-de-core
|
||||
document_embeddings: DocumentEmbeddings | None = None
|
||||
|
||||
# put-kg-core (source material)
|
||||
library_metadata: LibraryMetadata | None = None
|
||||
library_blob: LibraryBlob | None = None
|
||||
|
||||
@dataclass
|
||||
class KnowledgeResponse:
|
||||
error: Error | None = None
|
||||
|
|
@ -52,6 +71,8 @@ class KnowledgeResponse:
|
|||
triples: Triples | None = None
|
||||
graph_embeddings: GraphEmbeddings | None = None
|
||||
document_embeddings: DocumentEmbeddings | None = None
|
||||
library_metadata: LibraryMetadata | None = None
|
||||
library_blob: LibraryBlob | None = None
|
||||
|
||||
knowledge_request_queue = queue('knowledge', cls='request')
|
||||
knowledge_response_queue = queue('knowledge', cls='response')
|
||||
|
|
|
|||
|
|
@ -47,6 +47,31 @@ def write_ge(f, data):
|
|||
)
|
||||
f.write(msgpack.packb(msg, use_bin_type=True))
|
||||
|
||||
def write_library_metadata(f, data):
|
||||
msg = (
|
||||
"lm",
|
||||
{
|
||||
"i": data["id"],
|
||||
"k": data.get("kind", ""),
|
||||
"t": data.get("title", ""),
|
||||
"p": data.get("parent-id", ""),
|
||||
"d": data.get("document-type", ""),
|
||||
"c": data.get("comments", ""),
|
||||
"g": data.get("tags", []),
|
||||
}
|
||||
)
|
||||
f.write(msgpack.packb(msg, use_bin_type=True))
|
||||
|
||||
def write_library_blob(f, data):
|
||||
msg = (
|
||||
"lb",
|
||||
{
|
||||
"i": data["id"],
|
||||
"d": data.get("data", b""),
|
||||
}
|
||||
)
|
||||
f.write(msgpack.packb(msg, use_bin_type=True))
|
||||
|
||||
def fetch(url, workspace, id, output, token=None):
|
||||
|
||||
api = Api(url=url, token=token, workspace=workspace)
|
||||
|
|
@ -55,6 +80,8 @@ def fetch(url, workspace, id, output, token=None):
|
|||
try:
|
||||
ge = 0
|
||||
t = 0
|
||||
lm = 0
|
||||
lb = 0
|
||||
|
||||
with open(output, "wb") as f:
|
||||
|
||||
|
|
@ -68,7 +95,15 @@ def fetch(url, workspace, id, output, token=None):
|
|||
ge += 1
|
||||
write_ge(f, response["graph-embeddings"])
|
||||
|
||||
- print(f"Got: {t} triple, {ge} GE messages.")
|
||||
if "library-metadata" in response:
|
||||
lm += 1
|
||||
write_library_metadata(f, response["library-metadata"])
|
||||
|
||||
if "library-blob" in response:
|
||||
lb += 1
|
||||
write_library_blob(f, response["library-blob"])
|
||||
|
||||
print(f"Got: {t} triple, {ge} GE, {lm} library metadata, {lb} library blob messages.")
|
||||
|
||||
finally:
|
||||
socket.close()
|
||||
|
|
|
|||
|
|
@ -40,6 +40,23 @@ def read_message(unpacked, id):
|
|||
},
|
||||
"triples": msg["t"],
|
||||
}
|
||||
elif unpacked[0] == "lm":
|
||||
msg = unpacked[1]
|
||||
return "lm", {
|
||||
"id": msg["i"],
|
||||
"kind": msg.get("k", ""),
|
||||
"title": msg.get("t", ""),
|
||||
"parent-id": msg.get("p", ""),
|
||||
"document-type": msg.get("d", ""),
|
||||
"comments": msg.get("c", ""),
|
||||
"tags": msg.get("g", []),
|
||||
}
|
||||
elif unpacked[0] == "lb":
|
||||
msg = unpacked[1]
|
||||
return "lb", {
|
||||
"id": msg["i"],
|
||||
"data": msg.get("d", b""),
|
||||
}
|
||||
else:
|
||||
raise RuntimeError("Unpacked unexpected message type", unpacked[0])
|
||||
|
||||
|
|
@ -51,6 +68,8 @@ def put(url, workspace, id, input, token=None):
|
|||
try:
|
||||
ge = 0
|
||||
t = 0
|
||||
lm = 0
|
||||
lb = 0
|
||||
|
||||
with open(input, "rb") as f:
|
||||
|
||||
|
|
@ -73,10 +92,18 @@ def put(url, workspace, id, input, token=None):
|
|||
t += 1
|
||||
socket.put_kg_core(id, triples=msg)
|
||||
|
||||
elif kind == "lm":
|
||||
lm += 1
|
||||
socket.put_kg_core(id, library_metadata=msg)
|
||||
|
||||
elif kind == "lb":
|
||||
lb += 1
|
||||
socket.put_kg_core(id, library_blob=msg)
|
||||
|
||||
else:
|
||||
raise RuntimeError("Unexpected message kind", kind)
|
||||
|
||||
- print(f"Put: {t} triple, {ge} GE messages.")
|
||||
print(f"Put: {t} triple, {ge} GE, {lm} library metadata, {lb} library blob messages.")
|
||||
|
||||
finally:
|
||||
socket.close()
|
||||
|
|
|
|||
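The writer and reader hunks above map the long wire-format keys of a library metadata record to single-letter keys inside an `("lm", {...})` record, and back again. The sketch below shows that mapping as a round trip without the msgpack step. `LM_KEYS`, `pack_library_metadata` and `unpack_library_metadata` are hypothetical names introduced for illustration, so treat this as an outline of the record layout rather than the CLI's code.

```python
# Long wire-format key -> single-letter key used in the core file.
LM_KEYS = {
    "id": "i",
    "kind": "k",
    "title": "t",
    "parent-id": "p",
    "document-type": "d",
    "comments": "c",
    "tags": "g",
}


def _default_for(long_key):
    # Tags default to an empty list, every other optional field to "".
    return [] if long_key == "tags" else ""


def pack_library_metadata(data):
    """Build the ("lm", {...}) record; only "id" is mandatory."""
    body = {"i": data["id"]}
    for long_key, short_key in LM_KEYS.items():
        if long_key != "id":
            body[short_key] = data.get(long_key, _default_for(long_key))
    return ("lm", body)


def unpack_library_metadata(record):
    """Invert pack_library_metadata; accepts a tuple or a list."""
    kind, body = record
    if kind != "lm":
        raise RuntimeError("Unexpected record type", kind)
    out = {"id": body["i"]}
    for long_key, short_key in LM_KEYS.items():
        if long_key != "id":
            out[long_key] = body.get(short_key, _default_for(long_key))
    return out


record = pack_library_metadata({"id": "chunk-1", "parent-id": "doc-1"})
roundtrip = unpack_library_metadata(record)
```

Note that `"d"` means document type in an `lm` record but blob data in an `lb` record, so the record tag has to be checked before the body is interpreted.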
|
|
@ -1,6 +1,7 @@
|
|||
|
||||
from .. schema import KnowledgeResponse, Error, Triples, GraphEmbeddings
|
||||
- from .. schema import DocumentEmbeddings
|
||||
from .. schema import DocumentEmbeddings, LibraryMetadata, LibraryBlob
|
||||
from .. schema import LibrarianRequest, DocumentMetadata
|
||||
from .. knowledge import hash
|
||||
from .. exceptions import RequestError
|
||||
from .. tables.knowledge import KnowledgeTableStore
|
||||
|
|
@ -18,7 +19,7 @@ class KnowledgeManager:
|
|||
|
||||
def __init__(
|
||||
self, cassandra_host, cassandra_username, cassandra_password,
|
||||
- keyspace, flow_config, replication_factor=1,
|
||||
keyspace, flow_config, librarian=None, replication_factor=1,
|
||||
):
|
||||
|
||||
self.table_store = KnowledgeTableStore(
|
||||
|
|
@ -26,6 +27,9 @@ class KnowledgeManager:
|
|||
replication_factor
|
||||
)
|
||||
|
||||
self.librarian = librarian
|
||||
self._pending_library_metadata = {}
|
||||
|
||||
self.loader_queue = asyncio.Queue(maxsize=20)
|
||||
self.background_task = None
|
||||
self.flow_config = flow_config
|
||||
|
|
@ -86,6 +90,9 @@ class KnowledgeManager:
|
|||
publish_ge,
|
||||
)
|
||||
|
||||
if self.librarian:
|
||||
await self._stream_library_docs(request.id, respond)
|
||||
|
||||
logger.debug("Knowledge core retrieval complete")
|
||||
|
||||
await respond(
|
||||
|
|
@ -122,6 +129,12 @@ class KnowledgeManager:
|
|||
workspace, request.graph_embeddings
|
||||
)
|
||||
|
||||
if request.library_metadata and self.librarian:
|
||||
await self._put_library_metadata(request.library_metadata, workspace)
|
||||
|
||||
if request.library_blob and self.librarian:
|
||||
await self._put_library_blob(request.library_blob, workspace)
|
||||
|
||||
await respond(
|
||||
KnowledgeResponse(
|
||||
error = None,
|
||||
|
|
@ -250,6 +263,112 @@ class KnowledgeManager:
|
|||
|
||||
await self.loader_queue.put((request, respond, workspace))
|
||||
|
||||
async def _stream_library_docs(self, document_id, respond):
|
||||
|
||||
try:
|
||||
root_meta = await self.librarian.fetch_document_metadata(
|
||||
document_id
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not fetch library metadata for {document_id}: {e}")
|
||||
return
|
||||
|
||||
if root_meta is None:
|
||||
return
|
||||
|
||||
await self._stream_one_doc(root_meta, respond)
|
||||
|
||||
try:
|
||||
resp = await self.librarian.request(
|
||||
LibrarianRequest(
|
||||
operation="list-children",
|
||||
document_id=document_id,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not list children for {document_id}: {e}")
|
||||
return
|
||||
|
||||
for child_meta in resp.document_metadatas:
|
||||
await self._stream_one_doc(child_meta, respond)
|
||||
|
||||
async def _stream_one_doc(self, doc_meta, respond):
|
||||
|
||||
lm = LibraryMetadata(
|
||||
id=doc_meta.id,
|
||||
kind=doc_meta.kind,
|
||||
title=doc_meta.title,
|
||||
parent_id=doc_meta.parent_id,
|
||||
document_type=doc_meta.document_type,
|
||||
comments=doc_meta.comments,
|
||||
tags=doc_meta.tags or [],
|
||||
)
|
||||
|
||||
await respond(
|
||||
KnowledgeResponse(library_metadata=lm)
|
||||
)
|
||||
|
||||
try:
|
||||
content = await self.librarian.fetch_document_content(
|
||||
doc_meta.id
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not fetch content for {doc_meta.id}: {e}")
|
||||
return
|
||||
|
||||
await respond(
|
||||
KnowledgeResponse(
|
||||
library_blob=LibraryBlob(
|
||||
id=doc_meta.id,
|
||||
data=content,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
async def _put_library_metadata(self, lm, workspace):
|
||||
self._pending_library_metadata[lm.id] = lm
|
||||
|
||||
async def _put_library_blob(self, lb, workspace):
|
||||
|
||||
lm = self._pending_library_metadata.pop(lb.id, None)
|
||||
if lm is None:
|
||||
logger.warning(
|
||||
f"Received library blob for {lb.id} with no preceding metadata"
|
||||
)
|
||||
return
|
||||
|
||||
doc_meta = DocumentMetadata(
|
||||
id=lm.id,
|
||||
kind=lm.kind,
|
||||
title=lm.title,
|
||||
parent_id=lm.parent_id,
|
||||
document_type=lm.document_type,
|
||||
comments=lm.comments,
|
||||
tags=lm.tags or [],
|
||||
)
|
||||
|
||||
if lm.parent_id:
|
||||
operation = "add-child-document"
|
||||
else:
|
||||
operation = "add-document"
|
||||
|
||||
try:
|
||||
await self.librarian.request(
|
||||
LibrarianRequest(
|
||||
operation=operation,
|
||||
document_id=lm.id,
|
||||
document_metadata=doc_meta,
|
||||
content=lb.data,
|
||||
)
|
||||
)
|
||||
except RuntimeError as e:
|
||||
if "already exists" in str(e):
|
||||
logger.debug(f"Library document {lm.id} already exists, skipping")
|
||||
else:
|
||||
logger.warning(f"Could not save library document {lm.id}: {e}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not save library document {lm.id}: {e}")
|
||||
|
||||
async def core_loader(self):
|
||||
|
||||
logger.info("Knowledge background processor running...")
|
||||
|
|
|
|||
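The upload path in the hunks above pairs two separate messages: metadata is buffered by document id, and the librarian is called only when the matching blob arrives. The sketch below reproduces that pairing in a self-contained form. `RecordingLibrarian` and `UploadBuffer` are hypothetical stand-ins, and the `request` signature here is simplified compared with the `LibrarianRequest` object used in the diff.

```python
import asyncio


class RecordingLibrarian:
    """Hypothetical stand-in that records the operations it is asked to run."""

    def __init__(self):
        self.calls = []

    async def request(self, operation, document_id, content):
        self.calls.append((operation, document_id, content))


class UploadBuffer:
    def __init__(self, librarian):
        self.librarian = librarian
        self.pending = {}  # document id -> metadata awaiting its blob

    async def put_metadata(self, meta):
        # Metadata alone cannot create a document, so hold it for now.
        self.pending[meta["id"]] = meta

    async def put_blob(self, doc_id, data):
        meta = self.pending.pop(doc_id, None)
        if meta is None:
            return False  # orphan blob: no metadata to attach it to
        operation = (
            "add-child-document" if meta.get("parent_id") else "add-document"
        )
        await self.librarian.request(operation, doc_id, data)
        return True


async def demo():
    librarian = RecordingLibrarian()
    buf = UploadBuffer(librarian)
    await buf.put_metadata({"id": "doc-1", "parent_id": ""})
    await buf.put_metadata({"id": "chunk-1", "parent_id": "doc-1"})
    await buf.put_blob("doc-1", b"data")
    await buf.put_blob("chunk-1", b"chunk")
    orphan_ok = await buf.put_blob("orphan", b"x")
    return librarian.calls, orphan_ok, buf.pending


calls, orphan_ok, pending = asyncio.run(demo())
```

One consequence of this design, visible in the sketch, is that a metadata record whose blob never arrives stays in the buffer. Whether the real manager needs to clean up such entries is not addressed in the diff.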
|
|
@ -12,6 +12,7 @@ import logging
|
|||
from .. base import WorkspaceProcessor, Consumer, Producer, Publisher, Subscriber
|
||||
from .. base import ConsumerMetrics, ProducerMetrics
|
||||
from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config
|
||||
from .. base import LibrarianClient
|
||||
|
||||
from .. schema import KnowledgeRequest, KnowledgeResponse, Error
|
||||
from .. schema import knowledge_request_queue, knowledge_response_queue
|
||||
|
|
@ -77,12 +78,17 @@ class Processor(WorkspaceProcessor):
|
|||
}
|
||||
)
|
||||
|
||||
self.librarian_client = LibrarianClient(
|
||||
id=id, backend=self.pubsub, taskgroup=self.taskgroup,
|
||||
)
|
||||
|
||||
self.knowledge = KnowledgeManager(
|
||||
cassandra_host = self.cassandra_host,
|
||||
cassandra_username = self.cassandra_username,
|
||||
cassandra_password = self.cassandra_password,
|
||||
keyspace = keyspace,
|
||||
flow_config = self,
|
||||
librarian = self.librarian_client,
|
||||
replication_factor = replication_factor,
|
||||
)
|
||||
|
||||
|
|
@ -156,6 +162,7 @@ class Processor(WorkspaceProcessor):
|
|||
async def start(self):
|
||||
|
||||
await super(Processor, self).start()
|
||||
await self.librarian_client.start()
|
||||
|
||||
async def on_knowledge_config(self, workspace, config, version):
|
||||
|
||||
|
|
|
|||
|
|
@ -73,6 +73,39 @@ class CoreExport:
|
|||
enc = msgpack.packb(msg)
|
||||
await response.write(enc)
|
||||
|
||||
if "library-metadata" in resp:
|
||||
|
||||
data = resp["library-metadata"]
|
||||
msg = (
|
||||
"lm",
|
||||
{
|
||||
"i": data["id"],
|
||||
"k": data.get("kind", ""),
|
||||
"t": data.get("title", ""),
|
||||
"p": data.get("parent-id", ""),
|
||||
"d": data.get("document-type", ""),
|
||||
"c": data.get("comments", ""),
|
||||
"g": data.get("tags", []),
|
||||
}
|
||||
)
|
||||
|
||||
enc = msgpack.packb(msg)
|
||||
await response.write(enc)
|
||||
|
||||
if "library-blob" in resp:
|
||||
|
||||
data = resp["library-blob"]
|
||||
msg = (
|
||||
"lb",
|
||||
{
|
||||
"i": data["id"],
|
||||
"d": data.get("data", b""),
|
||||
}
|
||||
)
|
||||
|
||||
enc = msgpack.packb(msg, use_bin_type=True)
|
||||
await response.write(enc)
|
||||
|
||||
await kr.process(
|
||||
{
|
||||
"operation": "get-kg-core",
|
||||
|
|
|
|||
|
|
@ -79,6 +79,39 @@ class CoreImport:
|
|||
|
||||
await kr.process(msg)
|
||||
|
||||
elif unpacked[0] == "lm":
|
||||
msg = unpacked[1]
|
||||
msg = {
|
||||
"operation": "put-kg-core",
|
||||
"workspace": workspace,
|
||||
"id": id,
|
||||
"library-metadata": {
|
||||
"id": msg["i"],
|
||||
"kind": msg.get("k", ""),
|
||||
"title": msg.get("t", ""),
|
||||
"parent-id": msg.get("p", ""),
|
||||
"document-type": msg.get("d", ""),
|
||||
"comments": msg.get("c", ""),
|
||||
"tags": msg.get("g", []),
|
||||
}
|
||||
}
|
||||
|
||||
await kr.process(msg)
|
||||
|
||||
elif unpacked[0] == "lb":
|
||||
msg = unpacked[1]
|
||||
msg = {
|
||||
"operation": "put-kg-core",
|
||||
"workspace": workspace,
|
||||
"id": id,
|
||||
"library-blob": {
|
||||
"id": msg["i"],
|
||||
"data": msg.get("d", b""),
|
||||
}
|
||||
}
|
||||
|
||||
await kr.process(msg)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Core import exception: {e}", exc_info=True)
|
||||
await error(str(e))
|
||||
|
|
|
|||
|
|
@ -98,7 +98,8 @@ class KnowledgeTableStore:
|
|||
text, boolean, text, boolean, text, boolean
|
||||
>>,
|
||||
triples list<tuple<
|
||||
- text, boolean, text, boolean, text, boolean
|
||||
text, boolean, text, boolean, text, boolean,
|
||||
text
|
||||
>>,
|
||||
PRIMARY KEY ((workspace, document_id), id)
|
||||
);
|
||||
|
|
@ -234,7 +235,8 @@ class KnowledgeTableStore:
|
|||
|
||||
triples = [
|
||||
(
|
||||
- *term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o)
|
||||
*term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o),
|
||||
v.g or ""
|
||||
)
|
||||
for v in m.triples
|
||||
]
|
||||
|
|
@ -416,6 +418,7 @@ class KnowledgeTableStore:
|
|||
s = tuple_to_term(elt[0], elt[1]),
|
||||
p = tuple_to_term(elt[2], elt[3]),
|
||||
o = tuple_to_term(elt[4], elt[5]),
|
||||
g = elt[6] if elt[6] else None,
|
||||
)
|
||||
for elt in row[3]
|
||||
]
|
||||
|
|
|
|||
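The table store hunks above add the graph name as a seventh tuple element. A missing graph is written as `""` and read back as `None`. The sketch below shows that normalisation as a round trip. `to_row` and `from_row` are hypothetical helpers, and the terms are simplified to IRI strings with a fixed `True` flag instead of going through `term_to_tuple` and `tuple_to_term`.

```python
def to_row(s, p, o, g=None):
    """Flatten a triple into the 7-element tuple; a None graph is stored as ""."""
    return (s, True, p, True, o, True, g or "")


def from_row(elt):
    """Rebuild the triple; an empty graph column reads back as None."""
    return {
        "s": elt[0],
        "p": elt[2],
        "o": elt[4],
        "g": elt[6] if elt[6] else None,
    }


named = from_row(to_row(
    "http://example.org/alice",
    "http://example.org/knows",
    "http://example.org/bob",
    "urn:graph:source",
))
default = from_row(to_row(
    "http://example.org/alice",
    "http://example.org/knows",
    "http://example.org/bob",
))
```

Storing `""` rather than a null keeps every tuple element populated, and the read side restores `None`, which matches the `test_empty_graph_name_becomes_none` test in this commit.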