trustgraph/trustgraph-cli/scripts/tg-load-turtle
cybermaggedon 3b8b9ea866
Feature/flow api 3 (#358)
* Working mux socket

* Change API to incorporate flow

* Add Flow ID to all relevant CLIs, not completely implemented

* Change tg-processor-state to use API gateway

* Updated all CLIs

* New tg-show-flow-state command

* tg-show-flow-state shows classes too
2025-05-03 10:39:53 +01:00

184 lines
4.2 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Loads triples from Turtle (RDF) files into TrustGraph processing.
"""
import pulsar
from pulsar.schema import JsonSchema
from trustgraph.schema import Triples, Triple, Value, Metadata
import argparse
import os
import time
import pyarrow as pa
import rdflib
from trustgraph.log_level import LogLevel
# Default values for the command-line options declared in main().
default_user = 'trustgraph'
default_collection = 'default'
# Pulsar connection settings may be overridden through the environment.
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650')
default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None)
# NOTE(review): `triples_store_queue` is neither defined nor imported in the
# visible part of this file, so this assignment raises NameError when the
# script is loaded. Presumably it was once imported from trustgraph.schema --
# confirm, then restore the import or replace this default.
default_output_queue = triples_store_queue
class Loader:
    """Publish the triples found in Turtle files to a Pulsar topic.

    Each file is parsed with rdflib and every triple is sent as its own
    ``Triples`` message carrying the configured user and collection.
    """

    def __init__(
            self,
            pulsar_host,
            output_queue,
            log_level,
            files,
            user,
            collection,
            pulsar_api_key=None,
    ):
        """Connect to Pulsar and create the producer for ``output_queue``.

        Args:
            pulsar_host: Service URL handed to ``pulsar.Client``.
            output_queue: Topic the producer publishes to.
            log_level: Object whose ``to_pulsar()`` result configures the
                Pulsar console logger.
            files: Iterable of Turtle file paths processed by ``run()``.
            user: User ID placed in every message's metadata.
            collection: Collection ID placed in every message's metadata.
            pulsar_api_key: Optional token; when truthy the client
                authenticates with ``pulsar.AuthenticationToken``.
        """
        # Assigned before anything can raise so that __del__ always finds
        # the attribute, even when client construction fails.
        self.client = None

        client_options = {
            "logger": pulsar.ConsoleLogger(log_level.to_pulsar()),
        }
        if pulsar_api_key:
            client_options["authentication"] = pulsar.AuthenticationToken(
                pulsar_api_key
            )

        self.client = pulsar.Client(pulsar_host, **client_options)

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(Triples),
            chunking_enabled=True,
        )

        self.files = files
        self.user = user
        self.collection = collection

    def run(self):
        """Load every configured file in order.

        Any exception raised while loading is printed and swallowed, which
        also stops processing of the remaining files; nothing is re-raised
        to the caller.
        """
        try:
            for file in self.files:
                self.load_file(file)
        except Exception as e:
            print(e, flush=True)

    def load_file(self, file):
        """Parse one Turtle file and send each of its triples to Pulsar.

        Args:
            file: Source passed to ``rdflib.Graph.parse`` with
                ``format="turtle"``.
        """
        graph = rdflib.Graph()
        graph.parse(file, format="turtle")

        for subj, pred, obj in graph:
            # Subject and predicate are always flagged as URIs; the object
            # is flagged as a URI only when rdflib parsed it as a URIRef.
            triple = Triple(
                s=Value(value=str(subj), is_uri=True),
                p=Value(value=str(pred), is_uri=True),
                o=Value(
                    value=str(obj),
                    is_uri=isinstance(obj, rdflib.term.URIRef),
                ),
            )

            message = Triples(
                metadata=Metadata(
                    id=None,
                    metadata=[],
                    user=self.user,
                    collection=self.collection,
                ),
                triples=[triple],
            )

            self.producer.send(message)

    def __del__(self):
        """Close the Pulsar client if one was created."""
        # getattr guards instances whose __init__ never ran or failed early;
        # without it the finalizer itself raised AttributeError.
        client = getattr(self, "client", None)
        if client is not None:
            self.client = None
            client.close()
def main():
    """Parse the command line and load the given Turtle files.

    Builds a ``Loader`` from the parsed options and runs it. If creating the
    ``Loader`` raises, the exception is printed and the attempt is repeated
    after a 10 second pause. ``Loader.run()`` prints and swallows its own
    load errors, so those do not trigger a retry here.
    """
    parser = argparse.ArgumentParser(
        prog='tg-load-turtle',
        description=__doc__,
    )

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    # NOTE(review): --flow-id is accepted but its value is not used anywhere
    # in this function.
    parser.add_argument(
        '-f', '--flow-id',
        default="0000",
        help='Flow ID (default: 0000)'
    )

    parser.add_argument(
        '--pulsar-api-key',
        default=default_pulsar_api_key,
        help='Pulsar API key',
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-u', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})'
    )

    parser.add_argument(
        '-c', '--collection',
        default=default_collection,
        help=f'Collection ID (default: {default_collection})'
    )

    # The help text previously read "Output queue (default: info)", which
    # described neither this option nor its actual default (LogLevel.ERROR).
    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.ERROR,
        choices=list(LogLevel),
        help='Log level (default: error)'
    )

    parser.add_argument(
        'files', nargs='+',
        help='Turtle files to load'
    )

    args = parser.parse_args()

    while True:

        try:
            loader = Loader(
                pulsar_host=args.pulsar_host,
                pulsar_api_key=args.pulsar_api_key,
                output_queue=args.output_queue,
                log_level=args.log_level,
                files=args.files,
                user=args.user,
                collection=args.collection,
            )

            loader.run()

            print("File loaded.")
            break

        except Exception as e:
            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        # Reached only after a failed attempt; wait before reconnecting.
        time.sleep(10)
# NOTE(review): the main() call below is commented out, so running this
# script only prints the message and never loads any file. Presumably the
# tool is disabled until it has been ported to the flow API -- confirm
# before re-enabling.
print("Not implemented.")
#main()