trustgraph/trustgraph-cli/scripts/tg-load-text
cybermaggedon 617eb7efd5
Feature/pulsar api key support (#308)
* Add pulsar API token check

* Added missing api_key references

---------

Co-authored-by: Tyler O <4535788+toliver38@users.noreply.github.com>
2025-02-15 11:22:48 +00:00

206 lines
4.7 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Loads a text document into TrustGraph processing.
"""
import pulsar
from pulsar.schema import JsonSchema
import hashlib
import argparse
import os
import time
import uuid
from trustgraph.api import Api
from trustgraph.knowledge import hash, to_uri
from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG
from trustgraph.knowledge import Organization, PublicationEvent
from trustgraph.knowledge import DigitalDocument
# API endpoint: taken from the TRUSTGRAPH_URL environment variable when set,
# otherwise a local default.
default_url = os.environ.get("TRUSTGRAPH_URL", 'http://localhost:8088/')

# Defaults for the --user and --collection command-line options.
default_user, default_collection = 'trustgraph', 'default'
class Loader:
    """Loads text files into TrustGraph through the ``Api`` client.

    Each file is read as raw bytes, assigned a document URI derived from a
    hash of its content, and submitted together with a shared metadata
    object.
    """

    def __init__(
            self,
            url,
            user,
            collection,
            metadata,
    ):
        """Create a loader.

        Args:
            url: API URL handed to ``Api``.
            user: User ID; stored on the instance.
            collection: Collection ID; stored on the instance.
            metadata: Document metadata object. Its ``id`` attribute is
                overwritten for every file that is loaded.
        """
        self.api = Api(url)
        self.user = user
        self.collection = collection
        self.metadata = metadata

    def load(self, files):
        """Load every path in ``files``, one after another."""
        for file in files:
            self.load_file(file)

    def load_file(self, file):
        """Load a single file and report the outcome on stdout.

        Loading is best-effort: any exception is printed and swallowed so
        that the remaining files are still attempted.

        Args:
            file: Path of the file to load.
        """
        try:

            # Use a context manager so the file handle is always closed,
            # even when the read fails part-way.
            with open(file, "rb") as f:
                data = f.read()

            # The document URI is built from a hash of the content. `hash`
            # here is the one imported from trustgraph.knowledge (described
            # as SHA256 by the original comment), not the builtin.
            doc_id = to_uri(PREF_DOC, hash(data))

            # The shared metadata object is re-pointed at this document.
            self.metadata.id = doc_id

            # NOTE: user and collection are stored on the instance but are
            # not currently forwarded to the API call.
            self.api.load_text(
                text=data, id=doc_id, metadata=self.metadata,
                # user=self.user,
                # collection=self.collection,
            )

            print(f"{file}: Loaded successfully.")

        except Exception as e:
            print(f"{file}: Failed: {str(e)}", flush=True)
def main():
    """Command-line entry point: parse arguments and load the given files.

    Builds a ``DigitalDocument`` metadata object from the command-line
    options (plus an optional ``PublicationEvent`` when a publication
    organization is given), then hands the files to a ``Loader``.

    If anything in that sequence raises, the exception is printed and the
    whole sequence is retried after a 10 second pause, indefinitely.
    """

    parser = argparse.ArgumentParser(
        prog='tg-load-text',
        description=__doc__,
    )

    parser.add_argument(
        '-u', '--url',
        default=default_url,
        help=f'API URL (default: {default_url})',
    )

    # parser.add_argument(
    #     '--pulsar-api-key',
    #     default=default_pulsar_api_key,
    #     help=f'Pulsar API key',
    # )

    parser.add_argument(
        '-U', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})'
    )

    parser.add_argument(
        '-C', '--collection',
        default=default_collection,
        help=f'Collection ID (default: {default_collection})'
    )

    parser.add_argument(
        '--name', help='Document name'
    )

    parser.add_argument(
        '--description', help='Document description'
    )

    parser.add_argument(
        '--copyright-notice', help='Copyright notice'
    )

    parser.add_argument(
        '--copyright-holder', help='Copyright holder'
    )

    parser.add_argument(
        '--copyright-year', help='Copyright year'
    )

    parser.add_argument(
        '--license', help='Copyright license'
    )

    parser.add_argument(
        '--publication-organization', help='Publication organization'
    )

    parser.add_argument(
        '--publication-description', help='Publication description'
    )

    parser.add_argument(
        '--publication-date', help='Publication date'
    )

    parser.add_argument(
        '--document-url', help='Document URL'
    )

    parser.add_argument(
        '--keyword', nargs='+', help='Keyword'
    )

    parser.add_argument(
        '--identifier', '--id', help='Document ID'
    )

    parser.add_argument(
        'files', nargs='+',
        help='File to load'
    )

    args = parser.parse_args()

    while True:

        try:

            # Pass the --identifier/--id option as the document ID. The
            # previous code passed the bare name `id`, which in this scope
            # is Python's builtin function, not a document identifier.
            # Loader.load_file replaces the metadata id with a per-file
            # content-derived URI before submitting.
            document = DigitalDocument(
                args.identifier,
                name=args.name,
                description=args.description,
                copyright_notice=args.copyright_notice,
                copyright_holder=args.copyright_holder,
                copyright_year=args.copyright_year,
                license=args.license,
                url=args.document_url,
                keywords=args.keyword,
            )

            # Publication details are attached only when an organization
            # was named on the command line.
            if args.publication_organization:

                org = Organization(
                    id=to_uri(PREF_ORG, hash(args.publication_organization)),
                    name=args.publication_organization,
                )

                # The single --publication-date value is used as both the
                # start and the end of the publication event.
                document.publication = PublicationEvent(
                    id=to_uri(PREF_PUBEV, str(uuid.uuid4())),
                    organization=org,
                    description=args.publication_description,
                    start_date=args.publication_date,
                    end_date=args.publication_date,
                )

            p = Loader(
                url=args.url,
                user=args.user,
                collection=args.collection,
                metadata=document,
            )

            p.load(args.files)

            print("All done.")
            break

        except Exception as e:

            # Report, wait, then retry the whole sequence from the top.
            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

            time.sleep(10)
# Run only when executed as a script, so importing this module (for example
# from tooling or tests) does not start a load.
if __name__ == "__main__":
    main()