improving queue logic for high load scenarios

This commit is contained in:
Alpha Nerd 2025-09-19 16:38:48 +02:00
parent 8fe3880af7
commit f0e181d6b8

View file

@ -327,12 +327,16 @@ class rechunk:
# SSE Helpser
# ------------------------------------------------------------------
async def publish_snapshot():
snapshot = json.dumps({"usage_counts": usage_counts})
async with usage_lock:
snapshot = json.dumps({"usage_counts": usage_counts})
async with _subscribers_lock:
for q in _subscribers:
# If the queue is full, drop the message to avoid backpressure.
if q.full():
continue
try:
await q.get()
except asyncio.QueueEmpty:
pass
await q.put(snapshot)
async def close_all_sse_queues():