diff --git a/trustgraph-base/trustgraph/base/producer.py b/trustgraph-base/trustgraph/base/producer.py index bb665924..550855b8 100644 --- a/trustgraph-base/trustgraph/base/producer.py +++ b/trustgraph-base/trustgraph/base/producer.py @@ -4,7 +4,9 @@ import asyncio class Producer: - def __init__(self, client, topic, schema, metrics=None): + def __init__(self, client, topic, schema, metrics=None, + chunking_enabled=True): + self.client = client self.topic = topic self.schema = schema @@ -14,6 +16,8 @@ class Producer: self.running = True self.producer = None + self.chunking_enabled = chunking_enabled + def __del__(self): self.running = False @@ -38,7 +42,8 @@ class Producer: print("Connect publisher to", self.topic, "...", flush=True) self.producer = self.client.create_producer( topic = self.topic, - schema = JsonSchema(self.schema) + schema = JsonSchema(self.schema), + chunking_enabled = self.chunking_enabled, ) print("Connected to", self.topic, flush=True) except Exception as e: diff --git a/trustgraph-base/trustgraph/base/publisher.py b/trustgraph-base/trustgraph/base/publisher.py index 654588a7..ef963e84 100644 --- a/trustgraph-base/trustgraph/base/publisher.py +++ b/trustgraph-base/trustgraph/base/publisher.py @@ -37,6 +37,7 @@ class Publisher: while self.running: try: + producer = self.client.create_producer( topic=self.topic, schema=JsonSchema(self.schema), diff --git a/trustgraph-base/trustgraph/schema/library.py b/trustgraph-base/trustgraph/schema/library.py index dfec533f..e6854987 100644 --- a/trustgraph-base/trustgraph/schema/library.py +++ b/trustgraph-base/trustgraph/schema/library.py @@ -125,10 +125,13 @@ class LibrarianResponse(Record): document_metadatas = Array(DocumentMetadata()) processing_metadatas = Array(ProcessingMetadata()) +# FIXME: Is this right? Using persistence on librarian so that +# message chunking works + librarian_request_queue = topic( - 'librarian', kind='non-persistent', namespace='request' + 'librarian', kind='persistent', namespace='request' ) librarian_response_queue = topic( - 'librarian', kind='non-persistent', namespace='response', + 'librarian', kind='persistent', namespace='response', ) diff --git a/trustgraph-cli/scripts/tg-show-library-documents b/trustgraph-cli/scripts/tg-show-library-documents old mode 100644 new mode 100755 index d71261c4..6d35c34e --- a/trustgraph-cli/scripts/tg-show-library-documents +++ b/trustgraph-cli/scripts/tg-show-library-documents @@ -36,6 +36,7 @@ def show_docs(url, user): table, tablefmt="pretty", stralign="left", + maxcolwidths=[None, 55], )) print() diff --git a/trustgraph-cli/scripts/tg-show-library-processing b/trustgraph-cli/scripts/tg-show-library-processing old mode 100644 new mode 100755 index 5b617b3b..9390afe2 --- a/trustgraph-cli/scripts/tg-show-library-processing +++ b/trustgraph-cli/scripts/tg-show-library-processing @@ -36,6 +36,7 @@ def show_procs(url, user): table, tablefmt="pretty", stralign="left", + maxcolwidths=[None, 50], )) print() diff --git a/trustgraph-cli/scripts/tg-start-library-processing b/trustgraph-cli/scripts/tg-start-library-processing old mode 100644 new mode 100755 diff --git a/trustgraph-cli/scripts/tg-stop-library-processing b/trustgraph-cli/scripts/tg-stop-library-processing old mode 100644 new mode 100755 diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py b/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py index 678f5109..45ae55d7 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/serialize.py @@ -105,7 +105,7 @@ def serialize_document_metadata(message): if message.user: ret["user"] = message.user - if message.tags: + if message.tags is not None: ret["tags"] = message.tags return ret @@ -132,7 +132,7 @@ def serialize_processing_metadata(message): if message.collection: ret["collection"] = message.collection - if message.tags: + if message.tags is not None: ret["tags"] = message.tags return ret