#!/usr/bin/env python3 """ Loads Graph embeddings into TrustGraph processing. FIXME: This hasn't been updated following API gateway change. """ import pulsar from pulsar.schema import JsonSchema from trustgraph.schema import Triples, Triple, Value, Metadata import argparse import os import time import pyarrow as pa import rdflib from trustgraph.log_level import LogLevel default_user = 'trustgraph' default_collection = 'default' default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None) default_output_queue = triples_store_queue class Loader: def __init__( self, pulsar_host, output_queue, log_level, files, user, collection, pulsar_api_key=None, ): if pulsar_api_key: auth = pulsar.AuthenticationToken(pulsar_api_key) self.client = pulsar.Client( pulsar_host, authentication=auth, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) ) else: self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) ) self.producer = self.client.create_producer( topic=output_queue, schema=JsonSchema(Triples), chunking_enabled=True, ) self.files = files self.user = user self.collection = collection def run(self): try: for file in self.files: self.load_file(file) except Exception as e: print(e, flush=True) def load_file(self, file): g = rdflib.Graph() g.parse(file, format="turtle") for e in g: s = Value(value=str(e[0]), is_uri=True) p = Value(value=str(e[1]), is_uri=True) if type(e[2]) == rdflib.term.URIRef: o = Value(value=str(e[2]), is_uri=True) else: o = Value(value=str(e[2]), is_uri=False) r = Triples( metadata=Metadata( id=None, metadata=[], user=self.user, collection=self.collection, ), triples=[ Triple(s=s, p=p, o=o) ] ) self.producer.send(r) def __del__(self): self.client.close() def main(): parser = argparse.ArgumentParser( prog='tg-load-turtle', description=__doc__, ) parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, help=f'Pulsar host (default: {default_pulsar_host})', ) parser.add_argument( '-f', '--flow-id', default="default", help=f'Flow ID (default: default)' ) parser.add_argument( '--pulsar-api-key', default=default_pulsar_api_key, help=f'Pulsar API key', ) parser.add_argument( '-o', '--output-queue', default=default_output_queue, help=f'Output queue (default: {default_output_queue})' ) parser.add_argument( '-u', '--user', default=default_user, help=f'User ID (default: {default_user})' ) parser.add_argument( '-c', '--collection', default=default_collection, help=f'Collection ID (default: {default_collection})' ) parser.add_argument( '-l', '--log-level', type=LogLevel, default=LogLevel.ERROR, choices=list(LogLevel), help=f'Output queue (default: info)' ) parser.add_argument( 'files', nargs='+', help=f'Turtle files to load' ) args = parser.parse_args() while True: try: p = Loader( pulsar_host=args.pulsar_host, pulsar_api_key=args.pulsar_api_key, output_queue=args.output_queue, log_level=args.log_level, files=args.files, user=args.user, collection=args.collection, ) p.run() print("File loaded.") break except Exception as e: print("Exception:", e, flush=True) print("Will retry...", flush=True) time.sleep(10) print("Not implemented.") #main()