feat: complete knowledge core storage — named graphs, provenance, source material (#973)

Implements all three changes from the knowledge-core-completeness tech spec:

1. Named graph field preserved through Cassandra storage (7-element tuple),
   enabling provenance triples to retain their graph URIs on round-trip.

2. Provenance triples already arrive on triples-input — no routing change
   needed; Change 1 was sufficient.

3. Source material (library documents) streamed alongside triples and
   embeddings during core download/upload. The knowledge manager fetches
   the document hierarchy from the librarian on download and recreates it
   on upload, preserving the full provenance chain across instances.
This commit is contained in:
cybermaggedon 2026-06-03 10:46:52 +01:00 committed by GitHub
parent aa158e1ba3
commit 6df7471a55
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 1347 additions and 15 deletions

View file

@ -1,6 +1,7 @@
from .. schema import KnowledgeResponse, Error, Triples, GraphEmbeddings
from .. schema import DocumentEmbeddings
from .. schema import DocumentEmbeddings, LibraryMetadata, LibraryBlob
from .. schema import LibrarianRequest, DocumentMetadata
from .. knowledge import hash
from .. exceptions import RequestError
from .. tables.knowledge import KnowledgeTableStore
@ -18,7 +19,7 @@ class KnowledgeManager:
def __init__(
self, cassandra_host, cassandra_username, cassandra_password,
keyspace, flow_config, replication_factor=1,
keyspace, flow_config, librarian=None, replication_factor=1,
):
self.table_store = KnowledgeTableStore(
@ -26,6 +27,9 @@ class KnowledgeManager:
replication_factor
)
self.librarian = librarian
self._pending_library_metadata = {}
self.loader_queue = asyncio.Queue(maxsize=20)
self.background_task = None
self.flow_config = flow_config
@ -86,6 +90,9 @@ class KnowledgeManager:
publish_ge,
)
if self.librarian:
await self._stream_library_docs(request.id, respond)
logger.debug("Knowledge core retrieval complete")
await respond(
@ -122,6 +129,12 @@ class KnowledgeManager:
workspace, request.graph_embeddings
)
if request.library_metadata and self.librarian:
await self._put_library_metadata(request.library_metadata, workspace)
if request.library_blob and self.librarian:
await self._put_library_blob(request.library_blob, workspace)
await respond(
KnowledgeResponse(
error = None,
@ -250,6 +263,112 @@ class KnowledgeManager:
await self.loader_queue.put((request, respond, workspace))
async def _stream_library_docs(self, document_id, respond):
try:
root_meta = await self.librarian.fetch_document_metadata(
document_id
)
except Exception as e:
logger.warning(f"Could not fetch library metadata for {document_id}: {e}")
return
if root_meta is None:
return
await self._stream_one_doc(root_meta, respond)
try:
resp = await self.librarian.request(
LibrarianRequest(
operation="list-children",
document_id=document_id,
)
)
except Exception as e:
logger.warning(f"Could not list children for {document_id}: {e}")
return
for child_meta in resp.document_metadatas:
await self._stream_one_doc(child_meta, respond)
async def _stream_one_doc(self, doc_meta, respond):
lm = LibraryMetadata(
id=doc_meta.id,
kind=doc_meta.kind,
title=doc_meta.title,
parent_id=doc_meta.parent_id,
document_type=doc_meta.document_type,
comments=doc_meta.comments,
tags=doc_meta.tags or [],
)
await respond(
KnowledgeResponse(library_metadata=lm)
)
try:
content = await self.librarian.fetch_document_content(
doc_meta.id
)
except Exception as e:
logger.warning(f"Could not fetch content for {doc_meta.id}: {e}")
return
await respond(
KnowledgeResponse(
library_blob=LibraryBlob(
id=doc_meta.id,
data=content,
)
)
)
async def _put_library_metadata(self, lm, workspace):
self._pending_library_metadata[lm.id] = lm
async def _put_library_blob(self, lb, workspace):
lm = self._pending_library_metadata.pop(lb.id, None)
if lm is None:
logger.warning(
f"Received library blob for {lb.id} with no preceding metadata"
)
return
doc_meta = DocumentMetadata(
id=lm.id,
kind=lm.kind,
title=lm.title,
parent_id=lm.parent_id,
document_type=lm.document_type,
comments=lm.comments,
tags=lm.tags or [],
)
if lm.parent_id:
operation = "add-child-document"
else:
operation = "add-document"
try:
await self.librarian.request(
LibrarianRequest(
operation=operation,
document_id=lm.id,
document_metadata=doc_meta,
content=lb.data,
)
)
except RuntimeError as e:
if "already exists" in str(e):
logger.debug(f"Library document {lm.id} already exists, skipping")
else:
logger.warning(f"Could not save library document {lm.id}: {e}")
except Exception as e:
logger.warning(f"Could not save library document {lm.id}: {e}")
async def core_loader(self):
logger.info("Knowledge background processor running...")

View file

@ -12,6 +12,7 @@ import logging
from .. base import WorkspaceProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import ConsumerMetrics, ProducerMetrics
from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from .. base import LibrarianClient
from .. schema import KnowledgeRequest, KnowledgeResponse, Error
from .. schema import knowledge_request_queue, knowledge_response_queue
@ -77,12 +78,17 @@ class Processor(WorkspaceProcessor):
}
)
self.librarian_client = LibrarianClient(
id=id, backend=self.pubsub, taskgroup=self.taskgroup,
)
self.knowledge = KnowledgeManager(
cassandra_host = self.cassandra_host,
cassandra_username = self.cassandra_username,
cassandra_password = self.cassandra_password,
keyspace = keyspace,
flow_config = self,
librarian = self.librarian_client,
replication_factor = replication_factor,
)
@ -156,6 +162,7 @@ class Processor(WorkspaceProcessor):
async def start(self):
await super(Processor, self).start()
await self.librarian_client.start()
async def on_knowledge_config(self, workspace, config, version):

View file

@ -73,6 +73,39 @@ class CoreExport:
enc = msgpack.packb(msg)
await response.write(enc)
if "library-metadata" in resp:
data = resp["library-metadata"]
msg = (
"lm",
{
"i": data["id"],
"k": data.get("kind", ""),
"t": data.get("title", ""),
"p": data.get("parent-id", ""),
"d": data.get("document-type", ""),
"c": data.get("comments", ""),
"g": data.get("tags", []),
}
)
enc = msgpack.packb(msg)
await response.write(enc)
if "library-blob" in resp:
data = resp["library-blob"]
msg = (
"lb",
{
"i": data["id"],
"d": data.get("data", b""),
}
)
enc = msgpack.packb(msg, use_bin_type=True)
await response.write(enc)
await kr.process(
{
"operation": "get-kg-core",

View file

@ -79,6 +79,39 @@ class CoreImport:
await kr.process(msg)
elif unpacked[0] == "lm":
msg = unpacked[1]
msg = {
"operation": "put-kg-core",
"workspace": workspace,
"id": id,
"library-metadata": {
"id": msg["i"],
"kind": msg.get("k", ""),
"title": msg.get("t", ""),
"parent-id": msg.get("p", ""),
"document-type": msg.get("d", ""),
"comments": msg.get("c", ""),
"tags": msg.get("g", []),
}
}
await kr.process(msg)
elif unpacked[0] == "lb":
msg = unpacked[1]
msg = {
"operation": "put-kg-core",
"workspace": workspace,
"id": id,
"library-blob": {
"id": msg["i"],
"data": msg.get("d", b""),
}
}
await kr.process(msg)
except Exception as e:
logger.error(f"Core import exception: {e}", exc_info=True)
await error(str(e))

View file

@ -98,7 +98,8 @@ class KnowledgeTableStore:
text, boolean, text, boolean, text, boolean
>>,
triples list<tuple<
text, boolean, text, boolean, text, boolean
text, boolean, text, boolean, text, boolean,
text
>>,
PRIMARY KEY ((workspace, document_id), id)
);
@ -234,7 +235,8 @@ class KnowledgeTableStore:
triples = [
(
*term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o)
*term_to_tuple(v.s), *term_to_tuple(v.p), *term_to_tuple(v.o),
v.g or ""
)
for v in m.triples
]
@ -416,6 +418,7 @@ class KnowledgeTableStore:
s = tuple_to_term(elt[0], elt[1]),
p = tuple_to_term(elt[2], elt[3]),
o = tuple_to_term(elt[4], elt[5]),
g = elt[6] if elt[6] else None,
)
for elt in row[3]
]