mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-06-03 03:45:13 +02:00
parent
2818ec9f23
commit
7df7843dad
22 changed files with 11 additions and 886 deletions
24
.github/workflows/release.yaml
vendored
24
.github/workflows/release.yaml
vendored
|
|
@ -48,20 +48,6 @@ jobs:
|
|||
- name: Publish release distributions to PyPI
|
||||
uses: pypa/gh-action-pypi-publish@release/v1
|
||||
|
||||
- name: Create deploy bundle
|
||||
run: templates/generate-all deploy.zip ${{ steps.version.outputs.VERSION }}
|
||||
|
||||
- uses: ncipollo/release-action@v1
|
||||
with:
|
||||
artifacts: deploy.zip
|
||||
generateReleaseNotes: true
|
||||
makeLatest: false
|
||||
prerelease: true
|
||||
skipIfReleaseExists: true
|
||||
|
||||
- name: Build container
|
||||
run: make container VERSION=${{ steps.version.outputs.VERSION }}
|
||||
|
||||
- name: Extract metadata for container
|
||||
id: meta
|
||||
uses: docker/metadata-action@v4
|
||||
|
|
@ -84,3 +70,13 @@ jobs:
|
|||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
|
||||
- name: Create deploy bundle
|
||||
run: templates/generate-all deploy.zip ${{ steps.version.outputs.VERSION }}
|
||||
|
||||
- uses: ncipollo/release-action@v1
|
||||
with:
|
||||
artifacts: deploy.zip
|
||||
generateReleaseNotes: true
|
||||
makeLatest: false
|
||||
prerelease: true
|
||||
skipIfReleaseExists: true
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ RUN pip3 install torch --index-url https://download.pytorch.org/whl/cpu
|
|||
RUN pip3 install anthropic boto3 cohere openai google-cloud-aiplatform ollama google-generativeai \
|
||||
langchain langchain-core langchain-huggingface langchain-text-splitters \
|
||||
langchain-community pymilvus sentence-transformers transformers \
|
||||
huggingface-hub pulsar-client cassandra-driver pyarrow pyyaml \
|
||||
huggingface-hub pulsar-client cassandra-driver pyyaml \
|
||||
neo4j tiktoken && \
|
||||
pip3 cache purge
|
||||
|
||||
|
|
@ -32,7 +32,6 @@ COPY trustgraph-base/ /root/build/trustgraph-base/
|
|||
COPY trustgraph-flow/ /root/build/trustgraph-flow/
|
||||
COPY trustgraph-vertexai/ /root/build/trustgraph-vertexai/
|
||||
COPY trustgraph-bedrock/ /root/build/trustgraph-bedrock/
|
||||
COPY trustgraph-parquet/ /root/build/trustgraph-parquet/
|
||||
COPY trustgraph-embeddings-hf/ /root/build/trustgraph-embeddings-hf/
|
||||
COPY trustgraph-cli/ /root/build/trustgraph-cli/
|
||||
|
||||
|
|
@ -42,7 +41,6 @@ RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-base/
|
|||
RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-flow/
|
||||
RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-vertexai/
|
||||
RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-bedrock/
|
||||
RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-parquet/
|
||||
RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-embeddings-hf/
|
||||
RUN pip3 wheel -w /root/wheels/ --no-deps ./trustgraph-cli/
|
||||
|
||||
|
|
@ -61,7 +59,6 @@ RUN \
|
|||
pip3 install /root/wheels/trustgraph_flow-* && \
|
||||
pip3 install /root/wheels/trustgraph_vertexai-* && \
|
||||
pip3 install /root/wheels/trustgraph_bedrock-* && \
|
||||
pip3 install /root/wheels/trustgraph_parquet-* && \
|
||||
pip3 install /root/wheels/trustgraph_embeddings_hf-* && \
|
||||
pip3 install /root/wheels/trustgraph_cli-* && \
|
||||
pip3 cache purge && \
|
||||
|
|
|
|||
3
Makefile
3
Makefile
|
|
@ -14,7 +14,6 @@ wheels:
|
|||
pip3 wheel --no-deps --wheel-dir dist trustgraph-flow/
|
||||
pip3 wheel --no-deps --wheel-dir dist trustgraph-vertexai/
|
||||
pip3 wheel --no-deps --wheel-dir dist trustgraph-bedrock/
|
||||
pip3 wheel --no-deps --wheel-dir dist trustgraph-parquet/
|
||||
pip3 wheel --no-deps --wheel-dir dist trustgraph-embeddings-hf/
|
||||
pip3 wheel --no-deps --wheel-dir dist trustgraph-cli/
|
||||
|
||||
|
|
@ -25,7 +24,6 @@ packages: update-package-versions
|
|||
cd trustgraph-flow && python3 setup.py sdist --dist-dir ../dist/
|
||||
cd trustgraph-vertexai && python3 setup.py sdist --dist-dir ../dist/
|
||||
cd trustgraph-bedrock && python3 setup.py sdist --dist-dir ../dist/
|
||||
cd trustgraph-parquet && python3 setup.py sdist --dist-dir ../dist/
|
||||
cd trustgraph-embeddings-hf && python3 setup.py sdist --dist-dir ../dist/
|
||||
cd trustgraph-cli && python3 setup.py sdist --dist-dir ../dist/
|
||||
|
||||
|
|
@ -41,7 +39,6 @@ update-package-versions:
|
|||
echo __version__ = \"${VERSION}\" > trustgraph-flow/trustgraph/flow_version.py
|
||||
echo __version__ = \"${VERSION}\" > trustgraph-vertexai/trustgraph/vertexai_version.py
|
||||
echo __version__ = \"${VERSION}\" > trustgraph-bedrock/trustgraph/bedrock_version.py
|
||||
echo __version__ = \"${VERSION}\" > trustgraph-parquet/trustgraph/parquet_version.py
|
||||
echo __version__ = \"${VERSION}\" > trustgraph-embeddings-hf/trustgraph/embeddings_hf_version.py
|
||||
echo __version__ = \"${VERSION}\" > trustgraph-cli/trustgraph/cli_version.py
|
||||
echo __version__ = \"${VERSION}\" > trustgraph/trustgraph/trustgraph_version.py
|
||||
|
|
|
|||
|
|
@ -1 +0,0 @@
|
|||
See https://trustgraph.ai/
|
||||
|
|
@ -1,45 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Concatenates multiple parquet files into a single parquet output
|
||||
"""
|
||||
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import pandas as pd
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
# Command-line interface: any number of input parquet files, one output file.
parser = argparse.ArgumentParser(
    prog="combine-parquet",
    description=__doc__,
)

parser.add_argument(
    '-i', '--input',
    nargs='*',
    # Without an explicit default argparse stores None when -i is omitted,
    # and iterating None below would raise TypeError.
    default=[],
    help='Input files',
)

parser.add_argument(
    '-o', '--output',
    help='Output file',
)

args = parser.parse_args()

# Fail before doing any reading if there is data to write but nowhere to
# write it.  With no inputs nothing is written, so no output is needed.
if args.input and args.output is None:
    parser.error("an output file (-o/--output) is required")

# Read every input first and concatenate once; concatenating inside the loop
# re-copies the accumulated frame on every iteration.
parts = [pq.read_table(file).to_pandas() for file in args.input]

if parts:

    # A single input is passed through untouched (its index is not reset),
    # several inputs are stacked with a fresh index.
    if len(parts) == 1:
        df = parts[0]
    else:
        df = pd.concat(parts, ignore_index=True)

    table = pa.Table.from_pandas(df)
    pq.write_table(table, args.output)
|
||||
|
|
@ -1,24 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import pyarrow as pa
|
||||
import pyarrow.csv as pc
|
||||
import pyarrow.parquet as pq
|
||||
import pandas as pd
|
||||
import sys
|
||||
|
||||
# Load every parquet file named on the command line into a pandas frame.
# The frames are collected first and concatenated once; concatenating inside
# the loop re-copies the accumulated frame on every iteration.
parts = [pq.read_table(file).to_pandas() for file in sys.argv[1:]]

# With no input files nothing is written at all.
if parts:

    # A single input is passed through untouched (its index is not reset),
    # several inputs are stacked with a fresh index.
    if len(parts) == 1:
        df = parts[0]
    else:
        df = pd.concat(parts, ignore_index=True)

    table = pa.Table.from_pandas(df)

    # CSV is emitted on the binary stdout stream.
    pc.write_csv(table, sys.stdout.buffer)
|
||||
|
||||
|
|
@ -1,6 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from trustgraph.dump.graph_embeddings.parquet import run
|
||||
|
||||
# Script entry point: all work is delegated to the imported run().
run()
|
||||
|
||||
|
|
@ -1,170 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Loads Graph embeddings into TrustGraph processing.
|
||||
"""
|
||||
|
||||
import pulsar
|
||||
from pulsar.schema import JsonSchema
|
||||
from trustgraph.schema import GraphEmbeddings, Value, Metadata
|
||||
from trustgraph.schema import graph_embeddings_store_queue
|
||||
import argparse
|
||||
import os
|
||||
import time
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
|
||||
from trustgraph.log_level import LogLevel
|
||||
|
||||
class Loader:
    """Publish the rows of a parquet file as GraphEmbeddings messages.

    The constructor connects to Pulsar and creates a producer on the output
    queue; run() reads the file and sends one message per row.
    """

    def __init__(
            self,
            pulsar_host,
            output_queue,
            log_level,
            file,
            user,
            collection,
    ):
        """Connect to Pulsar and remember what to load.

        pulsar_host: Pulsar service URL.
        output_queue: topic the messages are produced to.
        log_level: object providing to_pulsar(), used for the client logger.
        file: path of the parquet file to read.
        user, collection: values copied into each message's Metadata.
        """

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(GraphEmbeddings),
            chunking_enabled=True,
        )

        self.file = file
        self.user = user
        self.collection = collection

    def run(self):
        """Read the parquet file and send one message per row.

        The file must have 'embeddings' and 'entity' columns.  Any failure
        is printed rather than raised, so the caller cannot tell from the
        return value whether the load succeeded.
        """

        try:

            path = self.file

            print("Reading file...")
            table = pq.read_table(path)
            print("Loaded.")

            names = set(table.column_names)

            # Report every missing column, then stop; carrying on would only
            # fail with a less helpful KeyError on the column lookup.
            missing = [
                col for col in ("embeddings", "entity") if col not in names
            ]
            for col in missing:
                print(f"No '{col}' column")
            if missing:
                return

            emb_col = table.column("embeddings")
            ent_col = table.column("entity")

            for emb, ent in zip(emb_col, ent_col):

                vectors = emb.as_py()
                name = ent.as_py()

                r = GraphEmbeddings(
                    metadata=Metadata(
                        metadata=[],
                        user=self.user,
                        collection=self.collection,
                    ),
                    vectors=vectors,
                    entity=Value(
                        value=name,
                        # NOTE(review): only an "https:" prefix marks a URI;
                        # "http:" values are sent as literals - confirm this
                        # is intended.
                        is_uri=name.startswith("https:")
                    ),
                )

                self.producer.send(r)

        except Exception as e:
            # Best effort: report the problem and return normally.
            print(e, flush=True)

    def __del__(self):
        # The constructor may have failed before self.client was assigned;
        # in that case there is nothing to close.
        client = getattr(self, "client", None)
        if client is not None:
            client.close()
|
||||
|
||||
def main():
    """Parse the command line and load the file, retrying on failure.

    A Loader is built and run in a loop; if an exception escapes, it is
    printed and the attempt is repeated after 10 seconds.
    NOTE(review): Loader.run() handles its own exceptions, so in practice
    only failures while constructing the Loader reach the retry path.
    """

    parser = argparse.ArgumentParser(
        prog='loader',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650')
    default_output_queue = graph_embeddings_store_queue
    default_user = 'trustgraph'
    default_collection = 'default'
    default_log_level = LogLevel.ERROR

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-u', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})'
    )

    parser.add_argument(
        '-c', '--collection',
        default=default_collection,
        help=f'Collection ID (default: {default_collection})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=default_log_level,
        choices=list(LogLevel),
        # The help text used to describe the output queue and a default of
        # "info"; it now reports the option and its real default.
        help=f'Log level (default: {default_log_level})'
    )

    parser.add_argument(
        '-f', '--file',
        required=True,
        help='File to load'
    )

    args = parser.parse_args()

    while True:

        try:
            p = Loader(
                pulsar_host=args.pulsar_host,
                output_queue=args.output_queue,
                log_level=args.log_level,
                file=args.file,
                user=args.user,
                collection=args.collection,
            )

            p.run()

            print("File loaded.")
            break

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

            time.sleep(10)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
||||
|
||||
|
|
@ -1,180 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Loads triples into TrustGraph processing.
|
||||
"""
|
||||
|
||||
import pulsar
|
||||
from pulsar.schema import JsonSchema
|
||||
from trustgraph.schema import Triples, Triple, Value, Metadata
|
||||
from trustgraph.schema import triples_store_queue
|
||||
import argparse
|
||||
import os
|
||||
import time
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
|
||||
from trustgraph.log_level import LogLevel
|
||||
|
||||
class Loader:
    """Publish the rows of a parquet file as Triples messages.

    The constructor connects to Pulsar and creates a producer on the output
    queue; run() reads the file and sends one single-triple message per row.
    """

    def __init__(
            self,
            pulsar_host,
            output_queue,
            log_level,
            file,
            user,
            collection,
    ):
        """Connect to Pulsar and remember what to load.

        pulsar_host: Pulsar service URL.
        output_queue: topic the messages are produced to.
        log_level: object providing to_pulsar(), used for the client logger.
        file: path of the parquet file to read.
        user, collection: values copied into each message's Metadata.
        """

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(Triples),
            chunking_enabled=True,
        )

        self.file = file
        self.user = user
        self.collection = collection

    def run(self):
        """Read the parquet file and send one message per row.

        The file must have 's', 'p' and 'o' columns.  Any failure is printed
        rather than raised, so the caller cannot tell from the return value
        whether the load succeeded.
        """

        try:

            path = self.file

            print("Reading file...")
            table = pq.read_table(path)
            print("Loaded.")

            names = set(table.column_names)

            # Report every missing column, then stop; carrying on would only
            # fail with a less helpful KeyError on the column lookup.
            missing = [col for col in ("s", "p", "o") if col not in names]
            for col in missing:
                print(f"No '{col}' column")
            if missing:
                return

            s_col = table.column("s")
            p_col = table.column("p")
            o_col = table.column("o")

            for s, p, o in zip(s_col, p_col, o_col):

                # Convert the object once; it is needed for both fields.
                obj = o.as_py()

                r = Triples(
                    metadata=Metadata(
                        metadata=[],
                        user=self.user,
                        collection=self.collection,
                    ),
                    triples=[
                        Triple(
                            # Subject and predicate are always sent as URIs.
                            s=Value(value=s.as_py(), is_uri=True),
                            p=Value(value=p.as_py(), is_uri=True),
                            o=Value(
                                value=obj,
                                # NOTE(review): only an "https:" prefix marks
                                # a URI; "http:" values are sent as literals
                                # - confirm this is intended.
                                is_uri=obj.startswith("https:")
                            ),
                        )
                    ]
                )

                self.producer.send(r)

        except Exception as e:
            # Best effort: report the problem and return normally.
            print(e, flush=True)

    def __del__(self):
        # The constructor may have failed before self.client was assigned;
        # in that case there is nothing to close.
        client = getattr(self, "client", None)
        if client is not None:
            client.close()
|
||||
|
||||
def main():
    """Parse the command line and load the file, retrying on failure.

    A Loader is built and run in a loop; if an exception escapes, it is
    printed and the attempt is repeated after 10 seconds.
    NOTE(review): Loader.run() handles its own exceptions, so in practice
    only failures while constructing the Loader reach the retry path.
    """

    parser = argparse.ArgumentParser(
        prog='loader',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650')
    default_output_queue = triples_store_queue
    default_user = 'trustgraph'
    default_collection = 'default'
    default_log_level = LogLevel.ERROR

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-u', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})'
    )

    parser.add_argument(
        '-c', '--collection',
        default=default_collection,
        help=f'Collection ID (default: {default_collection})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=default_log_level,
        choices=list(LogLevel),
        # The help text used to describe the output queue and a default of
        # "info"; it now reports the option and its real default.
        help=f'Log level (default: {default_log_level})'
    )

    parser.add_argument(
        '-f', '--file',
        required=True,
        help='File to load'
    )

    args = parser.parse_args()

    while True:

        try:
            p = Loader(
                pulsar_host=args.pulsar_host,
                output_queue=args.output_queue,
                log_level=args.log_level,
                file=args.file,
                user=args.user,
                collection=args.collection,
            )

            p.run()

            print("File loaded.")
            break

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

            time.sleep(10)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
||||
|
||||
|
|
@ -1,6 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from trustgraph.dump.triples.parquet import run
|
||||
|
||||
# Script entry point: all work is delegated to the imported run().
run()
|
||||
|
||||
|
|
@ -1,51 +0,0 @@
|
|||
import setuptools
|
||||
import os
|
||||
import importlib
|
||||
|
||||
# "import importlib" alone does not guarantee that the "util" submodule is
# loaded; import it explicitly so the attribute access below cannot fail.
import importlib.util

# The README becomes the long description; read it as UTF-8 regardless of
# the build machine's locale.
with open("README.md", "r", encoding="utf-8") as fh:
    long_description = fh.read()

# Load a version number module
spec = importlib.util.spec_from_file_location(
    'version', 'trustgraph/parquet_version.py'
)
version_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(version_module)

version = version_module.__version__

setuptools.setup(
    name="trustgraph-parquet",
    version=version,
    author="trustgraph.ai",
    author_email="security@trustgraph.ai",
    description="TrustGraph provides a means to run a pipeline of flexible AI processing components in a flexible means to achieve a processing pipeline.",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/trustgraph-ai/trustgraph",
    packages=setuptools.find_namespace_packages(
        where='./',
    ),
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
        "Operating System :: OS Independent",
    ],
    python_requires='>=3.8',
    # The source archive for this release is derived from the version tag.
    download_url="https://github.com/trustgraph-ai/trustgraph/archive/refs/tags/v" + version + ".tar.gz",
    install_requires=[
        "trustgraph-base>=0.17,<0.18",
        "pulsar-client",
        "prometheus-client",
        "pyarrow",
        "pandas",
    ],
    scripts=[
        "scripts/concat-parquet",
        "scripts/dump-parquet",
        "scripts/ge-dump-parquet",
        "scripts/triples-dump-parquet",
        "scripts/load-graph-embeddings",
        "scripts/load-triples",
    ]
)
|
||||
|
|
@ -1,3 +0,0 @@
|
|||
|
||||
from . processor import *
|
||||
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from . write import run
|
||||
|
||||
# Start the processor only when this module is executed directly
# (for example through "python -m"), never on a plain import.
if __name__ == "__main__":
    run()
|
||||
|
||||
|
|
@ -1,85 +0,0 @@
|
|||
|
||||
"""
|
||||
Write graph embeddings to parquet files in a directory.
|
||||
"""
|
||||
|
||||
import pulsar
|
||||
import base64
|
||||
import os
|
||||
import argparse
|
||||
import time
|
||||
|
||||
from .... schema import GraphEmbeddings
|
||||
from .... schema import graph_embeddings_store_queue
|
||||
from .... base import Consumer
|
||||
|
||||
from . writer import ParquetWriter
|
||||
|
||||
# Dotted module path with its first component and its last component (this
# file's own name) removed; used as the default subscriber name below.
module = ".".join(__name__.split(".")[1:-1])

# Defaults used when the corresponding parameter or option is not supplied.
default_input_queue = graph_embeddings_store_queue
default_subscriber = module
# NOTE(review): not referenced anywhere in the visible part of this module.
default_graph_host = 'localhost'
default_directory = "."
# "{id}" is substituted by the writer when it names an output file.
default_file_template = "graph-embeds-{id}.parquet"
# Seconds; see the --rotation-time option.
default_rotation_time = 60
|
||||
|
||||
class Processor(Consumer):
    """Consumer that passes each received GraphEmbeddings message to a
    ParquetWriter."""

    def __init__(self, **params):
        """Set up the consumer and the parquet writer.

        Recognised keys in params (each has a module-level default):
        input_queue, subscriber, directory, file_template, rotation_time.
        All params are forwarded to the Consumer constructor.
        """

        input_queue = params.get("input_queue", default_input_queue)
        subscriber = params.get("subscriber", default_subscriber)
        directory = params.get("directory", default_directory)
        file_template = params.get("file_template", default_file_template)
        rotation_time = params.get("rotation_time", default_rotation_time)

        # Dict unpacking instead of the "|" merge operator: the result is
        # the same, but it does not require Python 3.9.
        super(Processor, self).__init__(
            **{
                **params,
                "input_queue": input_queue,
                "subscriber": subscriber,
                "input_schema": GraphEmbeddings,
            }
        )

        self.writer = ParquetWriter(directory, file_template, rotation_time)

    def __del__(self):
        # The writer is only present if the constructor ran to completion.
        if hasattr(self, "writer"):
            del self.writer

    def handle(self, msg):
        """Queue the vectors and entity value of one message for writing."""

        v = msg.value()
        self.writer.write(v.vectors, v.entity.value)

    @staticmethod
    def add_args(parser):
        """Add the Consumer options plus this processor's own options."""

        Consumer.add_args(
            parser, default_input_queue, default_subscriber,
        )

        parser.add_argument(
            '-d', '--directory',
            default=default_directory,
            help=f'Directory to write to (default: {default_directory})'
        )

        parser.add_argument(
            '-f', '--file-template',
            default=default_file_template,
            # This help text used to repeat the --directory description.
            help=f'File name template (default: {default_file_template})'
        )

        parser.add_argument(
            '-t', '--rotation-time',
            type=int,
            default=default_rotation_time,
            help=f'Rotation time / seconds (default: {default_rotation_time})'
        )
|
||||
|
||||
def run():
    """Start the processor via Processor.start."""

    # __doc__ is a global lookup here, so this is the module docstring,
    # not the docstring of this function.
    description = __doc__
    Processor.start(module, description)
|
||||
|
||||
|
|
@ -1,94 +0,0 @@
|
|||
|
||||
import threading
|
||||
import queue
|
||||
import time
|
||||
import uuid
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
|
||||
class ParquetWriter:
    """Buffer (embeddings, entity) rows and write them to parquet files.

    Rows passed to write() are queued and collected by a background thread.
    Once rotation_time seconds have passed since the first row of the
    current batch arrived, the batch is written to a new file whose name is
    file_template with "{id}" replaced by a fresh UUID.

    The worker thread's target is a bound method, which keeps the instance
    referenced while the thread runs, so __del__ should not be relied on to
    stop it.  Call close() to stop the thread and write any buffered rows.
    """

    def __init__(self, directory, file_template, rotation_time):
        """Store the settings and start the background writer thread.

        directory: directory the files are written to.
        file_template: file name template containing "{id}".
        rotation_time: seconds a batch is collected before it is written.
        """

        self.directory = directory
        self.file_template = file_template
        self.rotation_time = rotation_time

        self.q = queue.Queue()

        self.running = True

        self.thread = threading.Thread(target=self.writer_thread)
        self.thread.start()

    def writer_thread(self):
        """Collect queued rows and write a file whenever a batch is due."""

        items = []

        # Time at which the current batch must be written; None while the
        # batch is empty.
        deadline = None

        while self.running:

            try:
                # Wake up at least once a second so that the deadline and
                # the running flag are re-checked even when idle.
                item = self.q.get(timeout=1)
            except queue.Empty:
                pass
            else:
                if deadline is None:
                    deadline = time.time() + self.rotation_time
                items.append(item)

            if deadline is not None and time.time() > deadline:
                self.write_file(items)
                deadline = None
                items = []

        # Shutting down: pick up whatever is still queued and write the
        # final partial batch instead of silently dropping it.
        while True:
            try:
                items.append(self.q.get_nowait())
            except queue.Empty:
                break

        if items:
            self.write_file(items)

    def write_file(self, items):
        """Write one batch of (embeddings, entity) rows to a new file.

        Errors are printed, not raised; a failed batch is not retried.
        """

        try:

            schema = pa.schema([
                pa.field('embeddings', pa.list_(pa.list_(pa.float64()))),
                pa.field('entity', pa.string()),
            ])

            fname = self.file_template.format(id=str(uuid.uuid4()))
            path = f"{self.directory}/{fname}"

            # Build the batch before opening the file so that a conversion
            # error does not leave an empty file behind.
            batch = pa.record_batch(
                [
                    [i[0] for i in items],
                    [i[1] for i in items],
                ],
                names=['embeddings', 'entity']
            )

            # The context manager closes the file even if the write fails.
            with pq.ParquetWriter(path, schema) as writer:
                writer.write_batch(batch)

            print(f"Wrote {path}.")

        except Exception as e:

            print("Parquet write:", e)

    def write(self, embeds, ent):
        """Queue one (embeddings, entity) row for the writer thread."""
        self.q.put((embeds, ent))

    def close(self):
        """Stop the writer thread and wait for it to write remaining rows."""

        self.running = False

        # The thread is missing if the constructor did not complete, and a
        # thread cannot join itself.
        thread = getattr(self, "thread", None)
        if thread is None or thread is threading.current_thread():
            return

        if thread.is_alive():
            thread.join()

    def __del__(self):
        self.close()
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1,3 +0,0 @@
|
|||
|
||||
from . processor import *
|
||||
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from . write import run
|
||||
|
||||
# Start the processor only when this module is executed directly
# (for example through "python -m"), never on a plain import.
if __name__ == "__main__":
    run()
|
||||
|
||||
|
|
@ -1,87 +0,0 @@
|
|||
|
||||
"""
|
||||
Write graphs triples to parquet files in a directory.
|
||||
"""
|
||||
|
||||
import pulsar
|
||||
import base64
|
||||
import os
|
||||
import argparse
|
||||
import time
|
||||
|
||||
from .... schema import Triples
|
||||
from .... schema import triples_store_queue
|
||||
from .... base import Consumer
|
||||
|
||||
from . writer import ParquetWriter
|
||||
|
||||
# Dotted module path with its first component and its last component (this
# file's own name) removed; used as the default subscriber name below.
module = ".".join(__name__.split(".")[1:-1])

# Defaults used when the corresponding parameter or option is not supplied.
default_input_queue = triples_store_queue
default_subscriber = module
# NOTE(review): not referenced anywhere in the visible part of this module.
default_graph_host = 'localhost'
default_directory = "."
# "{id}" is substituted by the writer when it names an output file.
default_file_template = "triples-{id}.parquet"
# Seconds; see the --rotation-time option.
default_rotation_time = 60
|
||||
|
||||
class Processor(Consumer):
    """Consumer that passes every triple of each received Triples message
    to a ParquetWriter."""

    def __init__(self, **params):
        """Set up the consumer and the parquet writer.

        Recognised keys in params (each has a module-level default):
        input_queue, subscriber, directory, file_template, rotation_time.
        All params are forwarded to the Consumer constructor.
        """

        input_queue = params.get("input_queue", default_input_queue)
        subscriber = params.get("subscriber", default_subscriber)
        directory = params.get("directory", default_directory)
        file_template = params.get("file_template", default_file_template)
        rotation_time = params.get("rotation_time", default_rotation_time)

        # Dict unpacking instead of the "|" merge operator: the result is
        # the same, but it does not require Python 3.9.
        super(Processor, self).__init__(
            **{
                **params,
                "input_queue": input_queue,
                "subscriber": subscriber,
                "input_schema": Triples,
            }
        )

        self.writer = ParquetWriter(directory, file_template, rotation_time)

    def __del__(self):
        # The writer is only present if the constructor ran to completion.
        if hasattr(self, "writer"):
            del self.writer

    def handle(self, msg):
        """Queue the s/p/o values of every triple in one message."""

        v = msg.value()

        for t in v.triples:
            self.writer.write(t.s.value, t.p.value, t.o.value)

    @staticmethod
    def add_args(parser):
        """Add the Consumer options plus this processor's own options."""

        Consumer.add_args(
            parser, default_input_queue, default_subscriber,
        )

        parser.add_argument(
            '-d', '--directory',
            default=default_directory,
            help=f'Directory to write to (default: {default_directory})'
        )

        parser.add_argument(
            '-f', '--file-template',
            default=default_file_template,
            # This help text used to repeat the --directory description.
            help=f'File name template (default: {default_file_template})'
        )

        parser.add_argument(
            '-t', '--rotation-time',
            type=int,
            default=default_rotation_time,
            help=f'Rotation time / seconds (default: {default_rotation_time})'
        )
|
||||
|
||||
def run():
    """Start the processor via Processor.start."""

    # __doc__ is a global lookup here, so this is the module docstring,
    # not the docstring of this function.
    description = __doc__
    Processor.start(module, description)
|
||||
|
||||
|
|
@ -1,96 +0,0 @@
|
|||
|
||||
import threading
|
||||
import queue
|
||||
import time
|
||||
import uuid
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
|
||||
class ParquetWriter:
    """Buffer (s, p, o) rows and write them to parquet files.

    Rows passed to write() are queued and collected by a background thread.
    Once rotation_time seconds have passed since the first row of the
    current batch arrived, the batch is written to a new file whose name is
    file_template with "{id}" replaced by a fresh UUID.

    The worker thread's target is a bound method, which keeps the instance
    referenced while the thread runs, so __del__ should not be relied on to
    stop it.  Call close() to stop the thread and write any buffered rows.
    """

    def __init__(self, directory, file_template, rotation_time):
        """Store the settings and start the background writer thread.

        directory: directory the files are written to.
        file_template: file name template containing "{id}".
        rotation_time: seconds a batch is collected before it is written.
        """

        self.directory = directory
        self.file_template = file_template
        self.rotation_time = rotation_time

        self.q = queue.Queue()

        self.running = True

        self.thread = threading.Thread(target=self.writer_thread)
        self.thread.start()

    def writer_thread(self):
        """Collect queued rows and write a file whenever a batch is due."""

        triples = []

        # Time at which the current batch must be written; None while the
        # batch is empty.
        deadline = None

        while self.running:

            try:
                # Wake up at least once a second so that the deadline and
                # the running flag are re-checked even when idle.
                item = self.q.get(timeout=1)
            except queue.Empty:
                pass
            else:
                if deadline is None:
                    deadline = time.time() + self.rotation_time
                triples.append(item)

            if deadline is not None and time.time() > deadline:
                self.write_file(triples)
                deadline = None
                triples = []

        # Shutting down: pick up whatever is still queued and write the
        # final partial batch instead of silently dropping it.
        while True:
            try:
                triples.append(self.q.get_nowait())
            except queue.Empty:
                break

        if triples:
            self.write_file(triples)

    def write_file(self, triples):
        """Write one batch of (s, p, o) rows to a new file.

        Errors are printed, not raised; a failed batch is not retried.
        """

        try:

            schema = pa.schema([
                pa.field('s', pa.string()),
                pa.field('p', pa.string()),
                pa.field('o', pa.string()),
            ])

            fname = self.file_template.format(id=str(uuid.uuid4()))
            path = f"{self.directory}/{fname}"

            # Build the batch before opening the file so that a conversion
            # error does not leave an empty file behind.
            batch = pa.record_batch(
                [
                    [tpl[0] for tpl in triples],
                    [tpl[1] for tpl in triples],
                    [tpl[2] for tpl in triples],
                ],
                names=['s', 'p', 'o']
            )

            # The context manager closes the file even if the write fails.
            with pq.ParquetWriter(path, schema) as writer:
                writer.write_batch(batch)

            print(f"Wrote {path}.")

        except Exception as e:

            print("Parquet write:", e)

    def write(self, s, p, o):
        """Queue one (s, p, o) row for the writer thread."""
        self.q.put((s, p, o))

    def close(self):
        """Stop the writer thread and wait for it to write remaining rows."""

        self.running = False

        # The thread is missing if the constructor did not complete, and a
        # thread cannot join itself.
        thread = getattr(self, "thread", None)
        if thread is None or thread is threading.current_thread():
            return

        if thread.is_alive():
            thread.join()

    def __del__(self):
        self.close()
|
||||
|
||||
|
||||
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue