diff --git a/scripts/dump-parquet b/scripts/dump-parquet new file mode 100755 index 00000000..36f7f2d1 --- /dev/null +++ b/scripts/dump-parquet @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 + +import pyarrow.parquet as pq +import sys + +for file in sys.argv[1:]: + + table = pq.read_table(file).to_pandas() + print(table) + + + diff --git a/trustgraph/dump/triples/parquet/processor.py b/trustgraph/dump/triples/parquet/processor.py index 4a95c61c..42bebbe3 100755 --- a/trustgraph/dump/triples/parquet/processor.py +++ b/trustgraph/dump/triples/parquet/processor.py @@ -24,7 +24,7 @@ default_subscriber = module default_graph_host='localhost' default_directory = "." default_file_template = "triples-{id}.parquet" -default_rotation_time = 30 +default_rotation_time = 60 class Processor(Consumer): diff --git a/trustgraph/embeddings_client.py b/trustgraph/embeddings_client.py index e09216d0..fd69d602 100644 --- a/trustgraph/embeddings_client.py +++ b/trustgraph/embeddings_client.py @@ -67,6 +67,13 @@ class EmbeddingsClient: def __del__(self): - if self.client: - self.client.close() + if hasattr(self, "consumer"): + self.consumer.unsubscribe() + self.consumer.close() + + if hasattr(self, "producer"): + self.producer.flush() + self.producer.close() + + self.client.close() diff --git a/trustgraph/graph_rag_client.py b/trustgraph/graph_rag_client.py index c6634623..4a6fbc2f 100644 --- a/trustgraph/graph_rag_client.py +++ b/trustgraph/graph_rag_client.py @@ -66,5 +66,13 @@ class GraphRagClient: def __del__(self): + if hasattr(self, "consumer"): + self.consumer.unsubscribe() + self.consumer.close() + + if hasattr(self, "producer"): + self.producer.flush() + self.producer.close() + self.client.close() diff --git a/trustgraph/llm_client.py b/trustgraph/llm_client.py index aa034b77..c6252bf0 100644 --- a/trustgraph/llm_client.py +++ b/trustgraph/llm_client.py @@ -68,7 +68,13 @@ class LlmClient: def __del__(self): - self.producer.close() - self.consumer.close() + if hasattr(self, "consumer"): + self.consumer.unsubscribe() + self.consumer.close() + + if hasattr(self, "producer"): + self.producer.flush() + self.producer.close() + self.client.close()