This commit is contained in:
Cyber MacGeddon 2024-07-24 00:27:05 +01:00
parent 9bcdee0f64
commit 7d864fe370
7 changed files with 198 additions and 0 deletions

6
scripts/ge-dump-parquet Executable file
View file

@ -0,0 +1,6 @@
#!/usr/bin/env python3
from trustgraph.dump.graph_embeddings.parquet import run
run()

View file

@ -69,5 +69,6 @@ setuptools.setup(
"scripts/triples-write-cassandra",
"scripts/dump-parquet",
"scripts/triples-dump-parquet",
"scripts/ge-dump-parquet",
]
)

View file

@ -0,0 +1,3 @@
from . processor import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . write import run
if __name__ == '__main__':
run()

View file

@ -0,0 +1,87 @@
"""
Write graph embeddings to parquet files in a directory.
"""
import pulsar
import base64
import os
import argparse
import time
from .... trustgraph import TrustGraph
from .... schema import GraphEmbeddings
from .... schema import graph_embeddings_store_queue
from .... log_level import LogLevel
from .... base import Consumer
from . writer import ParquetWriter
module = ".".join(__name__.split(".")[1:-1])
default_input_queue = graph_embeddings_store_queue
default_subscriber = module
default_graph_host='localhost'
default_directory = "."
default_file_template = "graph-embeds-{id}.parquet"
default_rotation_time = 60
class Processor(Consumer):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
subscriber = params.get("subscriber", default_subscriber)
directory = params.get("directory", default_directory)
file_template = params.get("file_template", default_file_template)
rotation_time = params.get("rotation_time", default_rotation_time)
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"subscriber": subscriber,
"input_schema": GraphEmbeddings,
}
)
self.writer = ParquetWriter(directory, file_template, rotation_time)
def __del__(self):
if hasattr(self, "writer"):
del self.writer
def handle(self, msg):
v = msg.value()
self.writer.write(v.vectors, v.entity.value)
@staticmethod
def add_args(parser):
Consumer.add_args(
parser, default_input_queue, default_subscriber,
)
parser.add_argument(
'-d', '--directory',
default=default_directory,
help=f'Directory to write to (default: {default_directory})'
)
parser.add_argument(
'-f', '--file-template',
default=default_file_template,
help=f'Directory to write to (default: {default_file_template})'
)
parser.add_argument(
'-t', '--rotation-time',
type=int,
default=default_rotation_time,
help=f'Rotation time / seconds (default: {default_rotation_time})'
)
def run():
Processor.start(module, __doc__)

View file

@ -0,0 +1,94 @@
import threading
import queue
import time
import uuid
import pyarrow as pa
import pyarrow.parquet as pq
class ParquetWriter:
def __init__(self, directory, file_template, rotation_time):
self.directory = directory
self.file_template = file_template
self.rotation_time = rotation_time
self.q = queue.Queue()
self.running = True
self.thread = threading.Thread(target=(self.writer_thread))
self.thread.start()
def writer_thread(self):
items = []
timeout = None
while self.running:
try:
item = self.q.get(timeout=1)
if timeout == None:
timeout = time.time() + self.rotation_time
items.append(item)
except queue.Empty:
pass
if timeout:
if time.time() > timeout:
self.write_file(items)
timeout = None
items = []
def write_file(self, items):
try:
schema = pa.schema([
pa.field('embeddings', pa.list_(pa.list_(pa.float64()))),
pa.field('entity', pa.string()),
])
fname = self.file_template.format(id=str(uuid.uuid4()))
path = f"{self.directory}/{fname}"
writer = pq.ParquetWriter(path, schema)
batch = pa.record_batch(
[
[i[0] for i in items],
[i[1] for i in items],
],
names=['embeddings', 'entity']
)
writer.write_batch(batch)
writer.close()
print(f"Wrote {path}.")
except Exception as e:
print("Parquet write:", e)
def write(self, embeds, ent):
self.q.put((embeds, ent))
def __del__(self):
self.running = False
if hasattr(self, "q"):
self.thread.join()