diff --git a/scripts/concat-parquet b/scripts/concat-parquet
new file mode 100755
index 00000000..7943d436
--- /dev/null
+++ b/scripts/concat-parquet
@@ -0,0 +1,45 @@
+#!/usr/bin/env python3
+
+"""
+Concatenates multiple Parquet files into a single Parquet output.
+"""
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pandas as pd
+import sys
+import argparse
+
+parser = argparse.ArgumentParser(
+    prog="concat-parquet",
+    description=__doc__
+)
+
+parser.add_argument(
+    '-i', '--input',
+    nargs='+',
+    required=True,
+    help='Input files to concatenate'
+)
+
+parser.add_argument(
+    '-o', '--output',
+    required=True,
+    help='Output file'
+)
+
+args = parser.parse_args()
+
+df = None
+
+for file in args.input:
+
+    part = pq.read_table(file).to_pandas()
+
+    if df is None:
+        df = part
+    else:
+        df = pd.concat([df, part], ignore_index=True)
+
+if df is not None:
+
+    table = pa.Table.from_pandas(df)
+    pq.write_table(table, args.output)
diff --git a/scripts/dump-parquet b/scripts/dump-parquet
index 36f7f2d1..62b28998 100755
--- a/scripts/dump-parquet
+++ b/scripts/dump-parquet
@@ -1,12 +1,24 @@
 #!/usr/bin/env python3
 
+import pyarrow as pa
+import pyarrow.csv as pc
 import pyarrow.parquet as pq
+import pandas as pd
 import sys
 
+df = None
+
 for file in sys.argv[1:]:
-    table = pq.read_table(file).to_pandas()
-    print(table)
+    part = pq.read_table(file).to_pandas()
+    if df is None:
+        df = part
+    else:
+        df = pd.concat([df, part], ignore_index=True)
 
+if df is not None:
+
+    table = pa.Table.from_pandas(df)
+    pc.write_csv(table, sys.stdout.buffer)
diff --git a/scripts/load-graph-embeddings b/scripts/load-graph-embeddings
new file mode 100755
index 00000000..2dc3c06f
--- /dev/null
+++ b/scripts/load-graph-embeddings
@@ -0,0 +1,145 @@
+#!/usr/bin/env python3
+
+"""
+Loads graph embeddings into TrustGraph processing.
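+
+Example invocation (a sketch; the file name is illustrative, and the input
+is assumed to be a Parquet file with 'entity' and 'embeddings' columns):
+
+    load-graph-embeddings -p pulsar://localhost:6650 -f embeddings.parquet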
+""" + +import pulsar +from pulsar.schema import JsonSchema +from trustgraph.schema import GraphEmbeddings, Value +from trustgraph.schema import graph_embeddings_store_queue +import argparse +import os +import time +import pyarrow as pa +import pyarrow.parquet as pq + +from trustgraph.log_level import LogLevel + +class Loader: + + def __init__( + self, + pulsar_host, + output_queue, + log_level, + file, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(GraphEmbeddings), + chunking_enabled=True, + ) + + self.file = file + + def run(self): + + try: + + path = self.file + + print("Reading file...") + table = pq.read_table(path) + print("Loaded.") + + names = set(table.column_names) + + if "embeddings" not in names: + print("No 'embeddings' column") + + if "entity" not in names: + print("No 'entity' column") + + embc = table.column("embeddings") + entc = table.column("entity") + + for emb, ent in zip(embc, entc): + + b = emb.as_py() + n = ent.as_py() + + r = GraphEmbeddings( + vectors=b, + entity=Value( + value=n, + is_uri=n.startswith("https:") + ) + ) + + self.producer.send(r) + + except Exception as e: + print(e, flush=True) + + def __del__(self): + self.client.close() + +def main(): + + parser = argparse.ArgumentParser( + prog='loader', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') + default_output_queue = graph_embeddings_store_queue + + parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.ERROR, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + parser.add_argument( + '-f', '--file', + required=True, + help=f'File to load' + ) + + args = parser.parse_args() + + while True: + + try: + p = Loader( + pulsar_host=args.pulsar_host, + output_queue=args.output_queue, + log_level=args.log_level, + file=args.file, + ) + + p.run() + + print("File loaded.") + break + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + +main() + diff --git a/scripts/load-triples b/scripts/load-triples new file mode 100755 index 00000000..e03c065b --- /dev/null +++ b/scripts/load-triples @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 + +""" +Loads Graph embeddings into TrustGraph processing. 
+""" + +import pulsar +from pulsar.schema import JsonSchema +from trustgraph.schema import Triple, Value +from trustgraph.schema import triples_store_queue +import argparse +import os +import time +import pyarrow as pa +import pyarrow.parquet as pq + +from trustgraph.log_level import LogLevel + +class Loader: + + def __init__( + self, + pulsar_host, + output_queue, + log_level, + file, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(Triple), + chunking_enabled=True, + ) + + self.file = file + + def run(self): + + try: + + path = self.file + + print("Reading file...") + table = pq.read_table(path) + print("Loaded.") + + names = set(table.column_names) + + if "s" not in names: + print("No 's' column") + + if "p" not in names: + print("No 'p' column") + + if "o" not in names: + print("No 'o' column") + + sc = table.column("s") + pc = table.column("p") + oc = table.column("o") + + for s, p, o in zip(sc, pc, oc): + + r = Triple( + s=Value(value=s.as_py(), is_uri=True), + p=Value(value=p.as_py(), is_uri=True), + o=Value(value=o.as_py(), is_uri=o.as_py().startswith("https:")) + ) + + self.producer.send(r) + + except Exception as e: + print(e, flush=True) + + def __del__(self): + self.client.close() + +def main(): + + parser = argparse.ArgumentParser( + prog='loader', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') + default_output_queue = triples_store_queue + + parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.ERROR, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + parser.add_argument( + '-f', '--file', + required=True, + help=f'File to load' + ) + + args = parser.parse_args() + + while True: + + try: + p = Loader( + pulsar_host=args.pulsar_host, + output_queue=args.output_queue, + log_level=args.log_level, + file=args.file, + ) + + p.run() + + print("File loaded.") + break + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + +main() + diff --git a/scripts/loader b/scripts/loader index d862a507..17be248b 100755 --- a/scripts/loader +++ b/scripts/loader @@ -5,7 +5,7 @@ Loads a PDF documented into TrustGraph processing. """ import pulsar -from pulsar.schema import JsonSchema +from pulsar.schema import JsonSchema, document_ingest_queue from trustgraph.schema import Document, Source import base64 import hashlib @@ -72,7 +72,7 @@ def main(): ) default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') - default_output_queue = 'document-load' + default_output_queue = document_ingest_queue parser.add_argument( '-p', '--pulsar-host',