From 7df7843dad25bf643734522245b5c7eb5c59976e Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Fri, 6 Dec 2024 08:51:10 +0000 Subject: [PATCH] Main/remove parquet (#195) * Remove Parquet code, and package build --- .github/workflows/release.yaml | 24 +-- Containerfile | 5 +- Makefile | 3 - trustgraph-parquet/README.md | 1 - trustgraph-parquet/scripts/concat-parquet | 45 ----- trustgraph-parquet/scripts/dump-parquet | 24 --- trustgraph-parquet/scripts/ge-dump-parquet | 6 - .../scripts/load-graph-embeddings | 170 ----------------- trustgraph-parquet/scripts/load-triples | 180 ------------------ .../scripts/triples-dump-parquet | 6 - trustgraph-parquet/setup.py | 51 ----- .../trustgraph/dump/__init__.py | 0 .../dump/graph_embeddings/__init__.py | 0 .../dump/graph_embeddings/parquet/__init__.py | 3 - .../dump/graph_embeddings/parquet/__main__.py | 7 - .../graph_embeddings/parquet/processor.py | 85 --------- .../dump/graph_embeddings/parquet/writer.py | 94 --------- .../trustgraph/dump/triples/__init__.py | 0 .../dump/triples/parquet/__init__.py | 3 - .../dump/triples/parquet/__main__.py | 7 - .../dump/triples/parquet/processor.py | 87 --------- .../trustgraph/dump/triples/parquet/writer.py | 96 ---------- 22 files changed, 11 insertions(+), 886 deletions(-) delete mode 100644 trustgraph-parquet/README.md delete mode 100755 trustgraph-parquet/scripts/concat-parquet delete mode 100755 trustgraph-parquet/scripts/dump-parquet delete mode 100755 trustgraph-parquet/scripts/ge-dump-parquet delete mode 100755 trustgraph-parquet/scripts/load-graph-embeddings delete mode 100755 trustgraph-parquet/scripts/load-triples delete mode 100755 trustgraph-parquet/scripts/triples-dump-parquet delete mode 100644 trustgraph-parquet/setup.py delete mode 100644 trustgraph-parquet/trustgraph/dump/__init__.py delete mode 100644 trustgraph-parquet/trustgraph/dump/graph_embeddings/__init__.py delete mode 100644 trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/__init__.py delete mode 100755 trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/__main__.py delete mode 100755 trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/processor.py delete mode 100644 trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/writer.py delete mode 100644 trustgraph-parquet/trustgraph/dump/triples/__init__.py delete mode 100644 trustgraph-parquet/trustgraph/dump/triples/parquet/__init__.py delete mode 100755 trustgraph-parquet/trustgraph/dump/triples/parquet/__main__.py delete mode 100755 trustgraph-parquet/trustgraph/dump/triples/parquet/processor.py delete mode 100644 trustgraph-parquet/trustgraph/dump/triples/parquet/writer.py diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index fc85a6a8..30fc70ff 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -48,20 +48,6 @@ jobs: - name: Publish release distributions to PyPI uses: pypa/gh-action-pypi-publish@release/v1 - - name: Create deploy bundle - run: templates/generate-all deploy.zip ${{ steps.version.outputs.VERSION }} - - - uses: ncipollo/release-action@v1 - with: - artifacts: deploy.zip - generateReleaseNotes: true - makeLatest: false - prerelease: true - skipIfReleaseExists: true - - - name: Build container - run: make container VERSION=${{ steps.version.outputs.VERSION }} - - name: Extract metadata for container id: meta uses: docker/metadata-action@v4 @@ -84,3 +70,13 @@ jobs: tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} + - name: Create deploy bundle + run: templates/generate-all deploy.zip ${{ steps.version.outputs.VERSION }} + + - uses: ncipollo/release-action@v1 + with: + artifacts: deploy.zip + generateReleaseNotes: true + makeLatest: false + prerelease: true + skipIfReleaseExists: true diff --git a/Containerfile b/Containerfile index 0d6d357b..c2735feb 100644 --- a/Containerfile +++ b/Containerfile @@ -16,7 +16,7 @@ RUN pip3 install torch --index-url https://download.pytorch.org/whl/cpu RUN pip3 install anthropic boto3 cohere openai google-cloud-aiplatform ollama google-generativeai \ langchain langchain-core langchain-huggingface langchain-text-splitters \ langchain-community pymilvus sentence-transformers transformers \ - huggingface-hub pulsar-client cassandra-driver pyarrow pyyaml \ + huggingface-hub pulsar-client cassandra-driver pyyaml \ neo4j tiktoken && \ pip3 cache purge @@ -32,7 +32,6 @@ COPY trustgraph-base/ /root/build/trustgraph-base/ COPY trustgraph-flow/ /root/build/trustgraph-flow/ COPY trustgraph-vertexai/ /root/build/trustgraph-vertexai/ COPY trustgraph-bedrock/ /root/build/trustgraph-bedrock/ -COPY trustgraph-parquet/ /root/build/trustgraph-parquet/ COPY trustgraph-embeddings-hf/ /root/build/trustgraph-embeddings-hf/ COPY trustgraph-cli/ /root/build/trustgraph-cli/ @@ -42,7 +41,6 @@ RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-base/ RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-flow/ RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-vertexai/ RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-bedrock/ -RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-parquet/ RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-embeddings-hf/ RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-cli/ @@ -61,7 +59,6 @@ RUN \ pip3 install /root/wheels/trustgraph_flow-* && \ pip3 install /root/wheels/trustgraph_vertexai-* && \ pip3 install /root/wheels/trustgraph_bedrock-* && \ - pip3 install /root/wheels/trustgraph_parquet-* && \ pip3 install /root/wheels/trustgraph_embeddings_hf-* && \ pip3 install /root/wheels/trustgraph_cli-* && \ pip3 cache purge && \ diff --git a/Makefile b/Makefile index 0fb4b175..72d144a9 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,6 @@ wheels: pip3 wheel --no-deps --wheel-dir dist trustgraph-flow/ pip3 wheel --no-deps --wheel-dir dist trustgraph-vertexai/ pip3 wheel --no-deps --wheel-dir dist trustgraph-bedrock/ - pip3 wheel --no-deps --wheel-dir dist trustgraph-parquet/ pip3 wheel --no-deps --wheel-dir dist trustgraph-embeddings-hf/ pip3 wheel --no-deps --wheel-dir dist trustgraph-cli/ @@ -25,7 +24,6 @@ packages: update-package-versions cd trustgraph-flow && python3 setup.py sdist --dist-dir ../dist/ cd trustgraph-vertexai && python3 setup.py sdist --dist-dir ../dist/ cd trustgraph-bedrock && python3 setup.py sdist --dist-dir ../dist/ - cd trustgraph-parquet && python3 setup.py sdist --dist-dir ../dist/ cd trustgraph-embeddings-hf && python3 setup.py sdist --dist-dir ../dist/ cd trustgraph-cli && python3 setup.py sdist --dist-dir ../dist/ @@ -41,7 +39,6 @@ update-package-versions: echo __version__ = \"${VERSION}\" > trustgraph-flow/trustgraph/flow_version.py echo __version__ = \"${VERSION}\" > trustgraph-vertexai/trustgraph/vertexai_version.py echo __version__ = \"${VERSION}\" > trustgraph-bedrock/trustgraph/bedrock_version.py - echo __version__ = \"${VERSION}\" > trustgraph-parquet/trustgraph/parquet_version.py echo __version__ = \"${VERSION}\" > trustgraph-embeddings-hf/trustgraph/embeddings_hf_version.py echo __version__ = \"${VERSION}\" > trustgraph-cli/trustgraph/cli_version.py echo __version__ = \"${VERSION}\" > trustgraph/trustgraph/trustgraph_version.py diff --git a/trustgraph-parquet/README.md b/trustgraph-parquet/README.md deleted file mode 100644 index 7a2ce130..00000000 --- a/trustgraph-parquet/README.md +++ /dev/null @@ -1 +0,0 @@ -See https://trustgraph.ai/ diff --git a/trustgraph-parquet/scripts/concat-parquet b/trustgraph-parquet/scripts/concat-parquet deleted file mode 100755 index 7943d436..00000000 --- a/trustgraph-parquet/scripts/concat-parquet +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python3 - -""" -Concatenates multiple parquet files into a single parquet output -""" - -import pyarrow as pa -import pyarrow.parquet as pq -import pandas as pd -import sys -import argparse - -parser = argparse.ArgumentParser( - prog="combine-parquet", - description=__doc__ -) - -parser.add_argument( - '-i', '--input', - nargs='*', - help=f'Input files' -) - -parser.add_argument( - '-o', '--output', - help=f'Output files' -) - -args = parser.parse_args() - -df = None - -for file in args.input: - - part = pq.read_table(file).to_pandas() - - if df is None: - df = part - else: - df = pd.concat([df, part], ignore_index=True) - -if df is not None: - - table = pa.Table.from_pandas(df) - pq.write_table(table, args.output) diff --git a/trustgraph-parquet/scripts/dump-parquet b/trustgraph-parquet/scripts/dump-parquet deleted file mode 100755 index 62b28998..00000000 --- a/trustgraph-parquet/scripts/dump-parquet +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python3 - -import pyarrow as pa -import pyarrow.csv as pc -import pyarrow.parquet as pq -import pandas as pd -import sys - -df = None - -for file in sys.argv[1:]: - - part = pq.read_table(file).to_pandas() - - if df is None: - df = part - else: - df = pd.concat([df, part], ignore_index=True) - -if df is not None: - - table = pa.Table.from_pandas(df) - pc.write_csv(table, sys.stdout.buffer) - diff --git a/trustgraph-parquet/scripts/ge-dump-parquet b/trustgraph-parquet/scripts/ge-dump-parquet deleted file mode 100755 index c2b29c51..00000000 --- a/trustgraph-parquet/scripts/ge-dump-parquet +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python3 - -from trustgraph.dump.graph_embeddings.parquet import run - -run() - diff --git a/trustgraph-parquet/scripts/load-graph-embeddings b/trustgraph-parquet/scripts/load-graph-embeddings deleted file mode 100755 index 0e6ecf93..00000000 --- a/trustgraph-parquet/scripts/load-graph-embeddings +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python3 - -""" -Loads Graph embeddings into TrustGraph processing. -""" - -import pulsar -from pulsar.schema import JsonSchema -from trustgraph.schema import GraphEmbeddings, Value, Metadata -from trustgraph.schema import graph_embeddings_store_queue -import argparse -import os -import time -import pyarrow as pa -import pyarrow.parquet as pq - -from trustgraph.log_level import LogLevel - -class Loader: - - def __init__( - self, - pulsar_host, - output_queue, - log_level, - file, - user, - collection, - ): - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(GraphEmbeddings), - chunking_enabled=True, - ) - - self.file = file - self.user = user - self.collection = collection - - def run(self): - - try: - - path = self.file - - print("Reading file...") - table = pq.read_table(path) - print("Loaded.") - - names = set(table.column_names) - - if "embeddings" not in names: - print("No 'embeddings' column") - - if "entity" not in names: - print("No 'entity' column") - - embc = table.column("embeddings") - entc = table.column("entity") - - for emb, ent in zip(embc, entc): - - b = emb.as_py() - n = ent.as_py() - - r = GraphEmbeddings( - metadata=Metadata( - metadata=[], - user=self.user, - collection=self.collection, - ), - vectors=b, - entity=Value( - value=n, - is_uri=n.startswith("https:") - ), - ) - - self.producer.send(r) - - except Exception as e: - print(e, flush=True) - - def __del__(self): - self.client.close() - -def main(): - - parser = argparse.ArgumentParser( - prog='loader', - description=__doc__, - ) - - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') - default_output_queue = graph_embeddings_store_queue - default_user = 'trustgraph' - default_collection = 'default' - - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - - parser.add_argument( - '-u', '--user', - default=default_user, - help=f'User ID (default: {default_user})' - ) - - parser.add_argument( - '-c', '--collection', - default=default_collection, - help=f'Collection ID (default: {default_collection})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.ERROR, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - parser.add_argument( - '-f', '--file', - required=True, - help=f'File to load' - ) - - args = parser.parse_args() - - while True: - - try: - p = Loader( - pulsar_host=args.pulsar_host, - output_queue=args.output_queue, - log_level=args.log_level, - file=args.file, - user=args.user, - collection=args.collection, - ) - - p.run() - - print("File loaded.") - break - - except Exception as e: - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) - -main() - diff --git a/trustgraph-parquet/scripts/load-triples b/trustgraph-parquet/scripts/load-triples deleted file mode 100755 index e6bb0ff7..00000000 --- a/trustgraph-parquet/scripts/load-triples +++ /dev/null @@ -1,180 +0,0 @@ -#!/usr/bin/env python3 - -""" -Loads Graph embeddings into TrustGraph processing. -""" - -import pulsar -from pulsar.schema import JsonSchema -from trustgraph.schema import Triples, Triple, Value, Metadata -from trustgraph.schema import triples_store_queue -import argparse -import os -import time -import pyarrow as pa -import pyarrow.parquet as pq - -from trustgraph.log_level import LogLevel - -class Loader: - - def __init__( - self, - pulsar_host, - output_queue, - log_level, - file, - user, - collection, - ): - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(Triples), - chunking_enabled=True, - ) - - self.file = file - self.user = user - self.collection = collection - - def run(self): - - try: - - path = self.file - - print("Reading file...") - table = pq.read_table(path) - print("Loaded.") - - names = set(table.column_names) - - if "s" not in names: - print("No 's' column") - - if "p" not in names: - print("No 'p' column") - - if "o" not in names: - print("No 'o' column") - - sc = table.column("s") - pc = table.column("p") - oc = table.column("o") - - for s, p, o in zip(sc, pc, oc): - - r = Triples( - metadata=Metadata( - metadata=[], - user=self.user, - collection=self.collection, - ), - triples=[ - Triple( - s=Value( - value=s.as_py(), is_uri=True - ), - p=Value( - value=p.as_py(), is_uri=True - ), - o=Value( - value=o.as_py(), - is_uri=o.as_py().startswith("https:") - ) - ) - ] - ) - - self.producer.send(r) - - except Exception as e: - print(e, flush=True) - - def __del__(self): - self.client.close() - -def main(): - - parser = argparse.ArgumentParser( - prog='loader', - description=__doc__, - ) - - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') - default_output_queue = triples_store_queue - default_user = 'trustgraph' - default_collection = 'default' - - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - - parser.add_argument( - '-u', '--user', - default=default_user, - help=f'User ID (default: {default_user})' - ) - - parser.add_argument( - '-c', '--collection', - default=default_collection, - help=f'Collection ID (default: {default_collection})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.ERROR, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - parser.add_argument( - '-f', '--file', - required=True, - help=f'File to load' - ) - - args = parser.parse_args() - - while True: - - try: - p = Loader( - pulsar_host=args.pulsar_host, - output_queue=args.output_queue, - log_level=args.log_level, - file=args.file, - user=args.user, - collection=args.collection, - ) - - p.run() - - print("File loaded.") - break - - except Exception as e: - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) - -main() - diff --git a/trustgraph-parquet/scripts/triples-dump-parquet b/trustgraph-parquet/scripts/triples-dump-parquet deleted file mode 100755 index 78d79196..00000000 --- a/trustgraph-parquet/scripts/triples-dump-parquet +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python3 - -from trustgraph.dump.triples.parquet import run - -run() - diff --git a/trustgraph-parquet/setup.py b/trustgraph-parquet/setup.py deleted file mode 100644 index dfe29653..00000000 --- a/trustgraph-parquet/setup.py +++ /dev/null @@ -1,51 +0,0 @@ -import setuptools -import os -import importlib - -with open("README.md", "r") as fh: - long_description = fh.read() - -# Load a version number module -spec = importlib.util.spec_from_file_location( - 'version', 'trustgraph/parquet_version.py' -) -version_module = importlib.util.module_from_spec(spec) -spec.loader.exec_module(version_module) - -version = version_module.__version__ - -setuptools.setup( - name="trustgraph-parquet", - version=version, - author="trustgraph.ai", - author_email="security@trustgraph.ai", - description="TrustGraph provides a means to run a pipeline of flexible AI processing components in a flexible means to achieve a processing pipeline.", - long_description=long_description, - long_description_content_type="text/markdown", - url="https://github.com/trustgraph-ai/trustgraph", - packages=setuptools.find_namespace_packages( - where='./', - ), - classifiers=[ - "Programming Language :: Python :: 3", - "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", - "Operating System :: OS Independent", - ], - python_requires='>=3.8', - download_url = "https://github.com/trustgraph-ai/trustgraph/archive/refs/tags/v" + version + ".tar.gz", - install_requires=[ - "trustgraph-base>=0.17,<0.18", - "pulsar-client", - "prometheus-client", - "pyarrow", - "pandas", - ], - scripts=[ - "scripts/concat-parquet", - "scripts/dump-parquet", - "scripts/ge-dump-parquet", - "scripts/triples-dump-parquet", - "scripts/load-graph-embeddings", - "scripts/load-triples", - ] -) diff --git a/trustgraph-parquet/trustgraph/dump/__init__.py b/trustgraph-parquet/trustgraph/dump/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/trustgraph-parquet/trustgraph/dump/graph_embeddings/__init__.py b/trustgraph-parquet/trustgraph/dump/graph_embeddings/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/__init__.py b/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/__init__.py deleted file mode 100644 index 9d16af90..00000000 --- a/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ - -from . processor import * - diff --git a/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/__main__.py b/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/__main__.py deleted file mode 100755 index c05d8c6d..00000000 --- a/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/__main__.py +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python3 - -from . write import run - -if __name__ == '__main__': - run() - diff --git a/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/processor.py b/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/processor.py deleted file mode 100755 index 795f3351..00000000 --- a/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/processor.py +++ /dev/null @@ -1,85 +0,0 @@ - -""" -Write graph embeddings to parquet files in a directory. -""" - -import pulsar -import base64 -import os -import argparse -import time - -from .... schema import GraphEmbeddings -from .... schema import graph_embeddings_store_queue -from .... base import Consumer - -from . writer import ParquetWriter - -module = ".".join(__name__.split(".")[1:-1]) - -default_input_queue = graph_embeddings_store_queue -default_subscriber = module -default_graph_host='localhost' -default_directory = "." -default_file_template = "graph-embeds-{id}.parquet" -default_rotation_time = 60 - -class Processor(Consumer): - - def __init__(self, **params): - - input_queue = params.get("input_queue", default_input_queue) - subscriber = params.get("subscriber", default_subscriber) - directory = params.get("directory", default_directory) - file_template = params.get("file_template", default_file_template) - rotation_time = params.get("rotation_time", default_rotation_time) - - super(Processor, self).__init__( - **params | { - "input_queue": input_queue, - "subscriber": subscriber, - "input_schema": GraphEmbeddings, - } - ) - - self.writer = ParquetWriter(directory, file_template, rotation_time) - - def __del__(self): - if hasattr(self, "writer"): - del self.writer - - def handle(self, msg): - - v = msg.value() - self.writer.write(v.vectors, v.entity.value) - - @staticmethod - def add_args(parser): - - Consumer.add_args( - parser, default_input_queue, default_subscriber, - ) - - parser.add_argument( - '-d', '--directory', - default=default_directory, - help=f'Directory to write to (default: {default_directory})' - ) - - parser.add_argument( - '-f', '--file-template', - default=default_file_template, - help=f'Directory to write to (default: {default_file_template})' - ) - - parser.add_argument( - '-t', '--rotation-time', - type=int, - default=default_rotation_time, - help=f'Rotation time / seconds (default: {default_rotation_time})' - ) - -def run(): - - Processor.start(module, __doc__) - diff --git a/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/writer.py b/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/writer.py deleted file mode 100644 index 1844cdd1..00000000 --- a/trustgraph-parquet/trustgraph/dump/graph_embeddings/parquet/writer.py +++ /dev/null @@ -1,94 +0,0 @@ - -import threading -import queue -import time -import uuid -import pyarrow as pa -import pyarrow.parquet as pq - -class ParquetWriter: - - def __init__(self, directory, file_template, rotation_time): - self.directory = directory - self.file_template = file_template - self.rotation_time = rotation_time - - self.q = queue.Queue() - - self.running = True - - self.thread = threading.Thread(target=(self.writer_thread)) - self.thread.start() - - def writer_thread(self): - - items = [] - - timeout = None - - while self.running: - - try: - - item = self.q.get(timeout=1) - - if timeout == None: - timeout = time.time() + self.rotation_time - - items.append(item) - - except queue.Empty: - pass - - if timeout: - if time.time() > timeout: - - self.write_file(items) - timeout = None - items = [] - - def write_file(self, items): - - try: - - schema = pa.schema([ - pa.field('embeddings', pa.list_(pa.list_(pa.float64()))), - pa.field('entity', pa.string()), - ]) - - fname = self.file_template.format(id=str(uuid.uuid4())) - path = f"{self.directory}/{fname}" - - writer = pq.ParquetWriter(path, schema) - - batch = pa.record_batch( - [ - [i[0] for i in items], - [i[1] for i in items], - ], - names=['embeddings', 'entity'] - ) - - writer.write_batch(batch) - - writer.close() - - print(f"Wrote {path}.") - - except Exception as e: - - print("Parquet write:", e) - - def write(self, embeds, ent): - self.q.put((embeds, ent)) - - def __del__(self): - - self.running = False - - if hasattr(self, "q"): - self.thread.join() - - - - diff --git a/trustgraph-parquet/trustgraph/dump/triples/__init__.py b/trustgraph-parquet/trustgraph/dump/triples/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/trustgraph-parquet/trustgraph/dump/triples/parquet/__init__.py b/trustgraph-parquet/trustgraph/dump/triples/parquet/__init__.py deleted file mode 100644 index 9d16af90..00000000 --- a/trustgraph-parquet/trustgraph/dump/triples/parquet/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ - -from . processor import * - diff --git a/trustgraph-parquet/trustgraph/dump/triples/parquet/__main__.py b/trustgraph-parquet/trustgraph/dump/triples/parquet/__main__.py deleted file mode 100755 index c05d8c6d..00000000 --- a/trustgraph-parquet/trustgraph/dump/triples/parquet/__main__.py +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python3 - -from . write import run - -if __name__ == '__main__': - run() - diff --git a/trustgraph-parquet/trustgraph/dump/triples/parquet/processor.py b/trustgraph-parquet/trustgraph/dump/triples/parquet/processor.py deleted file mode 100755 index dc15d8a9..00000000 --- a/trustgraph-parquet/trustgraph/dump/triples/parquet/processor.py +++ /dev/null @@ -1,87 +0,0 @@ - -""" -Write graphs triples to parquet files in a directory. -""" - -import pulsar -import base64 -import os -import argparse -import time - -from .... schema import Triples -from .... schema import triples_store_queue -from .... base import Consumer - -from . writer import ParquetWriter - -module = ".".join(__name__.split(".")[1:-1]) - -default_input_queue = triples_store_queue -default_subscriber = module -default_graph_host='localhost' -default_directory = "." -default_file_template = "triples-{id}.parquet" -default_rotation_time = 60 - -class Processor(Consumer): - - def __init__(self, **params): - - input_queue = params.get("input_queue", default_input_queue) - subscriber = params.get("subscriber", default_subscriber) - directory = params.get("directory", default_directory) - file_template = params.get("file_template", default_file_template) - rotation_time = params.get("rotation_time", default_rotation_time) - - super(Processor, self).__init__( - **params | { - "input_queue": input_queue, - "subscriber": subscriber, - "input_schema": Triples, - } - ) - - self.writer = ParquetWriter(directory, file_template, rotation_time) - - def __del__(self): - if hasattr(self, "writer"): - del self.writer - - def handle(self, msg): - - v = msg.value() - - for t in v.triples: - self.writer.write(t.s.value, t.p.value, t.o.value) - - @staticmethod - def add_args(parser): - - Consumer.add_args( - parser, default_input_queue, default_subscriber, - ) - - parser.add_argument( - '-d', '--directory', - default=default_directory, - help=f'Directory to write to (default: {default_directory})' - ) - - parser.add_argument( - '-f', '--file-template', - default=default_file_template, - help=f'Directory to write to (default: {default_file_template})' - ) - - parser.add_argument( - '-t', '--rotation-time', - type=int, - default=default_rotation_time, - help=f'Rotation time / seconds (default: {default_rotation_time})' - ) - -def run(): - - Processor.start(module, __doc__) - diff --git a/trustgraph-parquet/trustgraph/dump/triples/parquet/writer.py b/trustgraph-parquet/trustgraph/dump/triples/parquet/writer.py deleted file mode 100644 index e68bf342..00000000 --- a/trustgraph-parquet/trustgraph/dump/triples/parquet/writer.py +++ /dev/null @@ -1,96 +0,0 @@ - -import threading -import queue -import time -import uuid -import pyarrow as pa -import pyarrow.parquet as pq - -class ParquetWriter: - - def __init__(self, directory, file_template, rotation_time): - self.directory = directory - self.file_template = file_template - self.rotation_time = rotation_time - - self.q = queue.Queue() - - self.running = True - - self.thread = threading.Thread(target=(self.writer_thread)) - self.thread.start() - - def writer_thread(self): - - triples = [] - - timeout = None - - while self.running: - - try: - - item = self.q.get(timeout=1) - - if timeout == None: - timeout = time.time() + self.rotation_time - - triples.append(item) - - except queue.Empty: - pass - - if timeout: - if time.time() > timeout: - - self.write_file(triples) - timeout = None - triples = [] - - def write_file(self, triples): - - try: - - schema = pa.schema([ - pa.field('s', pa.string()), - pa.field('p', pa.string()), - pa.field('o', pa.string()), - ]) - - fname = self.file_template.format(id=str(uuid.uuid4())) - path = f"{self.directory}/{fname}" - - writer = pq.ParquetWriter(path, schema) - - batch = pa.record_batch( - [ - [tpl[0] for tpl in triples], - [tpl[1] for tpl in triples], - [tpl[2] for tpl in triples], - ], - names=['s', 'p', 'o'] - ) - - writer.write_batch(batch) - - writer.close() - - print(f"Wrote {path}.") - - except Exception as e: - - print("Parquet write:", e) - - def write(self, s, p, o): - self.q.put((s, p, o)) - - def __del__(self): - - self.running = False - - if hasattr(self, "q"): - self.thread.join() - - - -