#!/usr/bin/env python3 """ Loads Graph embeddings into TrustGraph processing. """ import pulsar from pulsar.schema import JsonSchema from trustgraph.schema import Triples, Triple, Value, Metadata from trustgraph.schema import triples_store_queue import argparse import os import time import pyarrow as pa import rdflib from trustgraph.log_level import LogLevel default_user = 'trustgraph' default_collection = 'default' default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') default_output_queue = triples_store_queue class Loader: def __init__( self, pulsar_host, output_queue, log_level, files, user, collection, ): self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) ) self.producer = self.client.create_producer( topic=output_queue, schema=JsonSchema(Triples), chunking_enabled=True, ) self.files = files self.user = user self.collection = collection def run(self): try: for file in self.files: self.load_file(file) except Exception as e: print(e, flush=True) def load_file(self, file): g = rdflib.Graph() g.parse(file, format="turtle") for e in g: s = Value(value=str(e[0]), is_uri=True) p = Value(value=str(e[1]), is_uri=True) if type(e[2]) == rdflib.term.URIRef: o = Value(value=str(e[2]), is_uri=True) else: o = Value(value=str(e[2]), is_uri=False) r = Triples( metadata=Metadata( id=None, metadata=[], user=self.user, collection=self.collection, ), triples=[ Triple(s=s, p=p, o=o) ] ) self.producer.send(r) def __del__(self): self.client.close() def main(): parser = argparse.ArgumentParser( prog='tg-load-turtle', description=__doc__, ) parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, help=f'Pulsar host (default: {default_pulsar_host})', ) parser.add_argument( '-o', '--output-queue', default=default_output_queue, help=f'Output queue (default: {default_output_queue})' ) parser.add_argument( '-u', '--user', default=default_user, help=f'User ID (default: {default_user})' ) parser.add_argument( '-c', '--collection', default=default_collection, help=f'Collection ID (default: {default_collection})' ) parser.add_argument( '-l', '--log-level', type=LogLevel, default=LogLevel.ERROR, choices=list(LogLevel), help=f'Output queue (default: info)' ) parser.add_argument( 'files', nargs='+', help=f'Turtle files to load' ) args = parser.parse_args() while True: try: p = Loader( pulsar_host=args.pulsar_host, output_queue=args.output_queue, log_level=args.log_level, files=args.files, user=args.user, collection=args.collection, ) p.run() print("File loaded.") break except Exception as e: print("Exception:", e, flush=True) print("Will retry...", flush=True) time.sleep(10) main()