diff --git a/trustgraph-cli/scripts/tg-load-turtle b/trustgraph-cli/scripts/tg-load-turtle index ba92067e..5c85944e 100755 --- a/trustgraph-cli/scripts/tg-load-turtle +++ b/trustgraph-cli/scripts/tg-load-turtle @@ -1,80 +1,68 @@ #!/usr/bin/env python3 """ -Loads Graph embeddings into TrustGraph processing. - -FIXME: This hasn't been updated following API gateway change. +Loads triples into the knowledge graph. """ -import pulsar -from pulsar.schema import JsonSchema -from trustgraph.schema import Triples, Triple, Value, Metadata +import asyncio import argparse import os import time -import pyarrow as pa import rdflib +import json +from websockets.asyncio.client import connect from trustgraph.log_level import LogLevel +default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/') default_user = 'trustgraph' default_collection = 'default' -default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') -default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None) - -default_output_queue = triples_store_queue class Loader: def __init__( self, - pulsar_host, - output_queue, - log_level, files, + flow, user, collection, - pulsar_api_key=None, + document_id, + url = default_url, ): - if pulsar_api_key: - auth = pulsar.AuthenticationToken(pulsar_api_key) - self.client = pulsar.Client( - pulsar_host, - authentication=auth, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - else: - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) + if not url.endswith("/"): + url += "/" - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(Triples), - chunking_enabled=True, - ) + url = url + f"api/v1/flow/{flow}/import/triples" + + self.url = url self.files = files self.user = user self.collection = collection + self.document_id = document_id - def run(self): + async def run(self): try: - for file in self.files: - self.load_file(file) + async with connect(self.url) as ws: + for file in self.files: + await self.load_file(file, ws) except Exception as e: print(e, flush=True) - def load_file(self, file): + async def load_file(self, file, ws): g = rdflib.Graph() g.parse(file, format="turtle") + def Value(value, is_uri): + return { "v": value, "e": is_uri } + + triples = [] + for e in g: s = Value(value=str(e[0]), is_uri=True) p = Value(value=str(e[1]), is_uri=True) @@ -83,20 +71,25 @@ class Loader: else: o = Value(value=str(e[2]), is_uri=False) - r = Triples( - metadata=Metadata( - id=None, - metadata=[], - user=self.user, - collection=self.collection, - ), - triples=[ Triple(s=s, p=p, o=o) ] - ) + req = { + "metadata": { + "id": self.document_id, + "metadata": [], + "user": self.user, + "collection": self.collection + }, + "triples": [ + { + "s": s, + "p": p, + "o": o, + } + ] + } - self.producer.send(r) + await ws.send(json.dumps(req)) - def __del__(self): - self.client.close() + print(req) def main(): @@ -106,9 +99,15 @@ def main(): ) parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', + '-u', '--api-url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + + parser.add_argument( + '-i', '--document-id', + required=True, + help=f'Document ID)', ) parser.add_argument( @@ -116,39 +115,19 @@ def main(): default="default", help=f'Flow ID (default: default)' ) - - parser.add_argument( - '--pulsar-api-key', - default=default_pulsar_api_key, - help=f'Pulsar API key', - ) parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - - parser.add_argument( - '-u', '--user', + '-U', '--user', default=default_user, help=f'User ID (default: {default_user})' ) parser.add_argument( - '-c', '--collection', + '-C', '--collection', default=default_collection, help=f'Collection ID (default: {default_collection})' ) - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.ERROR, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - parser.add_argument( 'files', nargs='+', help=f'Turtle files to load' @@ -160,16 +139,15 @@ def main(): try: p = Loader( - pulsar_host=args.pulsar_host, - pulsar_api_key=args.pulsar_api_key, - output_queue=args.output_queue, - log_level=args.log_level, - files=args.files, - user=args.user, - collection=args.collection, + document_id = args.document_id, + url = args.api_url, + flow = args.flow_id, + files = args.files, + user = args.user, + collection = args.collection, ) - p.run() + asyncio.run(p.run()) print("File loaded.") break @@ -181,6 +159,5 @@ def main(): time.sleep(10) -print("Not implemented.") -#main() +main()