diff --git a/trustgraph-base/trustgraph/base/consumer_producer.py b/trustgraph-base/trustgraph/base/consumer_producer.py index cabb7525..31441cda 100644 --- a/trustgraph-base/trustgraph/base/consumer_producer.py +++ b/trustgraph-base/trustgraph/base/consumer_producer.py @@ -66,6 +66,7 @@ class ConsumerProducer(BaseProcessor): self.producer = self.client.create_producer( topic=output_queue, schema=JsonSchema(output_schema), + chunking_enabled=True, ) self.consumer = self.client.subscribe( diff --git a/trustgraph-base/trustgraph/base/producer.py b/trustgraph-base/trustgraph/base/producer.py index 27d693ee..84d7fc99 100644 --- a/trustgraph-base/trustgraph/base/producer.py +++ b/trustgraph-base/trustgraph/base/producer.py @@ -34,6 +34,7 @@ class Producer(BaseProcessor): self.producer = self.client.create_producer( topic=output_queue, schema=JsonSchema(output_schema), + chunking_enabled=True, ) def send(self, msg, properties={}): diff --git a/trustgraph-flow/trustgraph/gateway/document_load.py b/trustgraph-flow/trustgraph/gateway/document_load.py index 0fd9a0df..4a37ecb6 100644 --- a/trustgraph-flow/trustgraph/gateway/document_load.py +++ b/trustgraph-flow/trustgraph/gateway/document_load.py @@ -1,7 +1,7 @@ import base64 -from .. schema import Document +from .. schema import Document, Metadata from .. schema import document_ingest_queue from . sender import ServiceSender @@ -18,25 +18,24 @@ class DocumentLoadSender(ServiceSender): def to_request(self, body): - if "metadata" in data: - metadata = to_subgraph(data["metadata"]) + if "metadata" in body: + metadata = to_subgraph(body["metadata"]) else: metadata = [] # Doing a base64 decoe/encode here to make sure the # content is valid base64 - doc = base64.b64decode(data["data"]) + doc = base64.b64decode(body["data"]) print("Document received") return Document( metadata=Metadata( - id=data.get("id"), + id=body.get("id"), metadata=metadata, - user=data.get("user", "trustgraph"), - collection=data.get("collection", "default"), + user=body.get("user", "trustgraph"), + collection=body.get("collection", "default"), ), data=base64.b64encode(doc).decode("utf-8") ) - diff --git a/trustgraph-flow/trustgraph/gateway/publisher.py b/trustgraph-flow/trustgraph/gateway/publisher.py index 89c612ce..e3298a47 100644 --- a/trustgraph-flow/trustgraph/gateway/publisher.py +++ b/trustgraph-flow/trustgraph/gateway/publisher.py @@ -7,7 +7,7 @@ import threading class Publisher: def __init__(self, pulsar_host, topic, schema=None, max_size=10, - chunking_enabled=False): + chunking_enabled=True): self.pulsar_host = pulsar_host self.topic = topic self.schema = schema diff --git a/trustgraph-flow/trustgraph/gateway/requestor.py b/trustgraph-flow/trustgraph/gateway/requestor.py index 5f6e2692..5bfedf1e 100644 --- a/trustgraph-flow/trustgraph/gateway/requestor.py +++ b/trustgraph-flow/trustgraph/gateway/requestor.py @@ -65,7 +65,10 @@ class ServiceRequestor: raise RuntimeError("Timeout") if resp.error: - return { "error": resp.error.message } + err = { "error": resp.error.message } + if responder: + await responder(err, True) + return err resp, fin = self.from_response(resp) @@ -81,7 +84,10 @@ class ServiceRequestor: logging.error(f"Exception: {e}") - return { "error": str(e) } + err = { "error": str(e) } + if responder: + await responder(err, True) + return err finally: self.sub.unsubscribe(id) diff --git a/trustgraph-flow/trustgraph/gateway/sender.py b/trustgraph-flow/trustgraph/gateway/sender.py index 93f1164c..c5bb2e17 100644 --- a/trustgraph-flow/trustgraph/gateway/sender.py +++ b/trustgraph-flow/trustgraph/gateway/sender.py @@ -46,5 +46,11 @@ class ServiceSender: logging.error(f"Exception: {e}") - return { "error": str(e) } + err = { "error": str(e) } + + if responder: + await responder(err, True) + + return err + diff --git a/trustgraph-flow/trustgraph/gateway/socket.py b/trustgraph-flow/trustgraph/gateway/socket.py index fd408d7b..4adc336f 100644 --- a/trustgraph-flow/trustgraph/gateway/socket.py +++ b/trustgraph-flow/trustgraph/gateway/socket.py @@ -44,7 +44,10 @@ class SocketEndpoint: return web.HTTPUnauthorized() running = Running() - ws = web.WebSocketResponse() + + # 50MB max message size + ws = web.WebSocketResponse(max_msg_size=52428800) + await ws.prepare(request) try: