#!/usr/bin/env python3

"""
Loads a text document into TrustGraph processing.
"""

import pulsar
from pulsar.schema import JsonSchema
from trustgraph.schema import TextDocument, Source, text_ingest_queue
import base64
import hashlib
import argparse
import os
import time

from trustgraph.log_level import LogLevel


class Loader:
    """Publish local files as ``TextDocument`` messages to a Pulsar topic.

    One Pulsar client and one producer are created per ``Loader``; call
    :meth:`close` (or let the object be garbage-collected) to release them.
    """

    def __init__(
            self,
            pulsar_host,
            output_queue,
            log_level,
    ):
        """Connect to Pulsar and create a producer for ``output_queue``.

        Args:
            pulsar_host: Pulsar service URL, e.g. ``pulsar://localhost:6650``.
            output_queue: Topic the documents are published to.
            log_level: A ``LogLevel``; its ``to_pulsar()`` value configures
                the Pulsar client's console logger.
        """
        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        # Messages are serialised as JSON using the TextDocument schema.
        # Chunking is enabled so documents larger than the broker's
        # maximum message size can still be sent.
        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(TextDocument),
            chunking_enabled=True,
        )

    def load(self, files):
        """Load each path in ``files`` in order (see :meth:`load_file`)."""
        for file in files:
            self.load_file(file)

    def load_file(self, file):
        """Read ``file`` as bytes and publish it as a ``TextDocument``.

        The document id is the first 8 hex digits of the SHA-256 of the
        *path* (not the content), so the same path always yields the same
        id. The path is also used as the source and title.

        Errors for a single file are reported on stdout and swallowed so
        that the remaining files are still attempted.
        """
        try:

            path = file

            # Read inside a context manager so the handle is always closed.
            with open(path, "rb") as f:
                data = f.read()

            doc_id = hashlib.sha256(path.encode("utf-8")).hexdigest()[0:8]

            r = TextDocument(
                source=Source(
                    source=path,
                    title=path,
                    id=doc_id,
                ),
                text=data,
            )

            self.producer.send(r)

            print(f"{file}: Loaded successfully.")

        except Exception as e:
            # Per-file best effort: report and continue with the next file.
            print(f"{file}: Failed: {str(e)}", flush=True)

    def close(self):
        """Close the Pulsar client. Safe to call more than once."""
        # getattr: __init__ may have failed before self.client was assigned.
        client = getattr(self, "client", None)
        if client is not None:
            self.client = None
            client.close()

    def __del__(self):
        self.close()


def main():
    """Parse command-line arguments and load the given files, retrying on error.

    If connecting to Pulsar (or any other non-per-file step) raises, the
    whole load is retried after 10 seconds until it succeeds.
    """

    parser = argparse.ArgumentParser(
        prog='loader',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650')
    default_output_queue = text_ingest_queue

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.ERROR,
        choices=list(LogLevel),
        help='Log level (default: error)'
    )

    parser.add_argument(
        'files', nargs='+',
        help='File to load'
    )

    args = parser.parse_args()

    while True:

        loader = None

        try:
            loader = Loader(
                pulsar_host=args.pulsar_host,
                output_queue=args.output_queue,
                log_level=args.log_level,
            )

            loader.load(args.files)

            print("All done.")
            break

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

            time.sleep(10)

        finally:
            # Release the client now rather than waiting for garbage
            # collection, so retries do not accumulate open connections.
            if loader is not None:
                loader.close()


if __name__ == "__main__":
    main()