Parquet dumper for triples

This commit is contained in:
Cyber MacGeddon 2024-07-23 22:38:07 +01:00
parent a9bd50c9af
commit 2db050fe93
8 changed files with 200 additions and 0 deletions

6
scripts/triples-dump-parquet Executable file
View file

@ -0,0 +1,6 @@
#!/usr/bin/env python3
# Command-line launcher: hands control to the run() entry point exposed by
# the trustgraph.dump.triples.parquet package.
from trustgraph.dump.triples import parquet

parquet.run()

View file

@ -44,6 +44,7 @@ setuptools.setup(
"google-cloud-aiplatform",
"pyyaml",
"prometheus-client",
"pyarrow",
],
scripts=[
"scripts/chunker-recursive",

View file

View file

View file

@ -0,0 +1,3 @@
from . processor import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
# Module entry point, so the package can be executed directly.
#
# run() is imported from the same `processor` module that the package
# __init__ re-exports from; the previous `. write` target does not match
# any module referenced elsewhere in this package.
from . processor import run

if __name__ == '__main__':
    run()

View file

@ -0,0 +1,87 @@
"""
Write graphs triples to parquet files in a directory.
"""
import pulsar
import base64
import os
import argparse
import time
from .... trustgraph import TrustGraph
from .... schema import Triple
from .... schema import triples_store_queue
from .... log_level import LogLevel
from .... base import Consumer
from . writer import ParquetWriter
# Dotted module path with its first and last components removed.  Used as
# the default subscriber name and handed to Processor.start() by run().
_name_parts = __name__.split(".")
module = ".".join(_name_parts[1:-1])

# Defaults for the processor parameters / command-line options.
default_input_queue = triples_store_queue
default_subscriber = module
default_graph_host = "localhost"
default_directory = "."
# "{id}" is filled in by the writer for every file it creates.
default_file_template = "triples-{id}.parquet"
# Seconds.
default_rotation_time = 30
class Processor(Consumer):
    """Consumer that forwards every received triple to a ParquetWriter.

    Recognised keyword parameters (all optional):

    - ``input_queue``: queue to consume from (default
      ``default_input_queue``).
    - ``subscriber``: subscriber name (default ``default_subscriber``).
    - ``directory``: directory the Parquet files are written to.
    - ``file_template``: output file name template, formatted with ``id``.
    - ``rotation_time``: seconds of buffering before a file is written.

    All parameters, including unrecognised ones, are passed on to the
    ``Consumer`` base class.
    """

    def __init__(self, **params):
        """Configure the consumer and create the Parquet writer."""
        input_queue = params.get("input_queue", default_input_queue)
        subscriber = params.get("subscriber", default_subscriber)
        directory = params.get("directory", default_directory)
        file_template = params.get("file_template", default_file_template)
        rotation_time = params.get("rotation_time", default_rotation_time)

        # The right-hand dict wins on key clashes, so the resolved defaults
        # and the Triple schema always reach the base class.
        super().__init__(
            **params | {
                "input_queue": input_queue,
                "subscriber": subscriber,
                "input_schema": Triple,
            }
        )

        self.writer = ParquetWriter(directory, file_template, rotation_time)

    def __del__(self):
        # __init__ may have failed before the writer was created.
        # NOTE(review): this only drops this object's reference; whether the
        # writer is actually finalised depends on what else references it.
        if hasattr(self, "writer"):
            del self.writer

    def handle(self, msg):
        """Queue the subject, predicate and object of one message for writing.

        ``msg.value()`` is expected to expose ``s``, ``p`` and ``o``
        attributes, each with a ``value`` attribute.
        """
        v = msg.value()
        self.writer.write(v.s.value, v.p.value, v.o.value)

    @staticmethod
    def add_args(parser):
        """Register the consumer options and the Parquet output options."""
        Consumer.add_args(
            parser, default_input_queue, default_subscriber,
        )

        parser.add_argument(
            '-d', '--directory',
            default=default_directory,
            help=f'Directory to write to (default: {default_directory})'
        )

        # Previously this option carried the --directory help text.
        parser.add_argument(
            '-f', '--file-template',
            default=default_file_template,
            help='Output file name template, {id} is replaced by a unique '
                 f'ID (default: {default_file_template})'
        )

        parser.add_argument(
            '-t', '--rotation-time',
            type=int,
            default=default_rotation_time,
            help=f'Rotation time / seconds (default: {default_rotation_time})'
        )
def run():
    """Launch the triples-to-Parquet processor.

    Delegates to ``Processor.start``, giving it the derived module name and
    this module's docstring.
    """
    description = __doc__
    Processor.start(module, description)

View file

@ -0,0 +1,96 @@
import threading
import queue
import time
import uuid
import pyarrow as pa
import pyarrow.parquet as pq
class ParquetWriter:
    """Buffer (s, p, o) triples and periodically write them to Parquet files.

    Triples passed to :meth:`write` are queued and collected by a background
    thread.  Once ``rotation_time`` seconds have passed since the first
    triple of the current buffer arrived, the buffer is written to a new
    file in ``directory``.  The file name is ``file_template`` formatted
    with ``id`` set to a fresh UUID.

    Call :meth:`close` to stop the thread; triples still buffered at that
    point are written out rather than discarded.
    """

    # Queue marker used by close() to wake the writer thread immediately
    # instead of waiting for its one-second poll to expire.
    _STOP = object()

    def __init__(self, directory, file_template, rotation_time):
        """Store the configuration and start the background writer thread.

        Args:
            directory: Directory the Parquet files are created in.
            file_template: File name template containing an ``{id}`` field.
            rotation_time: Seconds to accumulate triples before writing.
        """
        self.directory = directory
        self.file_template = file_template
        self.rotation_time = rotation_time

        self.q = queue.Queue()
        self.running = True

        self.thread = threading.Thread(target=self.writer_thread)
        self.thread.start()

    def writer_thread(self):
        """Thread body: collect queued triples and flush them on rotation."""
        triples = []
        deadline = None  # Set when the first triple of a buffer arrives.

        while self.running:
            try:
                # Bounded wait so a cleared ``running`` flag and the
                # rotation deadline are noticed even when no data arrives.
                item = self.q.get(timeout=1)
            except queue.Empty:
                pass
            else:
                if item is self._STOP:
                    break
                if deadline is None:
                    deadline = time.time() + self.rotation_time
                triples.append(item)

            if deadline is not None and time.time() > deadline:
                self.write_file(triples)
                deadline = None
                triples = []

        # Shutting down: pick up anything still queued and flush the buffer
        # so that stopping the writer does not lose data.
        while True:
            try:
                item = self.q.get_nowait()
            except queue.Empty:
                break
            if item is not self._STOP:
                triples.append(item)

        if triples:
            self.write_file(triples)

    def write_file(self, triples):
        """Write ``triples`` to a new, uniquely named Parquet file.

        Failures are reported on stdout and otherwise ignored so that the
        writer thread keeps running; the triples of a failed write are not
        retried.
        """
        if not triples:
            return

        try:
            schema = pa.schema([
                pa.field('s', pa.string()),
                pa.field('p', pa.string()),
                pa.field('o', pa.string()),
            ])

            fname = self.file_template.format(id=str(uuid.uuid4()))
            path = f"{self.directory}/{fname}"

            # Build the batch before creating the file, with explicit string
            # columns so it always matches the file schema.
            columns = [
                pa.array([tpl[i] for tpl in triples], type=pa.string())
                for i in range(3)
            ]
            batch = pa.record_batch(columns, schema=schema)

            # The context manager closes the file even if the write fails.
            with pq.ParquetWriter(path, schema) as writer:
                writer.write_batch(batch)

            print(f"Wrote {path}.")

        except Exception as e:
            print("Parquet write:", e)

    def write(self, s, p, o):
        """Queue one triple for the writer thread.

        Triples queued after :meth:`close` has completed are not written.
        """
        self.q.put((s, p, o))

    def close(self):
        """Stop the writer thread after flushing buffered triples.

        Safe to call more than once.
        """
        self.running = False

        # __init__ may have failed before the thread was created.
        thread = getattr(self, "thread", None)
        if thread is None:
            return

        # A thread cannot join itself; this can happen when the object is
        # finalised from the writer thread as it exits.
        if thread is threading.current_thread():
            return

        if thread.is_alive():
            self.q.put(self._STOP)
            thread.join()

    def __del__(self):
        # Best-effort fallback only: the running thread holds a reference to
        # this object through its target, so callers should prefer close().
        self.close()