#!/usr/bin/env python3

"""
Loads a PDF document into TrustGraph processing.
"""

import pulsar
from pulsar.schema import JsonSchema
import base64
import hashlib
import argparse
import os
import time
import uuid

from trustgraph.schema import Document, document_ingest_queue
from trustgraph.schema import Metadata
from trustgraph.log_level import LogLevel
from trustgraph.knowledge import hash, to_uri
from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG
from trustgraph.knowledge import Organization, PublicationEvent
from trustgraph.knowledge import DigitalDocument

default_user = 'trustgraph'
default_collection = 'default'

# Seconds main() waits before retrying after a failure.
retry_delay = 10


class Loader:
    """Publish document files to a Pulsar topic as TrustGraph Documents.

    Each file is read as raw bytes and given a content-derived document
    URI. It is annotated with the triples emitted by the shared
    ``metadata`` object and sent base64-encoded to ``output_queue``.
    """

    def __init__(
            self,
            pulsar_host,
            output_queue,
            user,
            collection,
            log_level,
            metadata,
    ):
        """
        Args:
            pulsar_host: Pulsar service URL, e.g. ``pulsar://localhost:6650``.
            output_queue: Topic that Document messages are published to.
            user: User ID recorded in each message's Metadata.
            collection: Collection ID recorded in each message's Metadata.
            log_level: LogLevel used for the Pulsar client's console logger.
            metadata: Knowledge object (e.g. a DigitalDocument) whose
                ``emit`` produces the triples attached to every message.
                Its ``id`` attribute is overwritten for each file loaded.
        """
        # Pre-set so close()/__del__ are safe if construction fails below.
        self.client = None
        self.producer = None

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(Document),
            # Pulsar message chunking, so large PDFs are not limited by the
            # broker's maximum message size.
            chunking_enabled=True,
        )

        self.user = user
        self.collection = collection
        self.metadata = metadata

    def load(self, files):
        """Load each path in ``files`` in order.

        Per-file failures are reported by ``load_file`` and do not stop
        the batch.
        """
        for file in files:
            self.load_file(file)

    def load_file(self, file):
        """Read ``file`` and publish it as a single Document message.

        The document ID is ``to_uri(PREF_DOC, hash(data))``, so it is
        derived from the file content. Any exception is printed and
        swallowed, so one bad file does not abort the remaining files.
        """
        try:
            with open(file, "rb") as f:
                data = f.read()

            # Content-derived document URI (hash() is trustgraph.knowledge's).
            doc_id = to_uri(PREF_DOC, hash(data))

            # The shared metadata object describes the current file, so point
            # its ID at this document before emitting its triples.
            triples = []
            self.metadata.id = doc_id
            self.metadata.emit(triples.append)

            r = Document(
                metadata=Metadata(
                    id=doc_id,
                    metadata=triples,
                    user=self.user,
                    collection=self.collection,
                ),
                data=base64.b64encode(data),
            )

            self.producer.send(r)

            print(f"{file}: Loaded successfully.")

        except Exception as e:
            print(f"{file}: Failed: {str(e)}", flush=True)

    def close(self):
        """Close the Pulsar client, which also closes its producer.

        Safe to call more than once, and safe after a failed ``__init__``.
        """
        client = getattr(self, "client", None)
        self.client = None
        self.producer = None
        if client is not None:
            client.close()

    def __del__(self):
        # Best-effort fallback only; main() closes explicitly. Exceptions
        # cannot usefully propagate out of a finalizer.
        try:
            self.close()
        except Exception:
            pass


def _parse_args():
    """Define the tg-load-pdf command line and parse ``sys.argv``."""

    parser = argparse.ArgumentParser(
        prog='tg-load-pdf',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650')
    default_output_queue = document_ingest_queue
    default_log_level = LogLevel.ERROR

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-u', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})'
    )

    parser.add_argument(
        '-c', '--collection',
        default=default_collection,
        help=f'Collection ID (default: {default_collection})'
    )

    parser.add_argument('--name', help='Document name')
    parser.add_argument('--description', help='Document description')
    parser.add_argument('--copyright-notice', help='Copyright notice')
    parser.add_argument('--copyright-holder', help='Copyright holder')
    parser.add_argument('--copyright-year', help='Copyright year')
    parser.add_argument('--license', help='Copyright license')

    parser.add_argument(
        '--publication-organization',
        help='Publication organization'
    )

    parser.add_argument(
        '--publication-description',
        help='Publication description'
    )

    parser.add_argument('--publication-date', help='Publication date')
    parser.add_argument('--url', help='Document URL')
    parser.add_argument('--keyword', nargs='+', help='Keyword')
    parser.add_argument('--identifier', '--id', help='Document ID')

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=default_log_level,
        choices=list(LogLevel),
        help=f'Log level (default: {default_log_level})'
    )

    parser.add_argument(
        'files', nargs='+',
        help='File to load'
    )

    return parser.parse_args()


def _build_document(args):
    """Build the DigitalDocument metadata shared by every loaded file."""

    document = DigitalDocument(
        # NOTE(review): Loader.load_file replaces this with a content-hash
        # URI for each file, so --identifier currently has no lasting effect.
        args.identifier,
        name=args.name,
        description=args.description,
        copyright_notice=args.copyright_notice,
        copyright_holder=args.copyright_holder,
        copyright_year=args.copyright_year,
        license=args.license,
        url=args.url,
        keywords=args.keyword,
    )

    if args.publication_organization:

        org = Organization(
            id=to_uri(PREF_ORG, hash(args.publication_organization)),
            name=args.publication_organization,
        )

        document.publication = PublicationEvent(
            id=to_uri(PREF_PUBEV, str(uuid.uuid4())),
            organization=org,
            description=args.publication_description,
            start_date=args.publication_date,
            end_date=args.publication_date,
        )

    return document


def main():
    """Parse arguments and load the files, retrying on setup failures.

    Per-file errors are handled inside Loader.load_file. The retry loop
    therefore covers failures building the metadata or connecting to
    Pulsar.
    """

    args = _parse_args()

    while True:

        try:

            document = _build_document(args)

            p = Loader(
                pulsar_host=args.pulsar_host,
                output_queue=args.output_queue,
                user=args.user,
                collection=args.collection,
                log_level=args.log_level,
                metadata=document,
            )

            try:
                p.load(args.files)
            finally:
                # A close failure after loading must not trigger a retry,
                # which would re-send every file.
                try:
                    p.close()
                except Exception as e:
                    print(
                        "Warning: failed to close Pulsar client:", e,
                        flush=True
                    )

            print("All done.")

            break

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        time.sleep(retry_delay)


if __name__ == "__main__":
    main()