Fix/websocket capacity increase (#230)

* Fix invalid variable name invocation
* Fix error responses in websockets
* Increase websocket limits to 50MB max message.  Turn on Pulsar chunking by default.
This commit is contained in:
cybermaggedon 2024-12-29 18:08:12 +00:00 committed by GitHub
parent 21e7b856c4
commit 187b0e6581
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 29 additions and 13 deletions

View file

@ -1,7 +1,7 @@
import base64
from .. schema import Document
from .. schema import Document, Metadata
from .. schema import document_ingest_queue
from . sender import ServiceSender
@ -18,25 +18,24 @@ class DocumentLoadSender(ServiceSender):
def to_request(self, body):
if "metadata" in data:
metadata = to_subgraph(data["metadata"])
if "metadata" in body:
metadata = to_subgraph(body["metadata"])
else:
metadata = []
# Doing a base64 decoe/encode here to make sure the
# content is valid base64
doc = base64.b64decode(data["data"])
doc = base64.b64decode(body["data"])
print("Document received")
return Document(
metadata=Metadata(
id=data.get("id"),
id=body.get("id"),
metadata=metadata,
user=data.get("user", "trustgraph"),
collection=data.get("collection", "default"),
user=body.get("user", "trustgraph"),
collection=body.get("collection", "default"),
),
data=base64.b64encode(doc).decode("utf-8")
)

View file

@ -7,7 +7,7 @@ import threading
class Publisher:
def __init__(self, pulsar_host, topic, schema=None, max_size=10,
chunking_enabled=False):
chunking_enabled=True):
self.pulsar_host = pulsar_host
self.topic = topic
self.schema = schema

View file

@ -65,7 +65,10 @@ class ServiceRequestor:
raise RuntimeError("Timeout")
if resp.error:
return { "error": resp.error.message }
err = { "error": resp.error.message }
if responder:
await responder(err, True)
return err
resp, fin = self.from_response(resp)
@ -81,7 +84,10 @@ class ServiceRequestor:
logging.error(f"Exception: {e}")
return { "error": str(e) }
err = { "error": str(e) }
if responder:
await responder(err, True)
return err
finally:
self.sub.unsubscribe(id)

View file

@ -46,5 +46,11 @@ class ServiceSender:
logging.error(f"Exception: {e}")
return { "error": str(e) }
err = { "error": str(e) }
if responder:
await responder(err, True)
return err

View file

@ -44,7 +44,10 @@ class SocketEndpoint:
return web.HTTPUnauthorized()
running = Running()
ws = web.WebSocketResponse()
# 50MB max message size
ws = web.WebSocketResponse(max_msg_size=52428800)
await ws.prepare(request)
try: