diff --git a/trustgraph-base/trustgraph/api/async_bulk_client.py b/trustgraph-base/trustgraph/api/async_bulk_client.py index 76cb9f56..fcc3d7b0 100644 --- a/trustgraph-base/trustgraph/api/async_bulk_client.py +++ b/trustgraph-base/trustgraph/api/async_bulk_client.py @@ -27,16 +27,26 @@ class AsyncBulkClient: async def import_triples(self, flow: str, triples: AsyncIterator[Triple], **kwargs: Any) -> None: """Bulk import triples via WebSocket""" + metadata = kwargs.get('metadata') ws_url = f"{self.url}/api/v1/flow/{flow}/import/triples" if self.token: ws_url = f"{ws_url}?token={self.token}" async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket: async for triple in triples: + # Format in Value format expected by gateway + s_is_uri = getattr(triple, 's_is_uri', True) + p_is_uri = getattr(triple, 'p_is_uri', True) + o_is_uri = getattr(triple, 'o_is_uri', True) message = { - "s": triple.s, - "p": triple.p, - "o": triple.o + "metadata": metadata or {}, + "triples": [ + { + "s": {"v": triple.s, "e": s_is_uri}, + "p": {"v": triple.p, "e": p_is_uri}, + "o": {"v": triple.o, "e": o_is_uri}, + } + ] } await websocket.send(json.dumps(message)) @@ -97,13 +107,18 @@ class AsyncBulkClient: async def import_entity_contexts(self, flow: str, contexts: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None: """Bulk import entity contexts via WebSocket""" + metadata = kwargs.get('metadata') ws_url = f"{self.url}/api/v1/flow/{flow}/import/entity-contexts" if self.token: ws_url = f"{ws_url}?token={self.token}" async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket: async for context in contexts: - await websocket.send(json.dumps(context)) + message = { + "metadata": metadata or {}, + "entities": [context] + } + await websocket.send(json.dumps(message)) async def export_entity_contexts(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]: """Bulk export entity contexts via WebSocket""" diff --git a/trustgraph-base/trustgraph/api/bulk_client.py b/trustgraph-base/trustgraph/api/bulk_client.py index a2796332..ef0ced38 100644 --- a/trustgraph-base/trustgraph/api/bulk_client.py +++ b/trustgraph-base/trustgraph/api/bulk_client.py @@ -89,9 +89,10 @@ class BulkClient: bulk.import_triples(flow="default", triples=triple_generator()) ``` """ - self._run_async(self._import_triples_async(flow, triples)) + metadata = kwargs.get('metadata') + self._run_async(self._import_triples_async(flow, triples, metadata)) - async def _import_triples_async(self, flow: str, triples: Iterator[Triple]) -> None: + async def _import_triples_async(self, flow: str, triples: Iterator[Triple], metadata: Dict[str, Any] = None) -> None: """Async implementation of triple import""" ws_url = f"{self.url}/api/v1/flow/{flow}/import/triples" if self.token: @@ -99,10 +100,21 @@ class BulkClient: async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket: for triple in triples: + # Format in Value format expected by gateway + # In RDF, subjects and predicates are always URIs + # Objects default to URI but can be literals (is_uri=False) + s_is_uri = getattr(triple, 's_is_uri', True) + p_is_uri = getattr(triple, 'p_is_uri', True) + o_is_uri = getattr(triple, 'o_is_uri', True) message = { - "s": triple.s, - "p": triple.p, - "o": triple.o + "metadata": metadata or {}, + "triples": [ + { + "s": {"v": triple.s, "e": s_is_uri}, + "p": {"v": triple.p, "e": p_is_uri}, + "o": {"v": triple.o, "e": o_is_uri}, + } + ] } await websocket.send(json.dumps(message)) @@ -391,9 +403,10 @@ class BulkClient: ) ``` """ - self._run_async(self._import_entity_contexts_async(flow, contexts)) + metadata = kwargs.get('metadata') + self._run_async(self._import_entity_contexts_async(flow, contexts, metadata)) - async def _import_entity_contexts_async(self, flow: str, contexts: Iterator[Dict[str, Any]]) -> None: + async def _import_entity_contexts_async(self, flow: str, contexts: Iterator[Dict[str, Any]], metadata: Dict[str, Any] = None) -> None: """Async implementation of entity contexts import""" ws_url = f"{self.url}/api/v1/flow/{flow}/import/entity-contexts" if self.token: @@ -401,7 +414,11 @@ class BulkClient: async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket: for context in contexts: - await websocket.send(json.dumps(context)) + message = { + "metadata": metadata or {}, + "entities": [context] + } + await websocket.send(json.dumps(message)) def export_entity_contexts(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]: """ diff --git a/trustgraph-cli/trustgraph/cli/load_knowledge.py b/trustgraph-cli/trustgraph/cli/load_knowledge.py index ff6ca980..05973c6e 100644 --- a/trustgraph-cli/trustgraph/cli/load_knowledge.py +++ b/trustgraph-cli/trustgraph/cli/load_knowledge.py @@ -119,7 +119,7 @@ class KnowledgeLoader: bulk.import_entity_contexts( flow=self.flow, - entities=entity_context_generator(), + contexts=entity_context_generator(), metadata={ "id": self.document_id, "metadata": [],