From 93e31d2b842fdf1a15ece7f630d3855c9dd02093 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Wed, 3 Jun 2026 10:45:06 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20complete=20knowledge=20core=20storage?= =?UTF-8?q?=20=E2=80=94=20named=20graphs,=20provenance,=20source=20materia?= =?UTF-8?q?l?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements all three changes from the knowledge-core-completeness tech spec: 1. Named graph field preserved through Cassandra storage (7-element tuple), enabling provenance triples to retain their graph URIs on round-trip. 2. Provenance triples already arrive on triples-input — no routing change needed; Change 1 was sufficient. 3. Source material (library documents) streamed alongside triples and embeddings during core download/upload. The knowledge manager fetches the document hierarchy from the librarian on download and recreates it on upload, preserving the full provenance chain across instances. --- .../tech-specs/knowledge-core-completeness.md | 535 ++++++++++++++++++ .../unit/test_cores/test_knowledge_manager.py | 254 ++++++++- .../test_tables/test_knowledge_table_store.py | 33 +- .../test_knowledge_translator_roundtrip.py | 169 +++++- .../trustgraph/api/socket_client.py | 5 + .../messaging/translators/knowledge.py | 76 ++- .../trustgraph/schema/knowledge/knowledge.py | 21 + trustgraph-cli/trustgraph/cli/get_kg_core.py | 37 +- trustgraph-cli/trustgraph/cli/put_kg_core.py | 29 +- trustgraph-flow/trustgraph/cores/knowledge.py | 123 +++- trustgraph-flow/trustgraph/cores/service.py | 7 + .../gateway/dispatch/core_export.py | 33 ++ .../gateway/dispatch/core_import.py | 33 ++ .../trustgraph/tables/knowledge.py | 7 +- 14 files changed, 1347 insertions(+), 15 deletions(-) create mode 100644 docs/tech-specs/knowledge-core-completeness.md diff --git a/docs/tech-specs/knowledge-core-completeness.md b/docs/tech-specs/knowledge-core-completeness.md new file mode 100644 index 00000000..3ccb41f0 --- /dev/null +++ b/docs/tech-specs/knowledge-core-completeness.md @@ -0,0 +1,535 @@ +--- +layout: default +title: "Knowledge Core Completeness" +parent: "Tech Specs" +--- + +# Knowledge Core Completeness + +## Overview + +Knowledge cores are portable snapshots of extracted knowledge: triples, graph +embeddings, and document embeddings stored in Cassandra's `knowledge` keyspace. +They can be downloaded as files, transferred between TrustGraph instances, and +loaded back into vector and graph stores. + +Recent additions to TrustGraph — explainability/provenance and named graphs — +were not carried through to the knowledge core system. This means that +exporting and re-importing a core loses provenance links, graph assignments, +and source material, breaking the explainability chain. + +This specification addresses three gaps: + +1. **Named graphs not stored** — The `g` (graph name) field on triples is + silently dropped when writing to the core store and comes back as `None` + on read. +2. **Provenance triples not captured** — Provenance triples (PROV-O) are + generated during extraction and flow to graph stores, but never enter + the knowledge core store. It is unclear whether they arrive at the store + in the correct form. +3. **Source material not included** — Documents, text pages, and chunks in + the librarian's bucket store are not part of the core. After loading a + core on a different instance, provenance links to source material point + at nothing. + +## Goals + +- **Self-contained cores**: A downloaded knowledge core file contains + everything needed to reconstruct the full knowledge graph including + provenance and source attribution on a fresh instance. +- **Named graph preservation**: Round-tripping a core preserves graph + assignments on all triples. +- **Backward compatibility**: Existing core files (without graph names or + source material) can still be uploaded and loaded. New fields are optional + on import. +- **No change to core identity**: A core is still identified by its document + ID. The additional data is associated with the same core ID. +- **Minimal file format changes**: Extend the existing msgpack record format + with new record types rather than restructuring existing ones. + +## Background + +### Current Lifecycle + +``` +Extraction pipeline + │ + ├─ triples ──────────────────► knowledge core store (Cassandra) + ├─ graph embeddings ─────────► knowledge core store (Cassandra) + ├─ document embeddings ──────► knowledge core store (Cassandra) + ├─ provenance triples ───────► graph store (only) + └─ source documents ─────────► librarian bucket store (only) + +Download: Cassandra ──► knowledge manager ──► API gateway ──► client file +Upload: client file ──► API gateway ──► knowledge manager ──► Cassandra +Load: Cassandra ──► knowledge manager ──► Pulsar topics ──► graph/vector stores +``` + +### Current Core File Format (msgpack) + +A core file is a sequence of concatenated msgpack records. Each record is a +2-element tuple: `(type_tag, payload)`. + +| Type tag | Payload | Description | +|----------|---------|-------------| +| `"t"` | `{"m": {id, root, collection}, "t": [triple_dicts]}` | Triple batch | +| `"ge"` | `{"m": {id, root, collection}, "e": [{entity, vector}]}` | Graph embedding batch | + +### What's Missing + +#### Named Graphs + +The `Triple` dataclass has a `g: str | None` field (graph name IRI), used to +separate provenance graphs (`urn:graph:source`, `urn:graph:retrieval`) from +the default graph. However: + +- **Cassandra schema** (`knowledge.triples` table): stores a 6-tuple per + triple `(s_val, s_is_uri, p_val, p_is_uri, o_val, o_is_uri)` — no graph + field. +- **`add_triples()`** (`tables/knowledge.py:231`): destructures only `s`, + `p`, `o` — `g` is discarded. +- **`get_triples()`** (`tables/knowledge.py:396`): reconstructs `Triple` + with `g` defaulting to `None`. +- **Core file format**: triple dicts do not include a graph field. + +#### Provenance Triples + +Provenance triples are generated in the extraction pipeline +(`trustgraph-base/trustgraph/provenance/triples.py`) and published to graph +store topics. They use named graphs (`urn:graph:source`, +`urn:graph:retrieval`) and PROV-O vocabulary. + +The knowledge core store processor (`storage/knowledge/store.py`) listens on +`triples-input` and `graph-embeddings-input`. Whether provenance triples +arrive on the same `triples-input` topic or a separate one needs +verification. Even if they do arrive, the graph name would be lost (per +above). + +#### Source Material + +The librarian stores the full document hierarchy in a separate system: + +- **Blob store** (S3/MinIO): original documents, text pages, chunks — + keyed by object UUID under `doc/{object_id}`. +- **Cassandra `library` keyspace**: document metadata including `id`, + `kind` (MIME type), `title`, `parent_id`, `document_type` + (`source`/`extracted`), `object_id` (blob reference). + +Provenance triples link extracted facts back to chunk/page/document IDs. +Those IDs resolve through the librarian. When a core is loaded on a +different instance, the librarian has no matching documents, so the entire +provenance chain is broken. + +### Key Source Files + +| Component | File | Purpose | +|-----------|------|---------| +| Core Cassandra schema | `trustgraph-flow/trustgraph/tables/knowledge.py` | Table definitions, read/write | +| Core manager | `trustgraph-flow/trustgraph/cores/knowledge.py` | API operations, load-to-store | +| Core store processor | `trustgraph-flow/trustgraph/storage/knowledge/store.py` | Extraction → Cassandra | +| CLI download | `trustgraph-cli/trustgraph/cli/get_kg_core.py` | Core → msgpack file | +| CLI upload | `trustgraph-cli/trustgraph/cli/put_kg_core.py` | Msgpack file → core | +| CLI load | `trustgraph-cli/trustgraph/cli/load_kg_core.py` | Core → graph/vector stores | +| API client | `trustgraph-base/trustgraph/api/knowledge.py` | Client-side knowledge API | +| Triple schema | `trustgraph-base/trustgraph/schema/core/primitives.py` | Triple dataclass with `g` field | +| Provenance generation | `trustgraph-base/trustgraph/provenance/triples.py` | PROV-O triple creation | +| Librarian | `trustgraph-flow/trustgraph/librarian/librarian.py` | Document storage service | +| Library tables | `trustgraph-flow/trustgraph/tables/library.py` | Document metadata in Cassandra | +| Blob store | `trustgraph-flow/trustgraph/librarian/blob_store.py` | S3/MinIO object storage | + +## Technical Design + +### Change 1: Named Graph Field in Core Storage + +#### Cassandra Schema + +Extend the `triples` tuple from 6 to 7 elements, adding the graph name: + +``` +triples list> +``` + +**Migration**: The schema change uses `ALTER TABLE` or is handled by +creating a new table version. Existing rows with 6-element tuples must be +handled gracefully on read — if the tuple has 6 elements, treat graph as +default. + +#### Write Path (`add_triples`) + +Change `tables/knowledge.py:add_triples()` to include `triple.g`: + +```python +triples = [ + ( + *term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o), + v.g or "" + ) + for v in m.triples +] +``` + +#### Read Path (`get_triples`) + +Change `tables/knowledge.py:get_triples()` to restore the graph name: + +```python +Triple( + s = tuple_to_term(elt[0], elt[1]), + p = tuple_to_term(elt[2], elt[3]), + o = tuple_to_term(elt[4], elt[5]), + g = elt[6] if len(elt) > 6 and elt[6] else None, +) +``` + +The `len(elt) > 6` guard provides backward compatibility with existing +6-element rows. + +#### Core File Format + +Extend triple dicts in the `"t"` record to include the graph name: + +```python +# In get_kg_core.py write_triple — each triple dict gains "g" key +{"s": ..., "p": ..., "o": ..., "g": "urn:graph:source"} +``` + +On read (`put_kg_core.py`), treat missing `"g"` key as default graph for +backward compatibility with old core files. + +### Change 2: Provenance Triples in Cores + +#### Investigation Required + +Before implementation, verify: + +1. Whether provenance triples arrive on the `triples-input` topic that the + knowledge core store processor already listens on. +2. If not, which topic they use, and whether the store processor should + subscribe to it. + +#### If provenance triples already arrive at the store + +The only change needed is Change 1 (named graphs) — the provenance triples +are already being stored, just without their graph name. Once graph names +are preserved, provenance triples will round-trip correctly. + +#### If provenance triples do NOT arrive at the store + +Two options: + +**Option A — Route provenance to the existing store topic**: Configure the +flow so provenance triples are published to the same `triples-input` topic. +This is the simpler approach and keeps the store processor unchanged. + +**Option B — Add a subscription**: Add a new `ConsumerSpec` in the store +processor for the provenance topic. This keeps provenance routing +independent but adds complexity. + +Recommendation: Option A, unless there is a reason provenance triples are +intentionally kept off the core store topic. + +### Change 3: Source Material in Cores + +This is the largest change. The goal is that when a core is loaded on a +fresh instance, provenance links to source material resolve. + +#### Architecture + +Source material is **not stored in the knowledge core tables**. It lives in +the librarian (Cassandra `library` keyspace + S3/MinIO blob store) and is +fetched on demand via the librarian's existing service API. + +The knowledge manager acts as a **client of the librarian service** — it +calls the librarian's request/response API over pub/sub to retrieve document +metadata and content. It does not access the library's Cassandra tables or +blob store directly. + +#### Transport + +The librarian's pub/sub API already handles chunking of large documents. +This chunking is designed to be websocket-friendly, so library content +flowing through the API gateway to external clients does not require +re-chunking. The API gateway remains a transport layer. + +``` +Download: + Knowledge manager ──pub/sub──► Librarian (fetch metadata + content) + Knowledge manager ──pub/sub──► API gateway ──websocket──► Client + +Upload: + Client ──websocket──► API gateway ──pub/sub──► Knowledge manager + Knowledge manager ──pub/sub──► Librarian (store metadata + content) +``` + +#### What to Include + +The provenance chain links facts → chunks → pages → documents. For the +chain to resolve, the core must include: + +1. **Document metadata** — the library record for each document in the + hierarchy (id, kind, title, parent_id, document_type, etc.) +2. **Document content** — the blob data for each document (original file, + extracted text pages, text chunks) + +Including the full hierarchy is necessary because: +- A user viewing provenance needs to traverse fact → chunk → page → document +- The chunk text is needed to show what text a fact was extracted from +- The page text provides broader context +- The original document is needed for full source attribution + +#### Size Implications + +Source material will significantly increase core file sizes. A rough model: + +| Component | Typical size per document | +|-----------|-------------------------| +| Triples + embeddings (current) | 1-10 MB | +| Chunk text (all chunks) | ~same as original document | +| Page text (all pages) | ~same as original document | +| Original document (PDF, etc.) | Varies widely (KB to hundreds of MB) | + +For a 10 MB PDF, the core could grow from ~5 MB to ~25 MB (original + +derived text + existing data). For large document sets, cores could become +very large. + +**Decision needed**: Whether to include original documents or just derived +text (pages + chunks). Including only derived text still allows provenance +display but loses the ability to serve the original file. + +#### New Core File Record Types + +Add new msgpack record types for library content: + +| Type tag | Payload | Description | +|----------|---------|-------------| +| `"lm"` | `{"id", "kind", "title", "parent_id", "document_type", "comments", "tags", "metadata"}` | Library document metadata | +| `"lb"` | `{"id", "data"}` | Library document blob content (chunked by pub/sub layer) | + +These are emitted after the existing `"t"` and `"ge"` records during +download and processed during upload. + +#### Download Path + +Extend `KnowledgeManager.get_kg_core()` to: + +1. Stream triples and graph embeddings from the core store (existing + behavior). +2. Use the librarian service API to retrieve documents associated with + this core ID: + a. Fetch the root document metadata and content. + b. Use `list-children` to discover child documents (pages, chunks). + c. Recursively fetch metadata and content for each child. +3. Stream each document as `"lm"` (metadata) and `"lb"` (content) records. + +The knowledge manager gains the librarian service as a pub/sub dependency. +Large document content is chunked by the librarian's existing pub/sub +transport — the knowledge manager receives and forwards these chunks without +buffering the full blob in memory. + +#### Upload Path + +Extend `KnowledgeManager.put_kg_core()` to handle the new record types: + +1. For `"lm"` records: call the librarian service API to create/update + the document metadata. +2. For `"lb"` records: call the librarian service API to store the + document content. + +Parent-child relationships are preserved because `parent_id` is stored in +the metadata. Documents should be processed in hierarchy order (parent +before child) to satisfy any ordering constraints. + +#### Load Path + +The load path (`_load_kg_core`) publishes triples and embeddings to Pulsar +topics for ingestion into graph/vector stores. Source material does not need +to flow through the load path — it is already in the librarian after the +upload step and can be accessed directly by services that need it. + +No changes to the load path for source material. + +#### CLI Changes + +**`tg-get-kg-core`**: Add handling for `"lm"` and `"lb"` record types in +the file writer. + +**`tg-put-kg-core`**: Add handling for `"lm"` and `"lb"` record types in +the file reader. Send library records to the knowledge manager alongside +triple/embedding records. + +#### Associating Documents with Cores + +The core ID is `metadata.root`, which is the root document ID from the +librarian. This provides a natural join: the core's root document and all +its children (pages, chunks) are the source material for that core. + +The librarian's `list-children` API provides the child documents. A +recursive traversal from the root document collects the full hierarchy. + +### API Changes + +#### KnowledgeResponse Schema + +Add optional fields to `KnowledgeResponse` for library data: + +```python +@dataclass +class KnowledgeResponse: + error: Error | None = None + ids: list | None = None + eos: bool = False + triples: Triples | None = None + graph_embeddings: GraphEmbeddings | None = None + document_embeddings: DocumentEmbeddings | None = None + library_metadata: LibraryMetadata | None = None # new + library_blob: LibraryBlob | None = None # new +``` + +#### New Schema Types + +```python +@dataclass +class LibraryMetadata: + id: str + kind: str | None = None + title: str | None = None + parent_id: str | None = None + document_type: str | None = None + comments: str | None = None + tags: list[str] | None = None + metadata: list[Triple] | None = None + +@dataclass +class LibraryBlob: + id: str + data: bytes +``` + +#### Socket API + +The existing streaming protocol for `get-kg-core` / `put-kg-core` carries +these new fields naturally — responses already stream multiple record types. + +### Dependencies Between Changes + +``` +Change 1 (named graphs) ◄── Change 2 depends on this + │ + └── Change 2 (provenance triples) + │ + └── Change 3 (source material) is independent +``` + +Change 1 is a prerequisite for Change 2 (provenance triples use named +graphs). Change 3 is independent and can be implemented in parallel. + +## Security Considerations + +- **Workspace isolation**: Core download/upload must respect workspace + boundaries. Source material from the librarian must only be included if + it belongs to the same workspace as the core. This is already enforced + by the existing workspace-scoped queries. +- **Large blob transfer**: Streaming large documents through the API + is handled by the librarian's existing pub/sub chunking, which is + designed to be websocket-friendly. No additional chunking layer is + needed. +- **Cross-instance trust**: When uploading a core from an external source, + the library content should be treated as untrusted input. Document + metadata and blob content should be validated before insertion. + +## Performance Considerations + +- **Core file size**: Including source material will significantly increase + core file sizes. Consider adding a flag to download/upload commands to + optionally exclude source material for use cases where only the knowledge + graph is needed. +- **Streaming**: All paths already use streaming (paged Cassandra queries, + msgpack record-at-a-time). Library content should follow the same pattern. +- **Cassandra schema migration**: Changing the tuple width in the `triples` + table requires careful handling. Cassandra frozen tuples cannot be altered + in place — a migration strategy is needed (see Migration Plan). + +## Testing Strategy + +- **Unit tests**: Triple round-trip with graph name (write → read → + verify `g` field preserved). Backward compatibility with 6-element tuples. +- **Integration tests**: Full lifecycle — extract with provenance → download + core → upload to fresh instance → load → verify provenance chain resolves. +- **File format tests**: Read old-format core files (no graph name, no + library records) and verify they load without error. +- **Library inclusion tests**: Download core with source material → upload → + verify documents accessible through librarian. + +## Migration Plan + +### Cassandra Schema + +The `triples` table stores tuples in a `list>` column. Cassandra +does not support altering the type of an existing column. Options: + +**Option A — New table**: Create a `triples_v2` table with the 7-element +tuple. Migrate data from `triples` to `triples_v2`. The read path checks +both tables during a transition period, then the old table is dropped. + +**Option B — Dual read**: Keep the existing table. The read path handles +both 6-element and 7-element tuples by checking length. New writes use +7-element tuples. This works if Cassandra accepts variable-length tuples in +a list — **needs verification**. + +**Option C — Separate graph column**: Instead of extending the tuple, add a +parallel `graphs list` column where `graphs[i]` corresponds to +`triples[i]`. This avoids tuple migration entirely but requires keeping the +two lists in sync. + +Recommendation: Verify Option B first (simplest). Fall back to Option A if +Cassandra rejects mixed tuple lengths. + +### Core File Format + +Backward compatible by design: +- Old files lack `"g"` in triple dicts and have no `"lm"`/`"lb"` records → + handled by defaults. +- New files read by old code → old code ignores unknown record types (the + existing `read_message` raises on unknown types, so this needs a small + fix to skip unknown types gracefully). + +## Open Questions + +1. **Provenance topic routing**: Do provenance triples currently arrive at + the `triples-input` topic consumed by the knowledge core store? If not, + what topic are they on? + +2. **Include original documents?**: Should cores include the original + uploaded document (e.g. PDF), or only derived text (pages + chunks)? + Including originals makes cores fully self-contained but potentially + very large. Excluding them preserves provenance text display but loses + the ability to serve the original file. + +3. **Optional source material**: Should there be a flag on download/upload + to include or exclude source material? This would let users choose + between compact cores (knowledge only) and complete cores (knowledge + + sources). + +4. **Cassandra tuple migration**: Can Cassandra handle mixed-length tuples + in a `list>` column, or is a table migration required? + +5. **Document embedding cores**: DE cores are managed alongside KG cores. + Do they need the same treatment (source material inclusion)? The + document embeddings reference chunk IDs — the same provenance chain + applies. + +6. **Core versioning**: Should the core file include a version marker so + readers can distinguish old-format from new-format files without + trial-and-error parsing? + +## References + +- Extraction-time provenance: `docs/tech-specs/extraction-time-provenance.md` +- Query-time explainability: `docs/tech-specs/query-time-explainability.md` +- Agent explainability: `docs/tech-specs/agent-explainability.md` +- Data ownership model: `docs/tech-specs/data-ownership-model.md` diff --git a/tests/unit/test_cores/test_knowledge_manager.py b/tests/unit/test_cores/test_knowledge_manager.py index 8f73dcc6..7797c9be 100644 --- a/tests/unit/test_cores/test_knowledge_manager.py +++ b/tests/unit/test_cores/test_knowledge_manager.py @@ -11,7 +11,12 @@ from unittest.mock import AsyncMock, Mock, patch, MagicMock from unittest.mock import call from trustgraph.cores.knowledge import KnowledgeManager -from trustgraph.schema import KnowledgeResponse, Triples, GraphEmbeddings, Metadata, Triple, Term, EntityEmbeddings, IRI, LITERAL +from trustgraph.schema import ( + KnowledgeResponse, Triples, GraphEmbeddings, Metadata, Triple, Term, + EntityEmbeddings, IRI, LITERAL, + LibraryMetadata, LibraryBlob, + LibrarianResponse, DocumentMetadata, +) @pytest.fixture @@ -373,11 +378,252 @@ class TestKnowledgeManagerOtherMethods: mock_respond = AsyncMock() await knowledge_manager.delete_kg_core(mock_request, mock_respond, "test-user") - + # Verify table store was called correctly knowledge_manager.table_store.delete_kg_core.assert_called_once_with("test-user", "test-doc-id") - + # Verify response mock_respond.assert_called_once() response = mock_respond.call_args[0][0] - assert response.error is None \ No newline at end of file + assert response.error is None + + +class TestKnowledgeManagerLibraryDownload: + """Test get_kg_core streaming of library documents.""" + + @pytest.fixture + def manager_with_librarian(self, mock_flow_config): + with patch('trustgraph.cores.knowledge.KnowledgeTableStore'): + mock_librarian = AsyncMock() + manager = KnowledgeManager( + cassandra_host=["localhost"], + cassandra_username="test_user", + cassandra_password="test_pass", + keyspace="test_keyspace", + flow_config=mock_flow_config, + librarian=mock_librarian, + ) + manager.table_store = AsyncMock() + return manager + + @pytest.mark.asyncio + async def test_get_kg_core_streams_library_docs(self, manager_with_librarian): + mock_request = Mock() + mock_request.id = "root-doc" + mock_respond = AsyncMock() + + manager_with_librarian.table_store.get_triples = AsyncMock() + manager_with_librarian.table_store.get_graph_embeddings = AsyncMock() + + root_meta = DocumentMetadata( + id="root-doc", kind="application/pdf", title="Test PDF", + document_type="source", + ) + child_meta = DocumentMetadata( + id="chunk-1", kind="text/plain", title="Chunk 1", + parent_id="root-doc", document_type="chunk", + ) + + manager_with_librarian.librarian.fetch_document_metadata.return_value = root_meta + manager_with_librarian.librarian.request.return_value = LibrarianResponse( + document_metadatas=[child_meta], + ) + manager_with_librarian.librarian.fetch_document_content.side_effect = [ + b"cm9vdCBjb250ZW50", + b"Y2h1bmsgY29udGVudA==", + ] + + await manager_with_librarian.get_kg_core( + mock_request, mock_respond, "test-user" + ) + + responses = [c[0][0] for c in mock_respond.call_args_list] + + lm_responses = [r for r in responses if r.library_metadata is not None] + lb_responses = [r for r in responses if r.library_blob is not None] + eos_responses = [r for r in responses if r.eos is True] + + assert len(lm_responses) == 2 + assert lm_responses[0].library_metadata.id == "root-doc" + assert lm_responses[0].library_metadata.document_type == "source" + assert lm_responses[1].library_metadata.id == "chunk-1" + assert lm_responses[1].library_metadata.parent_id == "root-doc" + + assert len(lb_responses) == 2 + assert lb_responses[0].library_blob.id == "root-doc" + assert lb_responses[0].library_blob.data == b"cm9vdCBjb250ZW50" + assert lb_responses[1].library_blob.id == "chunk-1" + + assert len(eos_responses) == 1 + + @pytest.mark.asyncio + async def test_get_kg_core_no_librarian_skips_library(self, mock_flow_config): + with patch('trustgraph.cores.knowledge.KnowledgeTableStore'): + manager = KnowledgeManager( + cassandra_host=["localhost"], + cassandra_username="u", cassandra_password="p", + keyspace="ks", flow_config=mock_flow_config, + ) + manager.table_store = AsyncMock() + manager.table_store.get_triples = AsyncMock() + manager.table_store.get_graph_embeddings = AsyncMock() + + mock_request = Mock() + mock_request.id = "doc-1" + mock_respond = AsyncMock() + + await manager.get_kg_core(mock_request, mock_respond, "w") + + responses = [c[0][0] for c in mock_respond.call_args_list] + assert all(r.library_metadata is None for r in responses) + assert all(r.library_blob is None for r in responses) + + @pytest.mark.asyncio + async def test_get_kg_core_librarian_metadata_failure_is_graceful( + self, manager_with_librarian, + ): + mock_request = Mock() + mock_request.id = "missing-doc" + mock_respond = AsyncMock() + + manager_with_librarian.table_store.get_triples = AsyncMock() + manager_with_librarian.table_store.get_graph_embeddings = AsyncMock() + manager_with_librarian.librarian.fetch_document_metadata.side_effect = ( + RuntimeError("not found") + ) + + await manager_with_librarian.get_kg_core( + mock_request, mock_respond, "test-user" + ) + + responses = [c[0][0] for c in mock_respond.call_args_list] + assert all(r.library_metadata is None for r in responses) + assert any(r.eos for r in responses) + + +class TestKnowledgeManagerLibraryUpload: + """Test put_kg_core handling of library metadata and blob records.""" + + @pytest.fixture + def manager_with_librarian(self, mock_flow_config): + with patch('trustgraph.cores.knowledge.KnowledgeTableStore'): + mock_librarian = AsyncMock() + manager = KnowledgeManager( + cassandra_host=["localhost"], + cassandra_username="u", cassandra_password="p", + keyspace="ks", flow_config=mock_flow_config, + librarian=mock_librarian, + ) + manager.table_store = AsyncMock() + return manager + + @pytest.mark.asyncio + async def test_put_metadata_then_blob_calls_librarian( + self, manager_with_librarian, + ): + mock_respond = AsyncMock() + manager_with_librarian.librarian.request.return_value = LibrarianResponse() + + # First call: metadata + req_meta = Mock() + req_meta.triples = None + req_meta.graph_embeddings = None + req_meta.library_metadata = LibraryMetadata( + id="doc-1", kind="application/pdf", title="Test", + document_type="source", + ) + req_meta.library_blob = None + await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws") + + # Metadata is buffered, librarian not called yet + manager_with_librarian.librarian.request.assert_not_called() + + # Second call: blob + req_blob = Mock() + req_blob.triples = None + req_blob.graph_embeddings = None + req_blob.library_metadata = None + req_blob.library_blob = LibraryBlob( + id="doc-1", data=b"dGVzdA==", + ) + await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws") + + # Now librarian should have been called with add-document + manager_with_librarian.librarian.request.assert_called_once() + call_args = manager_with_librarian.librarian.request.call_args[0][0] + assert call_args.operation == "add-document" + assert call_args.document_metadata.id == "doc-1" + assert call_args.document_metadata.kind == "application/pdf" + assert call_args.content == b"dGVzdA==" + + @pytest.mark.asyncio + async def test_put_child_document_uses_add_child_operation( + self, manager_with_librarian, + ): + mock_respond = AsyncMock() + manager_with_librarian.librarian.request.return_value = LibrarianResponse() + + req_meta = Mock() + req_meta.triples = None + req_meta.graph_embeddings = None + req_meta.library_metadata = LibraryMetadata( + id="chunk-1", kind="text/plain", title="Chunk", + parent_id="doc-1", document_type="chunk", + ) + req_meta.library_blob = None + await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws") + + req_blob = Mock() + req_blob.triples = None + req_blob.graph_embeddings = None + req_blob.library_metadata = None + req_blob.library_blob = LibraryBlob(id="chunk-1", data=b"Y2h1bms=") + await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws") + + call_args = manager_with_librarian.librarian.request.call_args[0][0] + assert call_args.operation == "add-child-document" + assert call_args.document_metadata.parent_id == "doc-1" + + @pytest.mark.asyncio + async def test_put_blob_without_metadata_logs_warning( + self, manager_with_librarian, + ): + mock_respond = AsyncMock() + + req_blob = Mock() + req_blob.triples = None + req_blob.graph_embeddings = None + req_blob.library_metadata = None + req_blob.library_blob = LibraryBlob(id="orphan", data=b"data") + await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws") + + # Librarian should not be called for orphan blob + manager_with_librarian.librarian.request.assert_not_called() + + @pytest.mark.asyncio + async def test_put_existing_document_is_graceful( + self, manager_with_librarian, + ): + mock_respond = AsyncMock() + manager_with_librarian.librarian.request.side_effect = RuntimeError( + "Document already exists" + ) + + req_meta = Mock() + req_meta.triples = None + req_meta.graph_embeddings = None + req_meta.library_metadata = LibraryMetadata( + id="doc-1", kind="application/pdf", title="Test", + document_type="source", + ) + req_meta.library_blob = None + await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws") + + req_blob = Mock() + req_blob.triples = None + req_blob.graph_embeddings = None + req_blob.library_metadata = None + req_blob.library_blob = LibraryBlob(id="doc-1", data=b"data") + await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws") + + # Should not raise — "already exists" is handled gracefully \ No newline at end of file diff --git a/tests/unit/test_tables/test_knowledge_table_store.py b/tests/unit/test_tables/test_knowledge_table_store.py index 9a0b55c4..2d058733 100644 --- a/tests/unit/test_tables/test_knowledge_table_store.py +++ b/tests/unit/test_tables/test_knowledge_table_store.py @@ -155,7 +155,7 @@ class TestGetTriples: @pytest.mark.asyncio @patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock) async def test_row_converts_to_triples(self, mock_async_execute_paged): - # row[3] is a list of (s_val, s_uri, p_val, p_uri, o_val, o_uri) + # row[3] is a list of (s_val, s_uri, p_val, p_uri, o_val, o_uri, graph) fake_row = ( None, None, None, [ @@ -163,6 +163,7 @@ class TestGetTriples: "http://example.org/alice", True, "http://example.org/knows", True, "http://example.org/bob", True, + "urn:graph:source", ), ], ) @@ -191,3 +192,33 @@ class TestGetTriples: assert t.s.iri == "http://example.org/alice" assert t.p.iri == "http://example.org/knows" assert t.o.iri == "http://example.org/bob" + assert t.g == "urn:graph:source" + + @pytest.mark.asyncio + @patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock) + async def test_empty_graph_name_becomes_none(self, mock_async_execute_paged): + fake_row = ( + None, None, None, + [ + ( + "http://example.org/alice", True, + "http://example.org/knows", True, + "http://example.org/bob", True, + "", + ), + ], + ) + + store = _make_store() + store.cassandra = Mock() + store.get_triples_stmt = Mock() + mock_async_execute_paged.return_value = [[fake_row]] + + received = [] + + async def receiver(msg): + received.append(msg) + + await store.get_triples("w", "d", receiver) + + assert received[0].triples[0].g is None diff --git a/tests/unit/test_translators/test_knowledge_translator_roundtrip.py b/tests/unit/test_translators/test_knowledge_translator_roundtrip.py index 437b83c8..af128f23 100644 --- a/tests/unit/test_translators/test_knowledge_translator_roundtrip.py +++ b/tests/unit/test_translators/test_knowledge_translator_roundtrip.py @@ -1,5 +1,6 @@ """ -Round-trip unit tests for KnowledgeRequestTranslator. +Round-trip unit tests for KnowledgeRequestTranslator and +KnowledgeResponseTranslator. Regression coverage: a previous version of the decode side constructed EntityEmbeddings(vectors=...) — the schema field is `vector` (singular), @@ -15,9 +16,13 @@ Triples breaks the test. import pytest -from trustgraph.messaging.translators.knowledge import KnowledgeRequestTranslator +from trustgraph.messaging.translators.knowledge import ( + KnowledgeRequestTranslator, + KnowledgeResponseTranslator, +) from trustgraph.schema import ( KnowledgeRequest, + KnowledgeResponse, GraphEmbeddings, EntityEmbeddings, Triples, @@ -25,6 +30,8 @@ from trustgraph.schema import ( Metadata, Term, IRI, + LibraryMetadata, + LibraryBlob, ) @@ -145,3 +152,161 @@ class TestKnowledgeRequestTranslatorTriples: assert t.s.iri == "http://example.org/alice" assert t.p.iri == "http://example.org/knows" assert t.o.iri == "http://example.org/bob" + + +class TestKnowledgeRequestTranslatorLibrary: + + def test_roundtrip_preserves_library_metadata(self, translator): + request = KnowledgeRequest( + operation="put-kg-core", + id="doc-1", + library_metadata=LibraryMetadata( + id="doc-1", + kind="application/pdf", + title="Test Document", + parent_id="", + document_type="source", + comments="test comments", + tags=["tag1", "tag2"], + ), + ) + + encoded = translator.encode(request) + assert "library-metadata" in encoded + lm = encoded["library-metadata"] + assert lm["id"] == "doc-1" + assert lm["kind"] == "application/pdf" + assert lm["title"] == "Test Document" + assert lm["parent-id"] == "" + assert lm["document-type"] == "source" + assert lm["comments"] == "test comments" + assert lm["tags"] == ["tag1", "tag2"] + + decoded = translator.decode(encoded) + assert decoded.library_metadata is not None + assert decoded.library_metadata.id == "doc-1" + assert decoded.library_metadata.kind == "application/pdf" + assert decoded.library_metadata.title == "Test Document" + assert decoded.library_metadata.parent_id == "" + assert decoded.library_metadata.document_type == "source" + assert decoded.library_metadata.comments == "test comments" + assert decoded.library_metadata.tags == ["tag1", "tag2"] + + def test_roundtrip_preserves_child_document_metadata(self, translator): + request = KnowledgeRequest( + operation="put-kg-core", + id="doc-1", + library_metadata=LibraryMetadata( + id="chunk-1", + kind="text/plain", + title="Chunk 1", + parent_id="doc-1", + document_type="chunk", + ), + ) + + encoded = translator.encode(request) + decoded = translator.decode(encoded) + + assert decoded.library_metadata.parent_id == "doc-1" + assert decoded.library_metadata.document_type == "chunk" + + def test_roundtrip_preserves_library_blob(self, translator): + request = KnowledgeRequest( + operation="put-kg-core", + id="doc-1", + library_blob=LibraryBlob( + id="doc-1", + data=b"SGVsbG8gV29ybGQ=", + ), + ) + + encoded = translator.encode(request) + assert "library-blob" in encoded + assert encoded["library-blob"]["id"] == "doc-1" + assert encoded["library-blob"]["data"] == "SGVsbG8gV29ybGQ=" + + decoded = translator.decode(encoded) + assert decoded.library_blob is not None + assert decoded.library_blob.id == "doc-1" + assert decoded.library_blob.data == "SGVsbG8gV29ybGQ=" + + def test_absent_library_fields_decode_as_none(self, translator): + decoded = translator.decode({ + "operation": "get-kg-core", + "id": "doc-1", + }) + assert decoded.library_metadata is None + assert decoded.library_blob is None + + +class TestKnowledgeResponseTranslatorLibrary: + + @pytest.fixture + def response_translator(self): + return KnowledgeResponseTranslator() + + def test_encode_library_metadata(self, response_translator): + response = KnowledgeResponse( + ids=None, + library_metadata=LibraryMetadata( + id="doc-1", + kind="application/pdf", + title="Test", + parent_id="", + document_type="source", + comments="", + tags=[], + ), + ) + encoded = response_translator.encode(response) + assert "library-metadata" in encoded + assert encoded["library-metadata"]["id"] == "doc-1" + assert encoded["library-metadata"]["kind"] == "application/pdf" + assert encoded["library-metadata"]["document-type"] == "source" + + def test_encode_library_blob_bytes_to_string(self, response_translator): + response = KnowledgeResponse( + ids=None, + library_blob=LibraryBlob( + id="doc-1", + data=b"dGVzdCBkYXRh", + ), + ) + encoded = response_translator.encode(response) + assert "library-blob" in encoded + assert encoded["library-blob"]["id"] == "doc-1" + assert encoded["library-blob"]["data"] == "dGVzdCBkYXRh" + assert isinstance(encoded["library-blob"]["data"], str) + + def test_encode_library_blob_string_passthrough(self, response_translator): + response = KnowledgeResponse( + ids=None, + library_blob=LibraryBlob( + id="doc-1", + data="already-a-string", + ), + ) + encoded = response_translator.encode(response) + assert encoded["library-blob"]["data"] == "already-a-string" + + def test_library_metadata_is_not_final(self, response_translator): + response = KnowledgeResponse( + ids=None, + library_metadata=LibraryMetadata(id="doc-1"), + ) + _, is_final = response_translator.encode_with_completion(response) + assert is_final is False + + def test_library_blob_is_not_final(self, response_translator): + response = KnowledgeResponse( + ids=None, + library_blob=LibraryBlob(id="doc-1", data=b"data"), + ) + _, is_final = response_translator.encode_with_completion(response) + assert is_final is False + + def test_eos_is_final(self, response_translator): + response = KnowledgeResponse(eos=True) + _, is_final = response_translator.encode_with_completion(response) + assert is_final is True diff --git a/trustgraph-base/trustgraph/api/socket_client.py b/trustgraph-base/trustgraph/api/socket_client.py index b88d0c78..91bc67a1 100644 --- a/trustgraph-base/trustgraph/api/socket_client.py +++ b/trustgraph-base/trustgraph/api/socket_client.py @@ -502,6 +502,7 @@ class SocketClient: def put_kg_core( self, id: str, triples=None, graph_embeddings=None, + library_metadata=None, library_blob=None, ) -> Dict[str, Any]: request = { "operation": "put-kg-core", @@ -512,6 +513,10 @@ class SocketClient: request["triples"] = triples if graph_embeddings is not None: request["graph-embeddings"] = graph_embeddings + if library_metadata is not None: + request["library-metadata"] = library_metadata + if library_blob is not None: + request["library-blob"] = library_blob return self._send_request_sync("knowledge", None, request) def get_de_core(self, id: str) -> Iterator[Dict[str, Any]]: diff --git a/trustgraph-base/trustgraph/messaging/translators/knowledge.py b/trustgraph-base/trustgraph/messaging/translators/knowledge.py index 3830bf59..3f09b41b 100644 --- a/trustgraph-base/trustgraph/messaging/translators/knowledge.py +++ b/trustgraph-base/trustgraph/messaging/translators/knowledge.py @@ -2,7 +2,8 @@ from typing import Dict, Any, Tuple, Optional from ...schema import ( KnowledgeRequest, KnowledgeResponse, Triples, GraphEmbeddings, DocumentEmbeddings, ChunkEmbeddings, - Metadata, EntityEmbeddings + Metadata, EntityEmbeddings, + LibraryMetadata, LibraryBlob, ) from .base import MessageTranslator from .primitives import ValueTranslator, SubgraphTranslator @@ -61,6 +62,27 @@ class KnowledgeRequestTranslator(MessageTranslator): ] ) + library_metadata = None + if "library-metadata" in data: + lm = data["library-metadata"] + library_metadata = LibraryMetadata( + id=lm.get("id", ""), + kind=lm.get("kind", ""), + title=lm.get("title", ""), + parent_id=lm.get("parent-id", ""), + document_type=lm.get("document-type", ""), + comments=lm.get("comments", ""), + tags=lm.get("tags", []), + ) + + library_blob = None + if "library-blob" in data: + lb = data["library-blob"] + library_blob = LibraryBlob( + id=lb.get("id", ""), + data=lb.get("data", b""), + ) + return KnowledgeRequest( operation=data.get("operation"), id=data.get("id"), @@ -69,6 +91,8 @@ class KnowledgeRequestTranslator(MessageTranslator): triples=triples, graph_embeddings=graph_embeddings, document_embeddings=document_embeddings, + library_metadata=library_metadata, + library_blob=library_blob, ) def encode(self, obj: KnowledgeRequest) -> Dict[str, Any]: @@ -125,6 +149,26 @@ class KnowledgeRequestTranslator(MessageTranslator): ], } + if obj.library_metadata: + result["library-metadata"] = { + "id": obj.library_metadata.id, + "kind": obj.library_metadata.kind, + "title": obj.library_metadata.title, + "parent-id": obj.library_metadata.parent_id, + "document-type": obj.library_metadata.document_type, + "comments": obj.library_metadata.comments, + "tags": obj.library_metadata.tags, + } + + if obj.library_blob: + data = obj.library_blob.data + if isinstance(data, bytes): + data = data.decode("utf-8") + result["library-blob"] = { + "id": obj.library_blob.id, + "data": data, + } + return result @@ -194,6 +238,32 @@ class KnowledgeResponseTranslator(MessageTranslator): } } + # Streaming library metadata response + if obj.library_metadata: + return { + "library-metadata": { + "id": obj.library_metadata.id, + "kind": obj.library_metadata.kind, + "title": obj.library_metadata.title, + "parent-id": obj.library_metadata.parent_id, + "document-type": obj.library_metadata.document_type, + "comments": obj.library_metadata.comments, + "tags": obj.library_metadata.tags, + } + } + + # Streaming library blob response + if obj.library_blob: + data = obj.library_blob.data + if isinstance(data, bytes): + data = data.decode("utf-8") + return { + "library-blob": { + "id": obj.library_blob.id, + "data": data, + } + } + # End of stream marker if obj.eos is True: return {"eos": True} @@ -209,7 +279,9 @@ class KnowledgeResponseTranslator(MessageTranslator): is_final = ( obj.ids is not None or # List response obj.eos is True or # End of stream - (not obj.triples and not obj.graph_embeddings and not obj.document_embeddings) # Empty response + (not obj.triples and not obj.graph_embeddings + and not obj.document_embeddings + and not obj.library_metadata and not obj.library_blob) # Empty response ) return response, is_final \ No newline at end of file diff --git a/trustgraph-base/trustgraph/schema/knowledge/knowledge.py b/trustgraph-base/trustgraph/schema/knowledge/knowledge.py index a3879103..4353065b 100644 --- a/trustgraph-base/trustgraph/schema/knowledge/knowledge.py +++ b/trustgraph-base/trustgraph/schema/knowledge/knowledge.py @@ -21,6 +21,21 @@ from .embeddings import GraphEmbeddings, DocumentEmbeddings # <- () # <- (error) +@dataclass +class LibraryMetadata: + id: str = "" + kind: str = "" + title: str = "" + parent_id: str = "" + document_type: str = "" + comments: str = "" + tags: list[str] = field(default_factory=list) + +@dataclass +class LibraryBlob: + id: str = "" + data: bytes = b"" + @dataclass class KnowledgeRequest: # get-kg-core, delete-kg-core, list-kg-cores, put-kg-core @@ -44,6 +59,10 @@ class KnowledgeRequest: # put-de-core document_embeddings: DocumentEmbeddings | None = None + # put-kg-core (source material) + library_metadata: LibraryMetadata | None = None + library_blob: LibraryBlob | None = None + @dataclass class KnowledgeResponse: error: Error | None = None @@ -52,6 +71,8 @@ class KnowledgeResponse: triples: Triples | None = None graph_embeddings: GraphEmbeddings | None = None document_embeddings: DocumentEmbeddings | None = None + library_metadata: LibraryMetadata | None = None + library_blob: LibraryBlob | None = None knowledge_request_queue = queue('knowledge', cls='request') knowledge_response_queue = queue('knowledge', cls='response') diff --git a/trustgraph-cli/trustgraph/cli/get_kg_core.py b/trustgraph-cli/trustgraph/cli/get_kg_core.py index b4f37b81..2ff1a3cc 100644 --- a/trustgraph-cli/trustgraph/cli/get_kg_core.py +++ b/trustgraph-cli/trustgraph/cli/get_kg_core.py @@ -47,6 +47,31 @@ def write_ge(f, data): ) f.write(msgpack.packb(msg, use_bin_type=True)) +def write_library_metadata(f, data): + msg = ( + "lm", + { + "i": data["id"], + "k": data.get("kind", ""), + "t": data.get("title", ""), + "p": data.get("parent-id", ""), + "d": data.get("document-type", ""), + "c": data.get("comments", ""), + "g": data.get("tags", []), + } + ) + f.write(msgpack.packb(msg, use_bin_type=True)) + +def write_library_blob(f, data): + msg = ( + "lb", + { + "i": data["id"], + "d": data.get("data", b""), + } + ) + f.write(msgpack.packb(msg, use_bin_type=True)) + def fetch(url, workspace, id, output, token=None): api = Api(url=url, token=token, workspace=workspace) @@ -55,6 +80,8 @@ def fetch(url, workspace, id, output, token=None): try: ge = 0 t = 0 + lm = 0 + lb = 0 with open(output, "wb") as f: @@ -68,7 +95,15 @@ def fetch(url, workspace, id, output, token=None): ge += 1 write_ge(f, response["graph-embeddings"]) - print(f"Got: {t} triple, {ge} GE messages.") + if "library-metadata" in response: + lm += 1 + write_library_metadata(f, response["library-metadata"]) + + if "library-blob" in response: + lb += 1 + write_library_blob(f, response["library-blob"]) + + print(f"Got: {t} triple, {ge} GE, {lm} library metadata, {lb} library blob messages.") finally: socket.close() diff --git a/trustgraph-cli/trustgraph/cli/put_kg_core.py b/trustgraph-cli/trustgraph/cli/put_kg_core.py index fe0981a5..f4e0b3dd 100644 --- a/trustgraph-cli/trustgraph/cli/put_kg_core.py +++ b/trustgraph-cli/trustgraph/cli/put_kg_core.py @@ -40,6 +40,23 @@ def read_message(unpacked, id): }, "triples": msg["t"], } + elif unpacked[0] == "lm": + msg = unpacked[1] + return "lm", { + "id": msg["i"], + "kind": msg.get("k", ""), + "title": msg.get("t", ""), + "parent-id": msg.get("p", ""), + "document-type": msg.get("d", ""), + "comments": msg.get("c", ""), + "tags": msg.get("g", []), + } + elif unpacked[0] == "lb": + msg = unpacked[1] + return "lb", { + "id": msg["i"], + "data": msg.get("d", b""), + } else: raise RuntimeError("Unpacked unexpected messsage type", unpacked[0]) @@ -51,6 +68,8 @@ def put(url, workspace, id, input, token=None): try: ge = 0 t = 0 + lm = 0 + lb = 0 with open(input, "rb") as f: @@ -73,10 +92,18 @@ def put(url, workspace, id, input, token=None): t += 1 socket.put_kg_core(id, triples=msg) + elif kind == "lm": + lm += 1 + socket.put_kg_core(id, library_metadata=msg) + + elif kind == "lb": + lb += 1 + socket.put_kg_core(id, library_blob=msg) + else: raise RuntimeError("Unexpected message kind", kind) - print(f"Put: {t} triple, {ge} GE messages.") + print(f"Put: {t} triple, {ge} GE, {lm} library metadata, {lb} library blob messages.") finally: socket.close() diff --git a/trustgraph-flow/trustgraph/cores/knowledge.py b/trustgraph-flow/trustgraph/cores/knowledge.py index f1fa53f5..6f017c43 100644 --- a/trustgraph-flow/trustgraph/cores/knowledge.py +++ b/trustgraph-flow/trustgraph/cores/knowledge.py @@ -1,6 +1,7 @@ from .. schema import KnowledgeResponse, Error, Triples, GraphEmbeddings -from .. schema import DocumentEmbeddings +from .. schema import DocumentEmbeddings, LibraryMetadata, LibraryBlob +from .. schema import LibrarianRequest, DocumentMetadata from .. knowledge import hash from .. exceptions import RequestError from .. tables.knowledge import KnowledgeTableStore @@ -18,7 +19,7 @@ class KnowledgeManager: def __init__( self, cassandra_host, cassandra_username, cassandra_password, - keyspace, flow_config, replication_factor=1, + keyspace, flow_config, librarian=None, replication_factor=1, ): self.table_store = KnowledgeTableStore( @@ -26,6 +27,9 @@ class KnowledgeManager: replication_factor ) + self.librarian = librarian + self._pending_library_metadata = {} + self.loader_queue = asyncio.Queue(maxsize=20) self.background_task = None self.flow_config = flow_config @@ -86,6 +90,9 @@ class KnowledgeManager: publish_ge, ) + if self.librarian: + await self._stream_library_docs(request.id, respond) + logger.debug("Knowledge core retrieval complete") await respond( @@ -122,6 +129,12 @@ class KnowledgeManager: workspace, request.graph_embeddings ) + if request.library_metadata and self.librarian: + await self._put_library_metadata(request.library_metadata, workspace) + + if request.library_blob and self.librarian: + await self._put_library_blob(request.library_blob, workspace) + await respond( KnowledgeResponse( error = None, @@ -250,6 +263,112 @@ class KnowledgeManager: await self.loader_queue.put((request, respond, workspace)) + async def _stream_library_docs(self, document_id, respond): + + try: + root_meta = await self.librarian.fetch_document_metadata( + document_id + ) + except Exception as e: + logger.warning(f"Could not fetch library metadata for {document_id}: {e}") + return + + if root_meta is None: + return + + await self._stream_one_doc(root_meta, respond) + + try: + resp = await self.librarian.request( + LibrarianRequest( + operation="list-children", + document_id=document_id, + ) + ) + except Exception as e: + logger.warning(f"Could not list children for {document_id}: {e}") + return + + for child_meta in resp.document_metadatas: + await self._stream_one_doc(child_meta, respond) + + async def _stream_one_doc(self, doc_meta, respond): + + lm = LibraryMetadata( + id=doc_meta.id, + kind=doc_meta.kind, + title=doc_meta.title, + parent_id=doc_meta.parent_id, + document_type=doc_meta.document_type, + comments=doc_meta.comments, + tags=doc_meta.tags or [], + ) + + await respond( + KnowledgeResponse(library_metadata=lm) + ) + + try: + content = await self.librarian.fetch_document_content( + doc_meta.id + ) + except Exception as e: + logger.warning(f"Could not fetch content for {doc_meta.id}: {e}") + return + + await respond( + KnowledgeResponse( + library_blob=LibraryBlob( + id=doc_meta.id, + data=content, + ) + ) + ) + + async def _put_library_metadata(self, lm, workspace): + self._pending_library_metadata[lm.id] = lm + + async def _put_library_blob(self, lb, workspace): + + lm = self._pending_library_metadata.pop(lb.id, None) + if lm is None: + logger.warning( + f"Received library blob for {lb.id} with no preceding metadata" + ) + return + + doc_meta = DocumentMetadata( + id=lm.id, + kind=lm.kind, + title=lm.title, + parent_id=lm.parent_id, + document_type=lm.document_type, + comments=lm.comments, + tags=lm.tags or [], + ) + + if lm.parent_id: + operation = "add-child-document" + else: + operation = "add-document" + + try: + await self.librarian.request( + LibrarianRequest( + operation=operation, + document_id=lm.id, + document_metadata=doc_meta, + content=lb.data, + ) + ) + except RuntimeError as e: + if "already exists" in str(e): + logger.debug(f"Library document {lm.id} already exists, skipping") + else: + logger.warning(f"Could not save library document {lm.id}: {e}") + except Exception as e: + logger.warning(f"Could not save library document {lm.id}: {e}") + async def core_loader(self): logger.info("Knowledge background processor running...") diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index a04e42ca..a8f52efd 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -12,6 +12,7 @@ import logging from .. base import WorkspaceProcessor, Consumer, Producer, Publisher, Subscriber from .. base import ConsumerMetrics, ProducerMetrics from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config +from .. base import LibrarianClient from .. schema import KnowledgeRequest, KnowledgeResponse, Error from .. schema import knowledge_request_queue, knowledge_response_queue @@ -77,12 +78,17 @@ class Processor(WorkspaceProcessor): } ) + self.librarian_client = LibrarianClient( + id=id, backend=self.pubsub, taskgroup=self.taskgroup, + ) + self.knowledge = KnowledgeManager( cassandra_host = self.cassandra_host, cassandra_username = self.cassandra_username, cassandra_password = self.cassandra_password, keyspace = keyspace, flow_config = self, + librarian = self.librarian_client, replication_factor = replication_factor, ) @@ -156,6 +162,7 @@ class Processor(WorkspaceProcessor): async def start(self): await super(Processor, self).start() + await self.librarian_client.start() async def on_knowledge_config(self, workspace, config, version): diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/core_export.py b/trustgraph-flow/trustgraph/gateway/dispatch/core_export.py index 6696afbe..90080cc4 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/core_export.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/core_export.py @@ -73,6 +73,39 @@ class CoreExport: enc = msgpack.packb(msg) await response.write(enc) + if "library-metadata" in resp: + + data = resp["library-metadata"] + msg = ( + "lm", + { + "i": data["id"], + "k": data.get("kind", ""), + "t": data.get("title", ""), + "p": data.get("parent-id", ""), + "d": data.get("document-type", ""), + "c": data.get("comments", ""), + "g": data.get("tags", []), + } + ) + + enc = msgpack.packb(msg) + await response.write(enc) + + if "library-blob" in resp: + + data = resp["library-blob"] + msg = ( + "lb", + { + "i": data["id"], + "d": data.get("data", b""), + } + ) + + enc = msgpack.packb(msg, use_bin_type=True) + await response.write(enc) + await kr.process( { "operation": "get-kg-core", diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/core_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/core_import.py index d03d4efd..bf660def 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/core_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/core_import.py @@ -79,6 +79,39 @@ class CoreImport: await kr.process(msg) + elif unpacked[0] == "lm": + msg = unpacked[1] + msg = { + "operation": "put-kg-core", + "workspace": workspace, + "id": id, + "library-metadata": { + "id": msg["i"], + "kind": msg.get("k", ""), + "title": msg.get("t", ""), + "parent-id": msg.get("p", ""), + "document-type": msg.get("d", ""), + "comments": msg.get("c", ""), + "tags": msg.get("g", []), + } + } + + await kr.process(msg) + + elif unpacked[0] == "lb": + msg = unpacked[1] + msg = { + "operation": "put-kg-core", + "workspace": workspace, + "id": id, + "library-blob": { + "id": msg["i"], + "data": msg.get("d", b""), + } + } + + await kr.process(msg) + except Exception as e: logger.error(f"Core import exception: {e}", exc_info=True) await error(str(e)) diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py index 6a23731b..4fcb2dd3 100644 --- a/trustgraph-flow/trustgraph/tables/knowledge.py +++ b/trustgraph-flow/trustgraph/tables/knowledge.py @@ -98,7 +98,8 @@ class KnowledgeTableStore: text, boolean, text, boolean, text, boolean >>, triples list>, PRIMARY KEY ((workspace, document_id), id) ); @@ -234,7 +235,8 @@ class KnowledgeTableStore: triples = [ ( - *term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o) + *term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o), + v.g or "" ) for v in m.triples ] @@ -416,6 +418,7 @@ class KnowledgeTableStore: s = tuple_to_term(elt[0], elt[1]), p = tuple_to_term(elt[2], elt[3]), o = tuple_to_term(elt[4], elt[5]), + g = elt[6] if elt[6] else None, ) for elt in row[3] ]