mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 00:16:23 +02:00
Fix core load for 1.9 (#628)
This commit is contained in:
parent
ca626c8471
commit
202d80f6b8
3 changed files with 45 additions and 13 deletions
|
|
@ -27,16 +27,26 @@ class AsyncBulkClient:
|
|||
|
||||
async def import_triples(self, flow: str, triples: AsyncIterator[Triple], **kwargs: Any) -> None:
|
||||
"""Bulk import triples via WebSocket"""
|
||||
metadata = kwargs.get('metadata')
|
||||
ws_url = f"{self.url}/api/v1/flow/{flow}/import/triples"
|
||||
if self.token:
|
||||
ws_url = f"{ws_url}?token={self.token}"
|
||||
|
||||
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
|
||||
async for triple in triples:
|
||||
# Format in Value format expected by gateway
|
||||
s_is_uri = getattr(triple, 's_is_uri', True)
|
||||
p_is_uri = getattr(triple, 'p_is_uri', True)
|
||||
o_is_uri = getattr(triple, 'o_is_uri', True)
|
||||
message = {
|
||||
"s": triple.s,
|
||||
"p": triple.p,
|
||||
"o": triple.o
|
||||
"metadata": metadata or {},
|
||||
"triples": [
|
||||
{
|
||||
"s": {"v": triple.s, "e": s_is_uri},
|
||||
"p": {"v": triple.p, "e": p_is_uri},
|
||||
"o": {"v": triple.o, "e": o_is_uri},
|
||||
}
|
||||
]
|
||||
}
|
||||
await websocket.send(json.dumps(message))
|
||||
|
||||
|
|
@ -97,13 +107,18 @@ class AsyncBulkClient:
|
|||
|
||||
async def import_entity_contexts(self, flow: str, contexts: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None:
|
||||
"""Bulk import entity contexts via WebSocket"""
|
||||
metadata = kwargs.get('metadata')
|
||||
ws_url = f"{self.url}/api/v1/flow/{flow}/import/entity-contexts"
|
||||
if self.token:
|
||||
ws_url = f"{ws_url}?token={self.token}"
|
||||
|
||||
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
|
||||
async for context in contexts:
|
||||
await websocket.send(json.dumps(context))
|
||||
message = {
|
||||
"metadata": metadata or {},
|
||||
"entities": [context]
|
||||
}
|
||||
await websocket.send(json.dumps(message))
|
||||
|
||||
async def export_entity_contexts(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]:
|
||||
"""Bulk export entity contexts via WebSocket"""
|
||||
|
|
|
|||
|
|
@ -89,9 +89,10 @@ class BulkClient:
|
|||
bulk.import_triples(flow="default", triples=triple_generator())
|
||||
```
|
||||
"""
|
||||
self._run_async(self._import_triples_async(flow, triples))
|
||||
metadata = kwargs.get('metadata')
|
||||
self._run_async(self._import_triples_async(flow, triples, metadata))
|
||||
|
||||
async def _import_triples_async(self, flow: str, triples: Iterator[Triple]) -> None:
|
||||
async def _import_triples_async(self, flow: str, triples: Iterator[Triple], metadata: Dict[str, Any] = None) -> None:
|
||||
"""Async implementation of triple import"""
|
||||
ws_url = f"{self.url}/api/v1/flow/{flow}/import/triples"
|
||||
if self.token:
|
||||
|
|
@ -99,10 +100,21 @@ class BulkClient:
|
|||
|
||||
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
|
||||
for triple in triples:
|
||||
# Format in Value format expected by gateway
|
||||
# In RDF, subjects and predicates are always URIs
|
||||
# Objects default to URI but can be literals (is_uri=False)
|
||||
s_is_uri = getattr(triple, 's_is_uri', True)
|
||||
p_is_uri = getattr(triple, 'p_is_uri', True)
|
||||
o_is_uri = getattr(triple, 'o_is_uri', True)
|
||||
message = {
|
||||
"s": triple.s,
|
||||
"p": triple.p,
|
||||
"o": triple.o
|
||||
"metadata": metadata or {},
|
||||
"triples": [
|
||||
{
|
||||
"s": {"v": triple.s, "e": s_is_uri},
|
||||
"p": {"v": triple.p, "e": p_is_uri},
|
||||
"o": {"v": triple.o, "e": o_is_uri},
|
||||
}
|
||||
]
|
||||
}
|
||||
await websocket.send(json.dumps(message))
|
||||
|
||||
|
|
@ -391,9 +403,10 @@ class BulkClient:
|
|||
)
|
||||
```
|
||||
"""
|
||||
self._run_async(self._import_entity_contexts_async(flow, contexts))
|
||||
metadata = kwargs.get('metadata')
|
||||
self._run_async(self._import_entity_contexts_async(flow, contexts, metadata))
|
||||
|
||||
async def _import_entity_contexts_async(self, flow: str, contexts: Iterator[Dict[str, Any]]) -> None:
|
||||
async def _import_entity_contexts_async(self, flow: str, contexts: Iterator[Dict[str, Any]], metadata: Dict[str, Any] = None) -> None:
|
||||
"""Async implementation of entity contexts import"""
|
||||
ws_url = f"{self.url}/api/v1/flow/{flow}/import/entity-contexts"
|
||||
if self.token:
|
||||
|
|
@ -401,7 +414,11 @@ class BulkClient:
|
|||
|
||||
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
|
||||
for context in contexts:
|
||||
await websocket.send(json.dumps(context))
|
||||
message = {
|
||||
"metadata": metadata or {},
|
||||
"entities": [context]
|
||||
}
|
||||
await websocket.send(json.dumps(message))
|
||||
|
||||
def export_entity_contexts(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]:
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ class KnowledgeLoader:
|
|||
|
||||
bulk.import_entity_contexts(
|
||||
flow=self.flow,
|
||||
entities=entity_context_generator(),
|
||||
contexts=entity_context_generator(),
|
||||
metadata={
|
||||
"id": self.document_id,
|
||||
"metadata": [],
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue