improved SSE queue handling on shutdown

This commit is contained in:
Alpha Nerd 2025-09-12 09:44:56 +02:00
parent 175f035d86
commit 25b287eba6

View file

@ -269,6 +269,11 @@ async def publish_snapshot():
continue continue
await q.put(snapshot) await q.put(snapshot)
async def close_all_sse_queues():
for q in list(_subscribers):
# sentinel value that the generator will recognise
await q.put(None)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Subscriber helpers # Subscriber helpers
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@ -1331,6 +1336,8 @@ async def usage_stream(request: Request):
if await request.is_disconnected(): if await request.is_disconnected():
break break
data = await queue.get() data = await queue.get()
if data is None:
break
# Send the data as a single SSE message # Send the data as a single SSE message
yield f"data: {data}\n\n" yield f"data: {data}\n\n"
finally: finally:
@ -1361,4 +1368,5 @@ async def startup_event() -> None:
@app.on_event("shutdown") @app.on_event("shutdown")
async def shutdown_event() -> None: async def shutdown_event() -> None:
await close_all_sse_queues()
await app_state["session"].close() await app_state["session"].close()