diff --git a/scripts/triples-dump-parquet b/scripts/triples-dump-parquet new file mode 100755 index 00000000..78d79196 --- /dev/null +++ b/scripts/triples-dump-parquet @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.dump.triples.parquet import run + +run() + diff --git a/setup.py b/setup.py index 8cd2103a..703a4070 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ setuptools.setup( "google-cloud-aiplatform", "pyyaml", "prometheus-client", + "pyarrow", ], scripts=[ "scripts/chunker-recursive", diff --git a/trustgraph/dump/__init__.py b/trustgraph/dump/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/dump/triples/__init__.py b/trustgraph/dump/triples/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/dump/triples/parquet/__init__.py b/trustgraph/dump/triples/parquet/__init__.py new file mode 100644 index 00000000..9d16af90 --- /dev/null +++ b/trustgraph/dump/triples/parquet/__init__.py @@ -0,0 +1,3 @@ + +from . processor import * + diff --git a/trustgraph/dump/triples/parquet/__main__.py b/trustgraph/dump/triples/parquet/__main__.py new file mode 100755 index 00000000..c05d8c6d --- /dev/null +++ b/trustgraph/dump/triples/parquet/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . write import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/dump/triples/parquet/processor.py b/trustgraph/dump/triples/parquet/processor.py new file mode 100755 index 00000000..4a95c61c --- /dev/null +++ b/trustgraph/dump/triples/parquet/processor.py @@ -0,0 +1,87 @@ + +""" +Write graphs triples to parquet files in a directory. +""" + +import pulsar +import base64 +import os +import argparse +import time + +from .... trustgraph import TrustGraph +from .... schema import Triple +from .... schema import triples_store_queue +from .... log_level import LogLevel +from .... base import Consumer + +from . writer import ParquetWriter + +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = triples_store_queue +default_subscriber = module +default_graph_host='localhost' +default_directory = "." +default_file_template = "triples-{id}.parquet" +default_rotation_time = 30 + +class Processor(Consumer): + + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + subscriber = params.get("subscriber", default_subscriber) + directory = params.get("directory", default_directory) + file_template = params.get("file_template", default_file_template) + rotation_time = params.get("rotation_time", default_rotation_time) + + super(Processor, self).__init__( + **params | { + "input_queue": input_queue, + "subscriber": subscriber, + "input_schema": Triple, + } + ) + + self.writer = ParquetWriter(directory, file_template, rotation_time) + + def __del__(self): + if hasattr(self, "writer"): + del self.writer + + def handle(self, msg): + + v = msg.value() + self.writer.write(v.s.value, v.p.value, v.o.value) + + @staticmethod + def add_args(parser): + + Consumer.add_args( + parser, default_input_queue, default_subscriber, + ) + + parser.add_argument( + '-d', '--directory', + default=default_directory, + help=f'Directory to write to (default: {default_directory})' + ) + + parser.add_argument( + '-f', '--file-template', + default=default_file_template, + help=f'Directory to write to (default: {default_file_template})' + ) + + parser.add_argument( + '-t', '--rotation-time', + type=int, + default=default_rotation_time, + help=f'Rotation time / seconds (default: {default_rotation_time})' + ) + +def run(): + + Processor.start(module, __doc__) + diff --git a/trustgraph/dump/triples/parquet/writer.py b/trustgraph/dump/triples/parquet/writer.py new file mode 100644 index 00000000..e68bf342 --- /dev/null +++ b/trustgraph/dump/triples/parquet/writer.py @@ -0,0 +1,96 @@ + +import threading +import queue +import time +import uuid +import pyarrow as pa +import pyarrow.parquet as pq + +class ParquetWriter: + + def __init__(self, directory, file_template, rotation_time): + self.directory = directory + self.file_template = file_template + self.rotation_time = rotation_time + + self.q = queue.Queue() + + self.running = True + + self.thread = threading.Thread(target=(self.writer_thread)) + self.thread.start() + + def writer_thread(self): + + triples = [] + + timeout = None + + while self.running: + + try: + + item = self.q.get(timeout=1) + + if timeout == None: + timeout = time.time() + self.rotation_time + + triples.append(item) + + except queue.Empty: + pass + + if timeout: + if time.time() > timeout: + + self.write_file(triples) + timeout = None + triples = [] + + def write_file(self, triples): + + try: + + schema = pa.schema([ + pa.field('s', pa.string()), + pa.field('p', pa.string()), + pa.field('o', pa.string()), + ]) + + fname = self.file_template.format(id=str(uuid.uuid4())) + path = f"{self.directory}/{fname}" + + writer = pq.ParquetWriter(path, schema) + + batch = pa.record_batch( + [ + [tpl[0] for tpl in triples], + [tpl[1] for tpl in triples], + [tpl[2] for tpl in triples], + ], + names=['s', 'p', 'o'] + ) + + writer.write_batch(batch) + + writer.close() + + print(f"Wrote {path}.") + + except Exception as e: + + print("Parquet write:", e) + + def write(self, s, p, o): + self.q.put((s, p, o)) + + def __del__(self): + + self.running = False + + if hasattr(self, "q"): + self.thread.join() + + + +