mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 08:26:21 +02:00
* Tech spec * Python CLI utilities updated to use the API including streaming features * Added type safety to Python API * Completed missing auth token support in CLI
166 lines
4 KiB
Python
166 lines
4 KiB
Python
"""
Loads triples into the knowledge graph from Turtle files.
"""
|
|
|
|
import argparse
|
|
import os
|
|
import time
|
|
import rdflib
|
|
from typing import Iterator
|
|
|
|
from trustgraph.api import Api, Triple
|
|
from trustgraph.log_level import LogLevel
|
|
|
|
# Connection settings are taken from the environment when present.
default_url = os.environ.get("TRUSTGRAPH_URL", "http://localhost:8088/")
default_token = os.environ.get("TRUSTGRAPH_TOKEN")

# Fallback identifiers for the user and collection.
default_user = "trustgraph"
default_collection = "default"
|
|
|
|
class Loader:
    """Load RDF triples from Turtle files into a flow via the bulk API.

    Each file is parsed with rdflib and its triples are handed to
    ``bulk.import_triples`` together with metadata built from the
    document ID, user and collection supplied at construction time.
    """

    def __init__(
            self,
            files,
            flow,
            user,
            collection,
            document_id,
            url=default_url,
            token=None,
    ):
        """Store the load parameters; no parsing or network I/O happens here.

        Args:
            files: Iterable of Turtle sources; each item is passed to
                ``rdflib.Graph.parse``.
            flow: Flow ID passed to the bulk import call.
            user: User ID placed in the import metadata.
            collection: Collection ID placed in the import metadata.
            document_id: Value used as the ``id`` field of the metadata.
            url: API URL handed to ``Api``.
            token: Authentication token handed to ``Api`` (may be None).
        """
        self.files = files
        self.flow = flow
        self.user = user
        self.collection = collection
        self.document_id = document_id
        self.url = url
        self.token = token

    def load_triples_from_file(self, file) -> Iterator[Triple]:
        """Generator that yields Triple objects from a Turtle file.

        Because this is a generator, the file is only parsed once the
        caller starts iterating.

        Args:
            file: Turtle source accepted by ``rdflib.Graph.parse``.

        Yields:
            One ``Triple`` per statement in the parsed graph, with subject,
            predicate and object all converted to ``str``.
        """
        graph = rdflib.Graph()
        graph.parse(file, format="turtle")

        for subject, predicate, obj in graph:
            # URI references and literals are stringified identically, so
            # no type distinction reaches the Triple.
            # NOTE(review): confirm the import API does not need to tell
            # URI objects apart from literal objects.
            yield Triple(s=str(subject), p=str(predicate), o=str(obj))

    def run(self):
        """Load triples using Python API.

        Creates an API client, then imports the triples of every
        configured file in turn.

        Raises:
            Exception: Any error raised while creating the client, parsing
                a file or importing is printed and then re-raised.
        """
        try:
            # Create API client
            api = Api(url=self.url, token=self.token)
            bulk = api.bulk()

            # Load triples from all files
            print("Loading triples...")
            for file in self.files:
                print(f" Processing {file}...")
                triples = self.load_triples_from_file(file)

                bulk.import_triples(
                    flow=self.flow,
                    triples=triples,
                    metadata={
                        "id": self.document_id,
                        "metadata": [],
                        "user": self.user,
                        "collection": self.collection
                    }
                )

            print("Triples loaded.")

        except Exception as e:
            # Report here, then let the caller decide what to do.
            print(f"Error: {e}", flush=True)
            raise
|
|
|
|
def main():
    """Command-line entry point for ``tg-load-turtle``.

    Parses the command-line arguments, builds a ``Loader`` from them and
    runs it. If the load raises any exception, the error is printed and
    the whole load is attempted again after a 10 second pause; the
    function only returns once a load completes without error.
    """
    parser = argparse.ArgumentParser(
        prog='tg-load-turtle',
        description=__doc__,
    )

    parser.add_argument(
        '-u', '--api-url',
        default=default_url,
        help=f'API URL (default: {default_url})',
    )

    parser.add_argument(
        '-t', '--token',
        default=default_token,
        help='Authentication token (default: $TRUSTGRAPH_TOKEN)',
    )

    parser.add_argument(
        '-i', '--document-id',
        required=True,
        help='Document ID',
    )

    parser.add_argument(
        '-f', '--flow-id',
        default="default",
        help='Flow ID (default: default)'
    )

    parser.add_argument(
        '-U', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})'
    )

    parser.add_argument(
        '-C', '--collection',
        default=default_collection,
        help=f'Collection ID (default: {default_collection})'
    )

    parser.add_argument(
        'files', nargs='+',
        help='Turtle files to load'
    )

    args = parser.parse_args()

    # Retry until a load succeeds. NOTE: every Exception is retried, so a
    # persistent failure (not just a transient one) loops indefinitely.
    while True:

        try:
            loader = Loader(
                document_id=args.document_id,
                url=args.api_url,
                token=args.token,
                flow=args.flow_id,
                files=args.files,
                user=args.user,
                collection=args.collection,
            )

            loader.run()

            print("File loaded.")
            break

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        # Back off before the next attempt.
        time.sleep(10)
|
|
|
|
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
|