trustgraph/trustgraph-base/trustgraph/schema/knowledge/knowledge.py
cybermaggedon f0ad282708
CLI auth migration, document embeddings core lifecycle (#913)
Migrate get_kg_core and put_kg_core CLI tools to use Api/SocketClient
with first-frame auth (fixes broken raw websocket path). Fix wire
format field names (root/vector). Remove ~600 lines of dead raw
websocket code from invoke_graph_rag.py.

Add document embeddings core lifecycle to the knowledge service:
list/get/put/delete/load operations across schema, translator,
Cassandra table store, knowledge manager, gateway registry, REST API,
socket client, and CLI (tg-get-de-core, tg-put-de-core).

Fix delete_kg_core to also clean up document embeddings rows.
2026-05-14 10:30:21 +01:00

57 lines
1.4 KiB
Python

from dataclasses import dataclass, field
from ..core.primitives import Triple, Error
from ..core.topic import queue
from ..core.metadata import Metadata
from .document import Document, TextDocument
from .graph import Triples
from .embeddings import GraphEmbeddings, DocumentEmbeddings

# get-kg-core
#   -> (???)
#   <- ()
#   <- (error)

# delete-kg-core
#   -> (???)
#   <- ()
#   <- (error)

# list-kg-cores
#   -> ()
#   <- ()
#   <- (error)

@dataclass
class KnowledgeRequest:
    # get-kg-core, delete-kg-core, list-kg-cores, put-kg-core,
    # load-kg-core, unload-kg-core
    operation: str = ""

    # get-kg-core, list-kg-cores, delete-kg-core, put-kg-core,
    # load-kg-core, unload-kg-core
    id: str = ""

    # load-kg-core
    flow: str = ""

    # load-kg-core
    collection: str = ""

    # put-kg-core
    triples: Triples | None = None
    graph_embeddings: GraphEmbeddings | None = None

    # put-de-core
    document_embeddings: DocumentEmbeddings | None = None

@dataclass
class KnowledgeResponse:
    error: Error | None = None
    ids: list[str] = field(default_factory=list)
    eos: bool = False  # Indicates end of knowledge core stream
    triples: Triples | None = None
    graph_embeddings: GraphEmbeddings | None = None
    document_embeddings: DocumentEmbeddings | None = None

knowledge_request_queue = queue('knowledge', cls='request')
knowledge_response_queue = queue('knowledge', cls='response')
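
The request and response shapes above suggest a streaming pattern: a caller sends one KnowledgeRequest and reads KnowledgeResponse messages until one arrives with eos set or with error populated. The sketch below illustrates that reading under stated assumptions and is not the project's implementation. The dataclasses are trimmed stand-ins so the block runs without trustgraph installed; the fields on the stand-in Error and Triples types, the collect_core helper, and the "core-1" identifier are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Optional


# Stand-in types (assumptions): the real Error and Triples live in the
# trustgraph schema package and may carry different fields.
@dataclass
class Error:
    type: str = ""
    message: str = ""


@dataclass
class Triples:
    triples: list = field(default_factory=list)


# Trimmed copies of the request/response classes from this file.
@dataclass
class KnowledgeRequest:
    operation: str = ""
    id: str = ""
    flow: str = ""
    collection: str = ""
    triples: Optional[Triples] = None


@dataclass
class KnowledgeResponse:
    error: Optional[Error] = None
    ids: list = field(default_factory=list)
    eos: bool = False
    triples: Optional[Triples] = None


def collect_core(responses):
    """Hypothetical helper: gather triples batches until eos or an error."""
    batches = []
    for resp in responses:
        if resp.error is not None:
            raise RuntimeError(f"{resp.error.type}: {resp.error.message}")
        if resp.triples is not None:
            batches.append(resp.triples)
        if resp.eos:
            break
    return batches


# Build a request for one knowledge core; only operation and id are set.
request = KnowledgeRequest(operation="get-kg-core", id="core-1")

# Simulated response stream: two data messages, then an end-of-stream marker.
stream = [
    KnowledgeResponse(triples=Triples(triples=["t1", "t2"])),
    KnowledgeResponse(triples=Triples(triples=["t3"])),
    KnowledgeResponse(eos=True),
]
batches = collect_core(stream)
```

Treating eos as a separate terminating message keeps the consumer loop simple, though the real service may instead set eos on the final data message; the helper handles either case because it checks the payload before checking eos.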