From a4e2f67cb1c2d43195792ea542f501ab5060bd3a Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Fri, 20 Jun 2025 16:59:55 +0100 Subject: [PATCH] Feature/translator classes (#414) Pull the JSON/Pulsar message translation into a separate module, will be useful for other comms channels --- .../trustgraph/messaging/__init__.py | 105 ++++++++++ .../trustgraph/messaging/registry.py | 51 +++++ .../messaging/translators/__init__.py | 19 ++ .../trustgraph/messaging/translators/agent.py | 44 ++++ .../trustgraph/messaging/translators/base.py | 43 ++++ .../messaging/translators/config.py | 100 +++++++++ .../messaging/translators/document_loading.py | 191 ++++++++++++++++++ .../messaging/translators/embeddings.py | 33 +++ .../messaging/translators/embeddings_query.py | 94 +++++++++ .../trustgraph/messaging/translators/flow.py | 59 ++++++ .../messaging/translators/knowledge.py | 183 +++++++++++++++++ .../messaging/translators/library.py | 124 ++++++++++++ .../messaging/translators/metadata.py | 81 ++++++++ .../messaging/translators/primitives.py | 47 +++++ .../messaging/translators/prompt.py | 54 +++++ .../messaging/translators/retrieval.py | 81 ++++++++ .../messaging/translators/text_completion.py | 42 ++++ .../messaging/translators/triples.py | 60 ++++++ .../trustgraph/gateway/dispatch/agent.py | 23 +-- .../trustgraph/gateway/dispatch/config.py | 59 +----- .../dispatch/document_embeddings_import.py | 22 +- .../gateway/dispatch/document_load.py | 25 +-- .../gateway/dispatch/document_rag.py | 13 +- .../trustgraph/gateway/dispatch/embeddings.py | 10 +- .../trustgraph/gateway/dispatch/flow.py | 33 +-- .../dispatch/graph_embeddings_query.py | 22 +- .../trustgraph/gateway/dispatch/graph_rag.py | 16 +- .../trustgraph/gateway/dispatch/knowledge.py | 75 +------ .../trustgraph/gateway/dispatch/librarian.py | 72 +------ .../trustgraph/gateway/dispatch/prompt.py | 21 +- .../trustgraph/gateway/dispatch/serialize.py | 7 + .../gateway/dispatch/text_completion.py | 11 +- .../trustgraph/gateway/dispatch/text_load.py | 29 +-- .../gateway/dispatch/triples_query.py | 34 +--- 34 files changed, 1506 insertions(+), 377 deletions(-) create mode 100644 trustgraph-base/trustgraph/messaging/__init__.py create mode 100644 trustgraph-base/trustgraph/messaging/registry.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/__init__.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/agent.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/base.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/config.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/document_loading.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/embeddings.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/embeddings_query.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/flow.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/knowledge.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/library.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/metadata.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/primitives.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/prompt.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/retrieval.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/text_completion.py create mode 100644 trustgraph-base/trustgraph/messaging/translators/triples.py diff --git a/trustgraph-base/trustgraph/messaging/__init__.py b/trustgraph-base/trustgraph/messaging/__init__.py new file mode 100644 index 00000000..922ee7c8 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/__init__.py @@ -0,0 +1,105 @@ +from .registry import TranslatorRegistry +from .translators import * + +# Auto-register all translators +from .translators.agent import AgentRequestTranslator, AgentResponseTranslator +from .translators.embeddings import EmbeddingsRequestTranslator, EmbeddingsResponseTranslator +from .translators.text_completion import TextCompletionRequestTranslator, TextCompletionResponseTranslator +from .translators.retrieval import ( + DocumentRagRequestTranslator, DocumentRagResponseTranslator, + GraphRagRequestTranslator, GraphRagResponseTranslator +) +from .translators.triples import TriplesQueryRequestTranslator, TriplesQueryResponseTranslator +from .translators.knowledge import KnowledgeRequestTranslator, KnowledgeResponseTranslator +from .translators.library import LibraryDocumentTranslator, LibraryProcessingTranslator +from .translators.document_loading import DocumentTranslator, TextDocumentTranslator +from .translators.config import ConfigRequestTranslator, ConfigResponseTranslator +from .translators.flow import FlowRequestTranslator, FlowResponseTranslator +from .translators.prompt import PromptRequestTranslator, PromptResponseTranslator +from .translators.embeddings_query import ( + DocumentEmbeddingsRequestTranslator, DocumentEmbeddingsResponseTranslator, + GraphEmbeddingsRequestTranslator, GraphEmbeddingsResponseTranslator +) + +# Register all service translators +TranslatorRegistry.register_service( + "agent", + AgentRequestTranslator(), + AgentResponseTranslator() +) + +TranslatorRegistry.register_service( + "embeddings", + EmbeddingsRequestTranslator(), + EmbeddingsResponseTranslator() +) + +TranslatorRegistry.register_service( + "text-completion", + TextCompletionRequestTranslator(), + TextCompletionResponseTranslator() +) + +TranslatorRegistry.register_service( + "document-rag", + DocumentRagRequestTranslator(), + DocumentRagResponseTranslator() +) + +TranslatorRegistry.register_service( + "graph-rag", + GraphRagRequestTranslator(), + GraphRagResponseTranslator() +) + +TranslatorRegistry.register_service( + "triples-query", + TriplesQueryRequestTranslator(), + TriplesQueryResponseTranslator() +) + +TranslatorRegistry.register_service( + "knowledge", + KnowledgeRequestTranslator(), + KnowledgeResponseTranslator() +) + +TranslatorRegistry.register_service( + "librarian", + LibraryDocumentTranslator(), + LibraryProcessingTranslator() +) + +TranslatorRegistry.register_service( + "config", + ConfigRequestTranslator(), + ConfigResponseTranslator() +) + +TranslatorRegistry.register_service( + "flow", + FlowRequestTranslator(), + FlowResponseTranslator() +) + +TranslatorRegistry.register_service( + "prompt", + PromptRequestTranslator(), + PromptResponseTranslator() +) + +TranslatorRegistry.register_service( + "document-embeddings-query", + DocumentEmbeddingsRequestTranslator(), + DocumentEmbeddingsResponseTranslator() +) + +TranslatorRegistry.register_service( + "graph-embeddings-query", + GraphEmbeddingsRequestTranslator(), + GraphEmbeddingsResponseTranslator() +) + +# Register single-direction translators for document loading +TranslatorRegistry.register_request("document", DocumentTranslator()) +TranslatorRegistry.register_request("text-document", TextDocumentTranslator()) diff --git a/trustgraph-base/trustgraph/messaging/registry.py b/trustgraph-base/trustgraph/messaging/registry.py new file mode 100644 index 00000000..f42c53bb --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/registry.py @@ -0,0 +1,51 @@ +from typing import Dict, List, Union +from .translators.base import MessageTranslator + + +class TranslatorRegistry: + """Registry for service translators""" + + _request_translators: Dict[str, MessageTranslator] = {} + _response_translators: Dict[str, MessageTranslator] = {} + + @classmethod + def register_request(cls, service_name: str, translator: MessageTranslator): + """Register a request translator for a service""" + cls._request_translators[service_name] = translator + + @classmethod + def register_response(cls, service_name: str, translator: MessageTranslator): + """Register a response translator for a service""" + cls._response_translators[service_name] = translator + + @classmethod + def register_service(cls, service_name: str, request_translator: MessageTranslator, + response_translator: MessageTranslator): + """Register both request and response translators for a service""" + cls.register_request(service_name, request_translator) + cls.register_response(service_name, response_translator) + + @classmethod + def get_request_translator(cls, service_name: str) -> MessageTranslator: + """Get request translator for a service""" + if service_name not in cls._request_translators: + raise KeyError(f"No request translator registered for service: {service_name}") + return cls._request_translators[service_name] + + @classmethod + def get_response_translator(cls, service_name: str) -> MessageTranslator: + """Get response translator for a service""" + if service_name not in cls._response_translators: + raise KeyError(f"No response translator registered for service: {service_name}") + return cls._response_translators[service_name] + + @classmethod + def list_services(cls) -> List[str]: + """List all registered services""" + return sorted(set(cls._request_translators.keys()) | set(cls._response_translators.keys())) + + @classmethod + def has_service(cls, service_name: str) -> bool: + """Check if a service is registered""" + return (service_name in cls._request_translators or + service_name in cls._response_translators) \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/__init__.py b/trustgraph-base/trustgraph/messaging/translators/__init__.py new file mode 100644 index 00000000..1a860217 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/__init__.py @@ -0,0 +1,19 @@ +from .base import Translator, MessageTranslator +from .primitives import ValueTranslator, TripleTranslator, SubgraphTranslator +from .metadata import DocumentMetadataTranslator, ProcessingMetadataTranslator +from .agent import AgentRequestTranslator, AgentResponseTranslator +from .embeddings import EmbeddingsRequestTranslator, EmbeddingsResponseTranslator +from .text_completion import TextCompletionRequestTranslator, TextCompletionResponseTranslator +from .retrieval import DocumentRagRequestTranslator, DocumentRagResponseTranslator +from .retrieval import GraphRagRequestTranslator, GraphRagResponseTranslator +from .triples import TriplesQueryRequestTranslator, TriplesQueryResponseTranslator +from .knowledge import KnowledgeRequestTranslator, KnowledgeResponseTranslator +from .library import LibraryDocumentTranslator, LibraryProcessingTranslator +from .document_loading import DocumentTranslator, TextDocumentTranslator, ChunkTranslator, DocumentEmbeddingsTranslator +from .config import ConfigRequestTranslator, ConfigResponseTranslator +from .flow import FlowRequestTranslator, FlowResponseTranslator +from .prompt import PromptRequestTranslator, PromptResponseTranslator +from .embeddings_query import ( + DocumentEmbeddingsRequestTranslator, DocumentEmbeddingsResponseTranslator, + GraphEmbeddingsRequestTranslator, GraphEmbeddingsResponseTranslator +) \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/agent.py b/trustgraph-base/trustgraph/messaging/translators/agent.py new file mode 100644 index 00000000..5529a1a2 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/agent.py @@ -0,0 +1,44 @@ +from typing import Dict, Any, Tuple +from ...schema import AgentRequest, AgentResponse +from .base import MessageTranslator + + +class AgentRequestTranslator(MessageTranslator): + """Translator for AgentRequest schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> AgentRequest: + return AgentRequest( + question=data["question"], + plan=data.get("plan", ""), + state=data.get("state", ""), + history=data.get("history", []) + ) + + def from_pulsar(self, obj: AgentRequest) -> Dict[str, Any]: + return { + "question": obj.question, + "plan": obj.plan, + "state": obj.state, + "history": obj.history + } + + +class AgentResponseTranslator(MessageTranslator): + """Translator for AgentResponse schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> AgentResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: AgentResponse) -> Dict[str, Any]: + result = {} + if obj.answer: + result["answer"] = obj.answer + if obj.thought: + result["thought"] = obj.thought + if obj.observation: + result["observation"] = obj.observation + return result + + def from_response_with_completion(self, obj: AgentResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), (obj.answer is not None) \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/base.py b/trustgraph-base/trustgraph/messaging/translators/base.py new file mode 100644 index 00000000..64e2b635 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/base.py @@ -0,0 +1,43 @@ +from abc import ABC, abstractmethod +from typing import Dict, Any, Tuple +from pulsar.schema import Record + + +class Translator(ABC): + """Base class for bidirectional Pulsar ↔ dict translation""" + + @abstractmethod + def to_pulsar(self, data: Dict[str, Any]) -> Record: + """Convert dict to Pulsar schema object""" + pass + + @abstractmethod + def from_pulsar(self, obj: Record) -> Dict[str, Any]: + """Convert Pulsar schema object to dict""" + pass + + +class MessageTranslator(Translator): + """For complete request/response message translation""" + + def from_response_with_completion(self, obj: Record) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final) - for streaming responses""" + return self.from_pulsar(obj), True + + +class SendTranslator(Translator): + """For fire-and-forget send operations (like ServiceSender)""" + + def from_pulsar(self, obj: Record) -> Dict[str, Any]: + """Usually not needed for send-only operations""" + raise NotImplementedError("Send translators typically don't need from_pulsar") + + +def handle_optional_fields(obj: Record, fields: list) -> Dict[str, Any]: + """Helper to extract optional fields from Pulsar object""" + result = {} + for field in fields: + value = getattr(obj, field, None) + if value is not None: + result[field] = value + return result \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/config.py b/trustgraph-base/trustgraph/messaging/translators/config.py new file mode 100644 index 00000000..10e023f6 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/config.py @@ -0,0 +1,100 @@ +from typing import Dict, Any, Tuple +from ...schema import ConfigRequest, ConfigResponse, ConfigKey, ConfigValue +from .base import MessageTranslator + + +class ConfigRequestTranslator(MessageTranslator): + """Translator for ConfigRequest schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> ConfigRequest: + keys = None + if "keys" in data: + keys = [ + ConfigKey( + type=k["type"], + key=k["key"] + ) + for k in data["keys"] + ] + + values = None + if "values" in data: + values = [ + ConfigValue( + type=v["type"], + key=v["key"], + value=v["value"] + ) + for v in data["values"] + ] + + return ConfigRequest( + operation=data.get("operation"), + keys=keys, + type=data.get("type"), + values=values + ) + + def from_pulsar(self, obj: ConfigRequest) -> Dict[str, Any]: + result = {} + + if obj.operation: + result["operation"] = obj.operation + if obj.type: + result["type"] = obj.type + + if obj.keys: + result["keys"] = [ + { + "type": k.type, + "key": k.key + } + for k in obj.keys + ] + + if obj.values: + result["values"] = [ + { + "type": v.type, + "key": v.key, + "value": v.value + } + for v in obj.values + ] + + return result + + +class ConfigResponseTranslator(MessageTranslator): + """Translator for ConfigResponse schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> ConfigResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: ConfigResponse) -> Dict[str, Any]: + result = {} + + if obj.version is not None: + result["version"] = obj.version + + if obj.values: + result["values"] = [ + { + "type": v.type, + "key": v.key, + "value": v.value + } + for v in obj.values + ] + + if obj.directory: + result["directory"] = obj.directory + + if obj.config: + result["config"] = obj.config + + return result + + def from_response_with_completion(self, obj: ConfigResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/document_loading.py b/trustgraph-base/trustgraph/messaging/translators/document_loading.py new file mode 100644 index 00000000..3dfef718 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/document_loading.py @@ -0,0 +1,191 @@ +import base64 +from typing import Dict, Any +from ...schema import Document, TextDocument, Chunk, DocumentEmbeddings, ChunkEmbeddings +from .base import SendTranslator +from .metadata import DocumentMetadataTranslator +from .primitives import SubgraphTranslator + + +class DocumentTranslator(SendTranslator): + """Translator for Document schema objects (PDF docs etc.)""" + + def __init__(self): + self.subgraph_translator = SubgraphTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> Document: + metadata = data.get("metadata", []) + + # Handle base64 content validation + doc = base64.b64decode(data["data"]) + + from ...schema import Metadata + return Document( + metadata=Metadata( + id=data.get("id"), + metadata=self.subgraph_translator.to_pulsar(metadata) if metadata else [], + user=data.get("user", "trustgraph"), + collection=data.get("collection", "default"), + ), + data=base64.b64encode(doc).decode("utf-8") + ) + + def from_pulsar(self, obj: Document) -> Dict[str, Any]: + result = { + "data": obj.data + } + + if obj.metadata: + metadata_dict = {} + if obj.metadata.id: + metadata_dict["id"] = obj.metadata.id + if obj.metadata.user: + metadata_dict["user"] = obj.metadata.user + if obj.metadata.collection: + metadata_dict["collection"] = obj.metadata.collection + if obj.metadata.metadata: + metadata_dict["metadata"] = self.subgraph_translator.from_pulsar(obj.metadata.metadata) + + result["metadata"] = metadata_dict + + return result + + +class TextDocumentTranslator(SendTranslator): + """Translator for TextDocument schema objects""" + + def __init__(self): + self.subgraph_translator = SubgraphTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> TextDocument: + metadata = data.get("metadata", []) + charset = data.get("charset", "utf-8") + + # Text is base64 encoded in input + text = base64.b64decode(data["text"]).decode(charset) + + from ...schema import Metadata + return TextDocument( + metadata=Metadata( + id=data.get("id"), + metadata=self.subgraph_translator.to_pulsar(metadata) if metadata else [], + user=data.get("user", "trustgraph"), + collection=data.get("collection", "default"), + ), + text=text.encode("utf-8") + ) + + def from_pulsar(self, obj: TextDocument) -> Dict[str, Any]: + result = { + "text": obj.text.decode("utf-8") if isinstance(obj.text, bytes) else obj.text + } + + if obj.metadata: + metadata_dict = {} + if obj.metadata.id: + metadata_dict["id"] = obj.metadata.id + if obj.metadata.user: + metadata_dict["user"] = obj.metadata.user + if obj.metadata.collection: + metadata_dict["collection"] = obj.metadata.collection + if obj.metadata.metadata: + metadata_dict["metadata"] = self.subgraph_translator.from_pulsar(obj.metadata.metadata) + + result["metadata"] = metadata_dict + + return result + + +class ChunkTranslator(SendTranslator): + """Translator for Chunk schema objects""" + + def __init__(self): + self.subgraph_translator = SubgraphTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> Chunk: + metadata = data.get("metadata", []) + + from ...schema import Metadata + return Chunk( + metadata=Metadata( + id=data.get("id"), + metadata=self.subgraph_translator.to_pulsar(metadata) if metadata else [], + user=data.get("user", "trustgraph"), + collection=data.get("collection", "default"), + ), + chunk=data["chunk"].encode("utf-8") if isinstance(data["chunk"], str) else data["chunk"] + ) + + def from_pulsar(self, obj: Chunk) -> Dict[str, Any]: + result = { + "chunk": obj.chunk.decode("utf-8") if isinstance(obj.chunk, bytes) else obj.chunk + } + + if obj.metadata: + metadata_dict = {} + if obj.metadata.id: + metadata_dict["id"] = obj.metadata.id + if obj.metadata.user: + metadata_dict["user"] = obj.metadata.user + if obj.metadata.collection: + metadata_dict["collection"] = obj.metadata.collection + if obj.metadata.metadata: + metadata_dict["metadata"] = self.subgraph_translator.from_pulsar(obj.metadata.metadata) + + result["metadata"] = metadata_dict + + return result + + +class DocumentEmbeddingsTranslator(SendTranslator): + """Translator for DocumentEmbeddings schema objects""" + + def __init__(self): + self.subgraph_translator = SubgraphTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> DocumentEmbeddings: + metadata = data.get("metadata", {}) + + chunks = [ + ChunkEmbeddings( + chunk=chunk["chunk"].encode("utf-8") if isinstance(chunk["chunk"], str) else chunk["chunk"], + vectors=chunk["vectors"] + ) + for chunk in data.get("chunks", []) + ] + + from ...schema import Metadata + return DocumentEmbeddings( + metadata=Metadata( + id=metadata.get("id"), + metadata=self.subgraph_translator.to_pulsar(metadata.get("metadata", [])), + user=metadata.get("user", "trustgraph"), + collection=metadata.get("collection", "default"), + ), + chunks=chunks + ) + + def from_pulsar(self, obj: DocumentEmbeddings) -> Dict[str, Any]: + result = { + "chunks": [ + { + "chunk": chunk.chunk.decode("utf-8") if isinstance(chunk.chunk, bytes) else chunk.chunk, + "vectors": chunk.vectors + } + for chunk in obj.chunks + ] + } + + if obj.metadata: + metadata_dict = {} + if obj.metadata.id: + metadata_dict["id"] = obj.metadata.id + if obj.metadata.user: + metadata_dict["user"] = obj.metadata.user + if obj.metadata.collection: + metadata_dict["collection"] = obj.metadata.collection + if obj.metadata.metadata: + metadata_dict["metadata"] = self.subgraph_translator.from_pulsar(obj.metadata.metadata) + + result["metadata"] = metadata_dict + + return result \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/embeddings.py b/trustgraph-base/trustgraph/messaging/translators/embeddings.py new file mode 100644 index 00000000..7e6eff83 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/embeddings.py @@ -0,0 +1,33 @@ +from typing import Dict, Any, Tuple +from ...schema import EmbeddingsRequest, EmbeddingsResponse +from .base import MessageTranslator + + +class EmbeddingsRequestTranslator(MessageTranslator): + """Translator for EmbeddingsRequest schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> EmbeddingsRequest: + return EmbeddingsRequest( + text=data["text"] + ) + + def from_pulsar(self, obj: EmbeddingsRequest) -> Dict[str, Any]: + return { + "text": obj.text + } + + +class EmbeddingsResponseTranslator(MessageTranslator): + """Translator for EmbeddingsResponse schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> EmbeddingsResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: EmbeddingsResponse) -> Dict[str, Any]: + return { + "vectors": obj.vectors + } + + def from_response_with_completion(self, obj: EmbeddingsResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/embeddings_query.py b/trustgraph-base/trustgraph/messaging/translators/embeddings_query.py new file mode 100644 index 00000000..d69e7bef --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/embeddings_query.py @@ -0,0 +1,94 @@ +from typing import Dict, Any, Tuple +from ...schema import ( + DocumentEmbeddingsRequest, DocumentEmbeddingsResponse, + GraphEmbeddingsRequest, GraphEmbeddingsResponse +) +from .base import MessageTranslator +from .primitives import ValueTranslator + + +class DocumentEmbeddingsRequestTranslator(MessageTranslator): + """Translator for DocumentEmbeddingsRequest schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> DocumentEmbeddingsRequest: + return DocumentEmbeddingsRequest( + vectors=data["vectors"], + limit=int(data.get("limit", 10)), + user=data.get("user", "trustgraph"), + collection=data.get("collection", "default") + ) + + def from_pulsar(self, obj: DocumentEmbeddingsRequest) -> Dict[str, Any]: + return { + "vectors": obj.vectors, + "limit": obj.limit, + "user": obj.user, + "collection": obj.collection + } + + +class DocumentEmbeddingsResponseTranslator(MessageTranslator): + """Translator for DocumentEmbeddingsResponse schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> DocumentEmbeddingsResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: DocumentEmbeddingsResponse) -> Dict[str, Any]: + result = {} + + if obj.documents: + result["documents"] = [ + doc.decode("utf-8") if isinstance(doc, bytes) else doc + for doc in obj.documents + ] + + return result + + def from_response_with_completion(self, obj: DocumentEmbeddingsResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True + + +class GraphEmbeddingsRequestTranslator(MessageTranslator): + """Translator for GraphEmbeddingsRequest schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> GraphEmbeddingsRequest: + return GraphEmbeddingsRequest( + vectors=data["vectors"], + limit=int(data.get("limit", 10)), + user=data.get("user", "trustgraph"), + collection=data.get("collection", "default") + ) + + def from_pulsar(self, obj: GraphEmbeddingsRequest) -> Dict[str, Any]: + return { + "vectors": obj.vectors, + "limit": obj.limit, + "user": obj.user, + "collection": obj.collection + } + + +class GraphEmbeddingsResponseTranslator(MessageTranslator): + """Translator for GraphEmbeddingsResponse schema objects""" + + def __init__(self): + self.value_translator = ValueTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> GraphEmbeddingsResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: GraphEmbeddingsResponse) -> Dict[str, Any]: + result = {} + + if obj.entities: + result["entities"] = [ + self.value_translator.from_pulsar(entity) + for entity in obj.entities + ] + + return result + + def from_response_with_completion(self, obj: GraphEmbeddingsResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/flow.py b/trustgraph-base/trustgraph/messaging/translators/flow.py new file mode 100644 index 00000000..212a9992 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/flow.py @@ -0,0 +1,59 @@ +from typing import Dict, Any, Tuple +from ...schema import FlowRequest, FlowResponse +from .base import MessageTranslator + + +class FlowRequestTranslator(MessageTranslator): + """Translator for FlowRequest schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> FlowRequest: + return FlowRequest( + operation=data.get("operation"), + class_name=data.get("class-name"), + class_definition=data.get("class-definition"), + description=data.get("description"), + flow_id=data.get("flow-id") + ) + + def from_pulsar(self, obj: FlowRequest) -> Dict[str, Any]: + result = {} + + if obj.operation: + result["operation"] = obj.operation + if obj.class_name: + result["class-name"] = obj.class_name + if obj.class_definition: + result["class-definition"] = obj.class_definition + if obj.description: + result["description"] = obj.description + if obj.flow_id: + result["flow-id"] = obj.flow_id + + return result + + +class FlowResponseTranslator(MessageTranslator): + """Translator for FlowResponse schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> FlowResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: FlowResponse) -> Dict[str, Any]: + result = {} + + if obj.class_names: + result["class-names"] = obj.class_names + if obj.flow_ids: + result["flow-ids"] = obj.flow_ids + if obj.class_definition: + result["class-definition"] = obj.class_definition + if obj.flow: + result["flow"] = obj.flow + if obj.description: + result["description"] = obj.description + + return result + + def from_response_with_completion(self, obj: FlowResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/knowledge.py b/trustgraph-base/trustgraph/messaging/translators/knowledge.py new file mode 100644 index 00000000..5377cbd4 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/knowledge.py @@ -0,0 +1,183 @@ +from typing import Dict, Any, Tuple, Optional +from ...schema import ( + KnowledgeRequest, KnowledgeResponse, Triples, GraphEmbeddings, + Metadata, EntityEmbeddings +) +from .base import MessageTranslator +from .primitives import ValueTranslator, SubgraphTranslator +from .metadata import DocumentMetadataTranslator + + +class KnowledgeRequestTranslator(MessageTranslator): + """Translator for KnowledgeRequest schema objects""" + + def __init__(self): + self.value_translator = ValueTranslator() + self.subgraph_translator = SubgraphTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> KnowledgeRequest: + triples = None + if "triples" in data: + triples = Triples( + metadata=Metadata( + id=data["triples"]["metadata"]["id"], + metadata=self.subgraph_translator.to_pulsar( + data["triples"]["metadata"]["metadata"] + ), + user=data["triples"]["metadata"]["user"], + collection=data["triples"]["metadata"]["collection"] + ), + triples=self.subgraph_translator.to_pulsar(data["triples"]["triples"]), + ) + + graph_embeddings = None + if "graph-embeddings" in data: + graph_embeddings = GraphEmbeddings( + metadata=Metadata( + id=data["graph-embeddings"]["metadata"]["id"], + metadata=self.subgraph_translator.to_pulsar( + data["graph-embeddings"]["metadata"]["metadata"] + ), + user=data["graph-embeddings"]["metadata"]["user"], + collection=data["graph-embeddings"]["metadata"]["collection"] + ), + entities=[ + EntityEmbeddings( + entity=self.value_translator.to_pulsar(ent["entity"]), + vectors=ent["vectors"], + ) + for ent in data["graph-embeddings"]["entities"] + ] + ) + + return KnowledgeRequest( + operation=data.get("operation"), + user=data.get("user"), + id=data.get("id"), + flow=data.get("flow"), + collection=data.get("collection"), + triples=triples, + graph_embeddings=graph_embeddings, + ) + + def from_pulsar(self, obj: KnowledgeRequest) -> Dict[str, Any]: + result = {} + + if obj.operation: + result["operation"] = obj.operation + if obj.user: + result["user"] = obj.user + if obj.id: + result["id"] = obj.id + if obj.flow: + result["flow"] = obj.flow + if obj.collection: + result["collection"] = obj.collection + + if obj.triples: + result["triples"] = { + "metadata": { + "id": obj.triples.metadata.id, + "metadata": self.subgraph_translator.from_pulsar( + obj.triples.metadata.metadata + ), + "user": obj.triples.metadata.user, + "collection": obj.triples.metadata.collection, + }, + "triples": self.subgraph_translator.from_pulsar(obj.triples.triples), + } + + if obj.graph_embeddings: + result["graph-embeddings"] = { + "metadata": { + "id": obj.graph_embeddings.metadata.id, + "metadata": self.subgraph_translator.from_pulsar( + obj.graph_embeddings.metadata.metadata + ), + "user": obj.graph_embeddings.metadata.user, + "collection": obj.graph_embeddings.metadata.collection, + }, + "entities": [ + { + "vectors": entity.vectors, + "entity": self.value_translator.from_pulsar(entity.entity), + } + for entity in obj.graph_embeddings.entities + ], + } + + return result + + +class KnowledgeResponseTranslator(MessageTranslator): + """Translator for KnowledgeResponse schema objects""" + + def __init__(self): + self.value_translator = ValueTranslator() + self.subgraph_translator = SubgraphTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> KnowledgeResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: KnowledgeResponse) -> Dict[str, Any]: + # Response to list operation + if obj.ids is not None: + return {"ids": obj.ids} + + # Streaming triples response + if obj.triples: + return { + "triples": { + "metadata": { + "id": obj.triples.metadata.id, + "metadata": self.subgraph_translator.from_pulsar( + obj.triples.metadata.metadata + ), + "user": obj.triples.metadata.user, + "collection": obj.triples.metadata.collection, + }, + "triples": self.subgraph_translator.from_pulsar(obj.triples.triples), + } + } + + # Streaming graph embeddings response + if obj.graph_embeddings: + return { + "graph-embeddings": { + "metadata": { + "id": obj.graph_embeddings.metadata.id, + "metadata": self.subgraph_translator.from_pulsar( + obj.graph_embeddings.metadata.metadata + ), + "user": obj.graph_embeddings.metadata.user, + "collection": obj.graph_embeddings.metadata.collection, + }, + "entities": [ + { + "vectors": entity.vectors, + "entity": self.value_translator.from_pulsar(entity.entity), + } + for entity in obj.graph_embeddings.entities + ], + } + } + + # End of stream marker + if obj.eos is True: + return {"eos": True} + + # Empty response (successful delete) + return {} + + def from_response_with_completion(self, obj: KnowledgeResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + response = self.from_pulsar(obj) + + # Check if this is a final response + is_final = ( + obj.ids is not None or # List response + obj.eos is True or # End of stream + (not obj.triples and not obj.graph_embeddings) # Empty response + ) + + return response, is_final \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/library.py b/trustgraph-base/trustgraph/messaging/translators/library.py new file mode 100644 index 00000000..f590459a --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/library.py @@ -0,0 +1,124 @@ +from typing import Dict, Any, Tuple, Optional +from ...schema import LibrarianRequest, LibrarianResponse, DocumentMetadata, ProcessingMetadata, Criteria +from .base import MessageTranslator +from .metadata import DocumentMetadataTranslator, ProcessingMetadataTranslator + + +class LibraryDocumentTranslator(MessageTranslator): + """Translator for LibrarianRequest/Response schema objects""" + + def __init__(self): + self.doc_metadata_translator = DocumentMetadataTranslator() + self.proc_metadata_translator = ProcessingMetadataTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> LibrarianRequest: + # Document metadata + doc_metadata = None + if "document-metadata" in data: + doc_metadata = self.doc_metadata_translator.to_pulsar(data["document-metadata"]) + + # Processing metadata + proc_metadata = None + if "processing-metadata" in data: + proc_metadata = self.proc_metadata_translator.to_pulsar(data["processing-metadata"]) + + # Criteria + criteria = [] + if "criteria" in data: + criteria = [ + Criteria( + key=c["key"], + value=c["value"], + operator=c["operator"] + ) + for c in data["criteria"] + ] + + # Content as bytes + content = None + if "content" in data: + if isinstance(data["content"], str): + content = data["content"].encode("utf-8") + else: + content = data["content"] + + return LibrarianRequest( + operation=data.get("operation"), + document_id=data.get("document-id"), + processing_id=data.get("processing-id"), + document_metadata=doc_metadata, + processing_metadata=proc_metadata, + content=content, + user=data.get("user"), + collection=data.get("collection"), + criteria=criteria + ) + + def from_pulsar(self, obj: LibrarianRequest) -> Dict[str, Any]: + result = {} + + if obj.operation: + result["operation"] = obj.operation + if obj.document_id: + result["document-id"] = obj.document_id + if obj.processing_id: + result["processing-id"] = obj.processing_id + if obj.document_metadata: + result["document-metadata"] = self.doc_metadata_translator.from_pulsar(obj.document_metadata) + if obj.processing_metadata: + result["processing-metadata"] = self.proc_metadata_translator.from_pulsar(obj.processing_metadata) + if obj.content: + result["content"] = obj.content.decode("utf-8") if isinstance(obj.content, bytes) else obj.content + if obj.user: + result["user"] = obj.user + if obj.collection: + result["collection"] = obj.collection + if obj.criteria: + result["criteria"] = [ + { + "key": c.key, + "value": c.value, + "operator": c.operator + } + for c in obj.criteria + ] + + return result + + +class LibraryProcessingTranslator(MessageTranslator): + """Translator for LibrarianResponse schema objects""" + + def __init__(self): + self.doc_metadata_translator = DocumentMetadataTranslator() + self.proc_metadata_translator = ProcessingMetadataTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> LibrarianResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: LibrarianResponse) -> Dict[str, Any]: + result = {} + + if obj.document_metadata: + result["document-metadata"] = self.doc_metadata_translator.from_pulsar(obj.document_metadata) + + if obj.content: + result["content"] = obj.content.decode("utf-8") if isinstance(obj.content, bytes) else obj.content + + if obj.document_metadatas: + result["document-metadatas"] = [ + self.doc_metadata_translator.from_pulsar(dm) + for dm in obj.document_metadatas + ] + + if obj.processing_metadatas: + result["processing-metadatas"] = [ + self.proc_metadata_translator.from_pulsar(pm) + for pm in obj.processing_metadatas + ] + + return result + + def from_response_with_completion(self, obj: LibrarianResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/metadata.py b/trustgraph-base/trustgraph/messaging/translators/metadata.py new file mode 100644 index 00000000..218a02bb --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/metadata.py @@ -0,0 +1,81 @@ +from typing import Dict, Any, Optional +from ...schema import DocumentMetadata, ProcessingMetadata +from .base import Translator +from .primitives import SubgraphTranslator + + +class DocumentMetadataTranslator(Translator): + """Translator for DocumentMetadata schema objects""" + + def __init__(self): + self.subgraph_translator = SubgraphTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> DocumentMetadata: + metadata = data.get("metadata", []) + return DocumentMetadata( + id=data.get("id"), + time=data.get("time"), + kind=data.get("kind"), + title=data.get("title"), + comments=data.get("comments"), + metadata=self.subgraph_translator.to_pulsar(metadata) if metadata else [], + user=data.get("user"), + tags=data.get("tags") + ) + + def from_pulsar(self, obj: DocumentMetadata) -> Dict[str, Any]: + result = {} + + if obj.id: + result["id"] = obj.id + if obj.time: + result["time"] = obj.time + if obj.kind: + result["kind"] = obj.kind + if obj.title: + result["title"] = obj.title + if obj.comments: + result["comments"] = obj.comments + if obj.metadata: + result["metadata"] = self.subgraph_translator.from_pulsar(obj.metadata) + if obj.user: + result["user"] = obj.user + if obj.tags is not None: + result["tags"] = obj.tags + + return result + + +class ProcessingMetadataTranslator(Translator): + """Translator for ProcessingMetadata schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> ProcessingMetadata: + return ProcessingMetadata( + id=data.get("id"), + document_id=data.get("document-id"), + time=data.get("time"), + flow=data.get("flow"), + user=data.get("user"), + collection=data.get("collection"), + tags=data.get("tags") + ) + + def from_pulsar(self, obj: ProcessingMetadata) -> Dict[str, Any]: + result = {} + + if obj.id: + result["id"] = obj.id + if obj.document_id: + result["document-id"] = obj.document_id + if obj.time: + result["time"] = obj.time + if obj.flow: + result["flow"] = obj.flow + if obj.user: + result["user"] = obj.user + if obj.collection: + result["collection"] = obj.collection + if obj.tags is not None: + result["tags"] = obj.tags + + return result \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/primitives.py b/trustgraph-base/trustgraph/messaging/translators/primitives.py new file mode 100644 index 00000000..6b57aec4 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/primitives.py @@ -0,0 +1,47 @@ +from typing import Dict, Any, List +from ...schema import Value, Triple +from .base import Translator + + +class ValueTranslator(Translator): + """Translator for Value schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> Value: + return Value(value=data["v"], is_uri=data["e"]) + + def from_pulsar(self, obj: Value) -> Dict[str, Any]: + return {"v": obj.value, "e": obj.is_uri} + + +class TripleTranslator(Translator): + """Translator for Triple schema objects""" + + def __init__(self): + self.value_translator = ValueTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> Triple: + return Triple( + s=self.value_translator.to_pulsar(data["s"]), + p=self.value_translator.to_pulsar(data["p"]), + o=self.value_translator.to_pulsar(data["o"]) + ) + + def from_pulsar(self, obj: Triple) -> Dict[str, Any]: + return { + "s": self.value_translator.from_pulsar(obj.s), + "p": self.value_translator.from_pulsar(obj.p), + "o": self.value_translator.from_pulsar(obj.o) + } + + +class SubgraphTranslator(Translator): + """Translator for lists of Triple objects (subgraphs)""" + + def __init__(self): + self.triple_translator = TripleTranslator() + + def to_pulsar(self, data: List[Dict[str, Any]]) -> List[Triple]: + return [self.triple_translator.to_pulsar(t) for t in data] + + def from_pulsar(self, obj: List[Triple]) -> List[Dict[str, Any]]: + return [self.triple_translator.from_pulsar(t) for t in obj] \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/prompt.py b/trustgraph-base/trustgraph/messaging/translators/prompt.py new file mode 100644 index 00000000..b0e7351f --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/prompt.py @@ -0,0 +1,54 @@ +import json +from typing import Dict, Any, Tuple +from ...schema import PromptRequest, PromptResponse +from .base import MessageTranslator + + +class PromptRequestTranslator(MessageTranslator): + """Translator for PromptRequest schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> PromptRequest: + # Handle both "terms" and "variables" input keys + terms = data.get("terms", {}) + if "variables" in data: + # Convert variables to JSON strings as expected by the service + terms = { + k: json.dumps(v) + for k, v in data["variables"].items() + } + + return PromptRequest( + id=data.get("id"), + terms=terms + ) + + def from_pulsar(self, obj: PromptRequest) -> Dict[str, Any]: + result = {} + + if obj.id: + result["id"] = obj.id + if obj.terms: + result["terms"] = obj.terms + + return result + + +class PromptResponseTranslator(MessageTranslator): + """Translator for PromptResponse schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> PromptResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: PromptResponse) -> Dict[str, Any]: + result = {} + + if obj.text: + result["text"] = obj.text + if obj.object: + result["object"] = obj.object + + return result + + def from_response_with_completion(self, obj: PromptResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/retrieval.py b/trustgraph-base/trustgraph/messaging/translators/retrieval.py new file mode 100644 index 00000000..96c25ed8 --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/retrieval.py @@ -0,0 +1,81 @@ +from typing import Dict, Any, Tuple +from ...schema import DocumentRagQuery, DocumentRagResponse, GraphRagQuery, GraphRagResponse +from .base import MessageTranslator + + +class DocumentRagRequestTranslator(MessageTranslator): + """Translator for DocumentRagQuery schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> DocumentRagQuery: + return DocumentRagQuery( + query=data["query"], + user=data.get("user", "trustgraph"), + collection=data.get("collection", "default"), + doc_limit=int(data.get("doc-limit", 20)) + ) + + def from_pulsar(self, obj: DocumentRagQuery) -> Dict[str, Any]: + return { + "query": obj.query, + "user": obj.user, + "collection": obj.collection, + "doc-limit": obj.doc_limit + } + + +class DocumentRagResponseTranslator(MessageTranslator): + """Translator for DocumentRagResponse schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> DocumentRagResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: DocumentRagResponse) -> Dict[str, Any]: + return { + "response": obj.response + } + + def from_response_with_completion(self, obj: DocumentRagResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True + + +class GraphRagRequestTranslator(MessageTranslator): + """Translator for GraphRagQuery schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> GraphRagQuery: + return GraphRagQuery( + query=data["query"], + user=data.get("user", "trustgraph"), + collection=data.get("collection", "default"), + entity_limit=int(data.get("entity-limit", 50)), + triple_limit=int(data.get("triple-limit", 30)), + max_subgraph_size=int(data.get("max-subgraph-size", 1000)), + max_path_length=int(data.get("max-path-length", 2)) + ) + + def from_pulsar(self, obj: GraphRagQuery) -> Dict[str, Any]: + return { + "query": obj.query, + "user": obj.user, + "collection": obj.collection, + "entity-limit": obj.entity_limit, + "triple-limit": obj.triple_limit, + "max-subgraph-size": obj.max_subgraph_size, + "max-path-length": obj.max_path_length + } + + +class GraphRagResponseTranslator(MessageTranslator): + """Translator for GraphRagResponse schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> GraphRagResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: GraphRagResponse) -> Dict[str, Any]: + return { + "response": obj.response + } + + def from_response_with_completion(self, obj: GraphRagResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/text_completion.py b/trustgraph-base/trustgraph/messaging/translators/text_completion.py new file mode 100644 index 00000000..eda3be5d --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/text_completion.py @@ -0,0 +1,42 @@ +from typing import Dict, Any, Tuple +from ...schema import TextCompletionRequest, TextCompletionResponse +from .base import MessageTranslator + + +class TextCompletionRequestTranslator(MessageTranslator): + """Translator for TextCompletionRequest schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> TextCompletionRequest: + return TextCompletionRequest( + system=data["system"], + prompt=data["prompt"] + ) + + def from_pulsar(self, obj: TextCompletionRequest) -> Dict[str, Any]: + return { + "system": obj.system, + "prompt": obj.prompt + } + + +class TextCompletionResponseTranslator(MessageTranslator): + """Translator for TextCompletionResponse schema objects""" + + def to_pulsar(self, data: Dict[str, Any]) -> TextCompletionResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: TextCompletionResponse) -> Dict[str, Any]: + result = {"response": obj.response} + + if obj.in_token: + result["in_token"] = obj.in_token + if obj.out_token: + result["out_token"] = obj.out_token + if obj.model: + result["model"] = obj.model + + return result + + def from_response_with_completion(self, obj: TextCompletionResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/triples.py b/trustgraph-base/trustgraph/messaging/translators/triples.py new file mode 100644 index 00000000..1c08625b --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/triples.py @@ -0,0 +1,60 @@ +from typing import Dict, Any, Tuple, Optional +from ...schema import TriplesQueryRequest, TriplesQueryResponse +from .base import MessageTranslator +from .primitives import ValueTranslator, SubgraphTranslator + + +class TriplesQueryRequestTranslator(MessageTranslator): + """Translator for TriplesQueryRequest schema objects""" + + def __init__(self): + self.value_translator = ValueTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> TriplesQueryRequest: + s = self.value_translator.to_pulsar(data["s"]) if "s" in data else None + p = self.value_translator.to_pulsar(data["p"]) if "p" in data else None + o = self.value_translator.to_pulsar(data["o"]) if "o" in data else None + + return TriplesQueryRequest( + s=s, + p=p, + o=o, + limit=int(data.get("limit", 10000)), + user=data.get("user", "trustgraph"), + collection=data.get("collection", "default") + ) + + def from_pulsar(self, obj: TriplesQueryRequest) -> Dict[str, Any]: + result = { + "limit": obj.limit, + "user": obj.user, + "collection": obj.collection + } + + if obj.s: + result["s"] = self.value_translator.from_pulsar(obj.s) + if obj.p: + result["p"] = self.value_translator.from_pulsar(obj.p) + if obj.o: + result["o"] = self.value_translator.from_pulsar(obj.o) + + return result + + +class TriplesQueryResponseTranslator(MessageTranslator): + """Translator for TriplesQueryResponse schema objects""" + + def __init__(self): + self.subgraph_translator = SubgraphTranslator() + + def to_pulsar(self, data: Dict[str, Any]) -> TriplesQueryResponse: + raise NotImplementedError("Response translation to Pulsar not typically needed") + + def from_pulsar(self, obj: TriplesQueryResponse) -> Dict[str, Any]: + return { + "response": self.subgraph_translator.from_pulsar(obj.triples) + } + + def from_response_with_completion(self, obj: TriplesQueryResponse) -> Tuple[Dict[str, Any], bool]: + """Returns (response_dict, is_final)""" + return self.from_pulsar(obj), True \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/agent.py b/trustgraph-flow/trustgraph/gateway/dispatch/agent.py index d0fd8537..1a5e8299 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/agent.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/agent.py @@ -1,5 +1,6 @@ from ... schema import AgentRequest, AgentResponse +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor @@ -20,24 +21,12 @@ class AgentRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("agent") + self.response_translator = TranslatorRegistry.get_response_translator("agent") + def to_request(self, body): - return AgentRequest( - question=body["question"] - ) + return self.request_translator.to_pulsar(body) def from_response(self, message): - resp = { - } - - if message.answer: - resp["answer"] = message.answer - - if message.thought: - resp["thought"] = message.thought - - if message.observation: - resp["observation"] = message.observation - - # The 2nd boolean expression indicates whether we're done responding - return resp, (message.answer is not None) + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/config.py b/trustgraph-flow/trustgraph/gateway/dispatch/config.py index 3aeedb6f..c4fac5fa 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/config.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/config.py @@ -2,6 +2,7 @@ from ... schema import ConfigRequest, ConfigResponse, ConfigKey, ConfigValue from ... schema import config_request_queue from ... schema import config_response_queue +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor @@ -19,60 +20,12 @@ class ConfigRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("config") + self.response_translator = TranslatorRegistry.get_response_translator("config") + def to_request(self, body): - - if "keys" in body: - keys = [ - ConfigKey( - type = k["type"], - key = k["key"], - ) - for k in body["keys"] - ] - else: - keys = None - - if "values" in body: - values = [ - ConfigValue( - type = v["type"], - key = v["key"], - value = v["value"], - ) - for v in body["values"] - ] - else: - values = None - - return ConfigRequest( - operation = body.get("operation", None), - keys = keys, - type = body.get("type", None), - values = values - ) + return self.request_translator.to_pulsar(body) def from_response(self, message): - - response = { } - - if message.version is not None: - response["version"] = message.version - - if message.values is not None: - response["values"] = [ - { - "type": v.type, - "key": v.key, - "value": v.value, - } - for v in message.values - ] - - if message.directory is not None: - response["directory"] = message.directory - - if message.config is not None: - response["config"] = message.config - - return response, True + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py index e486f613..dd4fc4e1 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py @@ -6,8 +6,7 @@ from aiohttp import WSMsgType from ... schema import Metadata from ... schema import DocumentEmbeddings, ChunkEmbeddings from ... base import Publisher - -from . serialize import to_subgraph +from ... messaging.translators.document_loading import DocumentEmbeddingsTranslator class DocumentEmbeddingsImport: @@ -17,6 +16,7 @@ class DocumentEmbeddingsImport: self.ws = ws self.running = running + self.translator = DocumentEmbeddingsTranslator() self.publisher = Publisher( pulsar_client, topic = queue, schema = DocumentEmbeddings @@ -36,23 +36,7 @@ class DocumentEmbeddingsImport: async def receive(self, msg): data = msg.json() - - elt = DocumentEmbeddings( - metadata=Metadata( - id=data["metadata"]["id"], - metadata=to_subgraph(data["metadata"]["metadata"]), - user=data["metadata"]["user"], - collection=data["metadata"]["collection"], - ), - chunks=[ - ChunkEmbeddings( - chunk=de["chunk"].encode("utf-8"), - vectors=de["vectors"], - ) - for de in data["chunks"] - ], - ) - + elt = self.translator.to_pulsar(data) await self.publisher.send(None, elt) async def run(self): diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/document_load.py b/trustgraph-flow/trustgraph/gateway/dispatch/document_load.py index f92fc34f..101e9b41 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/document_load.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/document_load.py @@ -2,9 +2,9 @@ import base64 from ... schema import Document, Metadata +from ... messaging import TranslatorRegistry from . sender import ServiceSender -from . serialize import to_subgraph class DocumentLoad(ServiceSender): def __init__(self, pulsar_client, queue): @@ -15,26 +15,9 @@ class DocumentLoad(ServiceSender): schema = Document, ) + self.translator = TranslatorRegistry.get_request_translator("document") + def to_request(self, body): - - if "metadata" in body: - metadata = to_subgraph(body["metadata"]) - else: - metadata = [] - - # Doing a base64 decoe/encode here to make sure the - # content is valid base64 - doc = base64.b64decode(body["data"]) - print("Document received") - - return Document( - metadata=Metadata( - id=body.get("id"), - metadata=metadata, - user=body.get("user", "trustgraph"), - collection=body.get("collection", "default"), - ), - data=base64.b64encode(doc).decode("utf-8") - ) + return self.translator.to_pulsar(body) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/document_rag.py b/trustgraph-flow/trustgraph/gateway/dispatch/document_rag.py index 29194f97..a7f3634e 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/document_rag.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/document_rag.py @@ -1,5 +1,6 @@ from ... schema import DocumentRagQuery, DocumentRagResponse +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor @@ -20,14 +21,12 @@ class DocumentRagRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("document-rag") + self.response_translator = TranslatorRegistry.get_response_translator("document-rag") + def to_request(self, body): - return DocumentRagQuery( - query=body["query"], - user=body.get("user", "trustgraph"), - collection=body.get("collection", "default"), - doc_limit=int(body.get("doc-limit", 20)), - ) + return self.request_translator.to_pulsar(body) def from_response(self, message): - return { "response": message.response }, True + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/embeddings.py b/trustgraph-flow/trustgraph/gateway/dispatch/embeddings.py index 4549942e..47146e57 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/embeddings.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/embeddings.py @@ -1,5 +1,6 @@ from ... schema import EmbeddingsRequest, EmbeddingsResponse +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor @@ -20,11 +21,12 @@ class EmbeddingsRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("embeddings") + self.response_translator = TranslatorRegistry.get_response_translator("embeddings") + def to_request(self, body): - return EmbeddingsRequest( - text=body["text"] - ) + return self.request_translator.to_pulsar(body) def from_response(self, message): - return { "vectors": message.vectors }, True + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/flow.py b/trustgraph-flow/trustgraph/gateway/dispatch/flow.py index 0b38e9be..30f8d45e 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/flow.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/flow.py @@ -2,6 +2,7 @@ from ... schema import FlowRequest, FlowResponse from ... schema import flow_request_queue from ... schema import flow_response_queue +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor @@ -19,34 +20,12 @@ class FlowRequestor(ServiceRequestor): timeout=timeout, ) - def to_request(self, body): + self.request_translator = TranslatorRegistry.get_request_translator("flow") + self.response_translator = TranslatorRegistry.get_response_translator("flow") - return FlowRequest( - operation = body.get("operation", None), - class_name = body.get("class-name", None), - class_definition = body.get("class-definition", None), - description = body.get("description", None), - flow_id = body.get("flow-id", None), - ) + def to_request(self, body): + return self.request_translator.to_pulsar(body) def from_response(self, message): - - response = { } - - if message.class_names is not None: - response["class-names"] = message.class_names - - if message.flow_ids is not None: - response["flow-ids"] = message.flow_ids - - if message.class_definition is not None: - response["class-definition"] = message.class_definition - - if message.flow is not None: - response["flow"] = message.flow - - if message.description is not None: - response["description"] = message.description - - return response, True + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_query.py b/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_query.py index 27ceb702..f5be06fb 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_query.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_query.py @@ -1,8 +1,8 @@ from ... schema import GraphEmbeddingsRequest, GraphEmbeddingsResponse +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor -from . serialize import serialize_value class GraphEmbeddingsQueryRequestor(ServiceRequestor): def __init__( @@ -21,22 +21,12 @@ class GraphEmbeddingsQueryRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("graph-embeddings-query") + self.response_translator = TranslatorRegistry.get_response_translator("graph-embeddings-query") + def to_request(self, body): - - limit = int(body.get("limit", 20)) - - return GraphEmbeddingsRequest( - vectors = body["vectors"], - limit = limit, - user = body.get("user", "trustgraph"), - collection = body.get("collection", "default"), - ) + return self.request_translator.to_pulsar(body) def from_response(self, message): - - return { - "entities": [ - serialize_value(ent) for ent in message.entities - ] - }, True + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/graph_rag.py b/trustgraph-flow/trustgraph/gateway/dispatch/graph_rag.py index a31795b9..a15a1aee 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/graph_rag.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/graph_rag.py @@ -1,5 +1,6 @@ from ... schema import GraphRagQuery, GraphRagResponse +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor @@ -20,17 +21,12 @@ class GraphRagRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("graph-rag") + self.response_translator = TranslatorRegistry.get_response_translator("graph-rag") + def to_request(self, body): - return GraphRagQuery( - query=body["query"], - user=body.get("user", "trustgraph"), - collection=body.get("collection", "default"), - entity_limit=int(body.get("entity-limit", 50)), - triple_limit=int(body.get("triple-limit", 30)), - max_subgraph_size=int(body.get("max-subgraph-size", 1000)), - max_path_length=int(body.get("max-path-length", 2)), - ) + return self.request_translator.to_pulsar(body) def from_response(self, message): - return { "response": message.response }, True + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py b/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py index a35ee4f0..950b3430 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py @@ -5,11 +5,9 @@ from ... schema import KnowledgeRequest, KnowledgeResponse, Triples from ... schema import GraphEmbeddings, Metadata, EntityEmbeddings from ... schema import knowledge_request_queue from ... schema import knowledge_response_queue +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor -from . serialize import serialize_graph_embeddings -from . serialize import serialize_triples, to_subgraph, to_value -from . serialize import to_document_metadata, to_processing_metadata class KnowledgeRequestor(ServiceRequestor): def __init__(self, pulsar_client, consumer, subscriber, timeout=120): @@ -25,73 +23,12 @@ class KnowledgeRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("knowledge") + self.response_translator = TranslatorRegistry.get_response_translator("knowledge") + def to_request(self, body): - - if "triples" in body: - triples = Triples( - metadata=Metadata( - id = body["triples"]["metadata"]["id"], - metadata = to_subgraph(body["triples"]["metadata"]["metadata"]), - user = body["triples"]["metadata"]["user"], - ), - triples = to_subgraph(body["triples"]["triples"]), - ) - else: - triples = None - - if "graph-embeddings" in body: - ge = GraphEmbeddings( - metadata = Metadata( - id = body["graph-embeddings"]["metadata"]["id"], - metadata = to_subgraph(body["graph-embeddings"]["metadata"]["metadata"]), - user = body["graph-embeddings"]["metadata"]["user"], - ), - entities=[ - EntityEmbeddings( - entity = to_value(ent["entity"]), - vectors = ent["vectors"], - ) - for ent in body["graph-embeddings"]["entities"] - ] - ) - else: - ge = None - - return KnowledgeRequest( - operation = body.get("operation", None), - user = body.get("user", None), - id = body.get("id", None), - flow = body.get("flow", None), - collection = body.get("collection", None), - triples = triples, - graph_embeddings = ge, - ) + return self.request_translator.to_pulsar(body) def from_response(self, message): - - # Response to list, - if message.ids is not None: - return { - "ids": message.ids - }, True - - if message.triples: - return { - "triples": serialize_triples(message.triples) - }, False - - if message.graph_embeddings: - return { - "graph-embeddings": serialize_graph_embeddings( - message.graph_embeddings - ) - }, False - - if message.eos is True: - return { - "eos": True - }, True - - # Empty case, return from successful delete. - return {}, True + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py b/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py index 364ba1c2..2155aa5d 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py @@ -4,12 +4,9 @@ import base64 from ... schema import LibrarianRequest, LibrarianResponse from ... schema import librarian_request_queue from ... schema import librarian_response_queue +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor -from . serialize import serialize_document_metadata -from . serialize import serialize_processing_metadata -from . serialize import to_document_metadata, to_processing_metadata -from . serialize import to_criteria class LibrarianRequestor(ServiceRequestor): def __init__(self, pulsar_client, consumer, subscriber, timeout=120): @@ -25,67 +22,20 @@ class LibrarianRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("librarian") + self.response_translator = TranslatorRegistry.get_response_translator("librarian") + def to_request(self, body): - - # Content gets base64 decoded & encoded again. It at least makes - # sure payload is valid base64. - - if "document-metadata" in body: - dm = to_document_metadata(body["document-metadata"]) - else: - dm = None - - if "processing-metadata" in body: - pm = to_processing_metadata(body["processing-metadata"]) - else: - pm = None - - if "criteria" in body: - criteria = to_criteria(body["criteria"]) - else: - criteria = None - + # Handle base64 content processing if "content" in body: + # Content gets base64 decoded & encoded again to ensure valid base64 content = base64.b64decode(body["content"].encode("utf-8")) content = base64.b64encode(content).decode("utf-8") - else: - content = None - - return LibrarianRequest( - operation = body.get("operation", None), - document_id = body.get("document-id", None), - processing_id = body.get("processing-id", None), - document_metadata = dm, - processing_metadata = pm, - content = content, - user = body.get("user", None), - collection = body.get("collection", None), - criteria = criteria, - ) + body = body.copy() + body["content"] = content + + return self.request_translator.to_pulsar(body) def from_response(self, message): - - response = {} - - if message.document_metadata: - response["document-metadata"] = serialize_document_metadata( - message.document_metadata - ) - - if message.content: - response["content"] = message.content.decode("utf-8") - - if message.document_metadatas != None: - response["document-metadatas"] = [ - serialize_document_metadata(v) - for v in message.document_metadatas - ] - - if message.processing_metadatas != None: - response["processing-metadatas"] = [ - serialize_processing_metadata(v) - for v in message.processing_metadatas - ] - - return response, True + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/prompt.py b/trustgraph-flow/trustgraph/gateway/dispatch/prompt.py index 496d01e5..5c316cf6 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/prompt.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/prompt.py @@ -2,6 +2,7 @@ import json from ... schema import PromptRequest, PromptResponse +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor @@ -22,22 +23,12 @@ class PromptRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("prompt") + self.response_translator = TranslatorRegistry.get_response_translator("prompt") + def to_request(self, body): - return PromptRequest( - id=body["id"], - terms={ - k: json.dumps(v) - for k, v in body["variables"].items() - } - ) + return self.request_translator.to_pulsar(body) def from_response(self, message): - if message.object: - return { - "object": message.object - }, True - else: - return { - "text": message.text - }, True + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py b/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py index bde3553a..653ecfd9 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py @@ -3,6 +3,13 @@ import base64 from ... schema import Value, Triple, DocumentMetadata, ProcessingMetadata +# DEPRECATED: These functions have been moved to trustgraph.... messaging.translators +# Use the new messaging translation system instead for consistency and reusability. +# Examples: +# from trustgraph.... messaging.translators.primitives import ValueTranslator +# value_translator = ValueTranslator() +# pulsar_value = value_translator.to_pulsar({"v": "example", "e": True}) + def to_value(x): return Value(value=x["v"], is_uri=x["e"]) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/text_completion.py b/trustgraph-flow/trustgraph/gateway/dispatch/text_completion.py index 40ae7616..d29d1918 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/text_completion.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/text_completion.py @@ -1,5 +1,6 @@ from ... schema import TextCompletionRequest, TextCompletionResponse +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor @@ -20,12 +21,12 @@ class TextCompletionRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("text-completion") + self.response_translator = TranslatorRegistry.get_response_translator("text-completion") + def to_request(self, body): - return TextCompletionRequest( - system=body["system"], - prompt=body["prompt"] - ) + return self.request_translator.to_pulsar(body) def from_response(self, message): - return { "response": message.response }, True + return self.response_translator.from_response_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/text_load.py b/trustgraph-flow/trustgraph/gateway/dispatch/text_load.py index 53ea7452..8f30c8de 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/text_load.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/text_load.py @@ -2,9 +2,9 @@ import base64 from ... schema import TextDocument, Metadata +from ... messaging import TranslatorRegistry from . sender import ServiceSender -from . serialize import to_subgraph class TextLoad(ServiceSender): def __init__(self, pulsar_client, queue): @@ -15,30 +15,9 @@ class TextLoad(ServiceSender): schema = TextDocument, ) + self.translator = TranslatorRegistry.get_request_translator("text-document") + def to_request(self, body): - - if "metadata" in body: - metadata = to_subgraph(body["metadata"]) - else: - metadata = [] - - if "charset" in body: - charset = body["charset"] - else: - charset = "utf-8" - - # Text is base64 encoded - text = base64.b64decode(body["text"]).decode(charset) - print("Text document received") - - return TextDocument( - metadata=Metadata( - id=body.get("id"), - metadata=metadata, - user=body.get("user", "trustgraph"), - collection=body.get("collection", "default"), - ), - text=text, - ) + return self.translator.to_pulsar(body) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/triples_query.py b/trustgraph-flow/trustgraph/gateway/dispatch/triples_query.py index 7c3a5fc9..d2def9c1 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/triples_query.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/triples_query.py @@ -1,8 +1,8 @@ from ... schema import TriplesQueryRequest, TriplesQueryResponse, Triples +from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor -from . serialize import to_value, serialize_subgraph class TriplesQueryRequestor(ServiceRequestor): def __init__( @@ -21,34 +21,12 @@ class TriplesQueryRequestor(ServiceRequestor): timeout=timeout, ) + self.request_translator = TranslatorRegistry.get_request_translator("triples-query") + self.response_translator = TranslatorRegistry.get_response_translator("triples-query") + def to_request(self, body): - - if "s" in body: - s = to_value(body["s"]) - else: - s = None - - if "p" in body: - p = to_value(body["p"]) - else: - p = None - - if "o" in body: - o = to_value(body["o"]) - else: - o = None - - limit = int(body.get("limit", 10000)) - - return TriplesQueryRequest( - s = s, p = p, o = o, - limit = limit, - user = body.get("user", "trustgraph"), - collection = body.get("collection", "default"), - ) + return self.request_translator.to_pulsar(body) def from_response(self, message): - return { - "response": serialize_subgraph(message.triples) - }, True + return self.response_translator.from_response_with_completion(message)