Fix turtle loader (#407)

This commit is contained in:
cybermaggedon 2025-06-02 14:55:22 +01:00 committed by GitHub
parent 8fc8880d51
commit 083702d3d4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,80 +1,68 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Loads Graph embeddings into TrustGraph processing. Loads triples into the knowledge graph.
FIXME: This hasn't been updated following API gateway change.
""" """
import pulsar import asyncio
from pulsar.schema import JsonSchema
from trustgraph.schema import Triples, Triple, Value, Metadata
import argparse import argparse
import os import os
import time import time
import pyarrow as pa
import rdflib import rdflib
import json
from websockets.asyncio.client import connect
from trustgraph.log_level import LogLevel from trustgraph.log_level import LogLevel
default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/')
default_user = 'trustgraph' default_user = 'trustgraph'
default_collection = 'default' default_collection = 'default'
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650')
default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None)
default_output_queue = triples_store_queue
class Loader:
    """
    Loads RDF turtle files and streams their triples into TrustGraph
    through the API gateway's websocket triples-import endpoint.
    """

    def __init__(
        self,
        files,
        flow,
        user,
        collection,
        document_id,
        url = default_url,
    ):
        """
        files: iterable of turtle file paths to load
        flow: flow ID used to build the import endpoint path
        user: user ID recorded in each triple batch's metadata
        collection: collection ID recorded in each batch's metadata
        document_id: document ID recorded in each batch's metadata
        url: base websocket API URL (default: TRUSTGRAPH_URL env or localhost)
        """

        # Normalise the base URL so the endpoint path joins cleanly.
        if not url.endswith("/"):
            url += "/"

        self.url = url + f"api/v1/flow/{flow}/import/triples"

        self.files = files
        self.user = user
        self.collection = collection
        self.document_id = document_id

    async def run(self):
        """Open one websocket connection and load every file over it."""

        try:
            async with connect(self.url) as ws:
                for file in self.files:
                    await self.load_file(file, ws)
        except Exception as e:
            # Best-effort: report and return; main()'s retry loop decides
            # whether to try again.
            print(e, flush=True)

    async def load_file(self, file, ws):
        """
        Parse one turtle file and send each triple as a JSON import
        request over the open websocket `ws`.
        """

        g = rdflib.Graph()
        g.parse(file, format="turtle")

        def value(v, is_uri):
            # Wire format for a triple term: "v" = lexical value,
            # "e" = entity flag (True for URIs, False for literals).
            return { "v": v, "e": is_uri }

        for s_term, p_term, o_term in g:

            s = value(str(s_term), True)
            p = value(str(p_term), True)

            # NOTE(review): the positive branch was omitted from the diff
            # view; reconstructed as a URIRef check — confirm against the
            # committed file. Subjects/predicates are always URIs; only
            # the object may be a literal.
            if isinstance(o_term, rdflib.term.URIRef):
                o = value(str(o_term), True)
            else:
                o = value(str(o_term), False)

            req = {
                "metadata": {
                    "id": self.document_id,
                    "metadata": [],
                    "user": self.user,
                    "collection": self.collection
                },
                "triples": [
                    { "s": s, "p": p, "o": o }
                ]
            }

            await ws.send(json.dumps(req))
def main():
    """Command-line entry point: parse arguments and load the files,
    retrying every 10 seconds on failure."""

    # NOTE(review): the parser construction and the flow-id argument fell
    # inside omitted diff hunks; reconstructed from the visible argument
    # list and from `flow = args.flow_id` below — confirm against the
    # committed file.
    parser = argparse.ArgumentParser(
        prog='tg-load-turtle',
        description=__doc__,
    )

    parser.add_argument(
        '-u', '--api-url',
        default=default_url,
        help=f'API URL (default: {default_url})',
    )

    parser.add_argument(
        '-i', '--document-id',
        required=True,
        help='Document ID',
    )

    parser.add_argument(
        '-f', '--flow-id',
        default='default',
        help='Flow ID (default: default)',
    )

    parser.add_argument(
        '-U', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})',
    )

    parser.add_argument(
        '-C', '--collection',
        default=default_collection,
        help=f'Collection ID (default: {default_collection})',
    )

    parser.add_argument(
        'files', nargs='+',
        help='Turtle files to load',
    )

    args = parser.parse_args()

    # Retry until the load succeeds; transient gateway outages are
    # expected at startup.
    while True:

        try:
            p = Loader(
                document_id = args.document_id,
                url = args.api_url,
                flow = args.flow_id,
                files = args.files,
                user = args.user,
                collection = args.collection,
            )
            asyncio.run(p.run())
            print("File loaded.")
            break

        except Exception as e:
            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)
            time.sleep(10)
# Guard the entry point so the module can be imported without side effects.
if __name__ == "__main__":
    main()