fix: stale reservation counters by releasing them only once

This commit is contained in:
Alpha Nerd 2026-06-23 14:56:04 +02:00
parent cef71df3df
commit 4f42f350a3
Signed by: alpha-nerd
SSH key fingerprint: SHA256:QkkAgVoYi9TQ0UKPkiKSfnerZy2h4qhi3SVPXJmBN+M
5 changed files with 319 additions and 262 deletions

View file

@ -140,9 +140,9 @@ async def _run_to_completion(*, native, oclient, endpoint, model, tracking_model
send_params, native_params):
"""Drive the backend to completion (no client streaming).
Returns ``(output_items, usage)`` where usage is responses-shaped. Caller is
responsible for ``decrement_usage`` (translated failures self-decrement inside
``create_chat_with_retries``)."""
Returns ``(output_items, usage)`` where usage is responses-shaped. The caller
owns the usage reservation and must release it (this function and
``create_chat_with_retries`` never decrement)."""
if native:
resp_obj = await oclient.responses.create(stream=False, **native_params)
data = resp_obj.model_dump()
@ -209,38 +209,46 @@ async def openai_responses_proxy(request: Request):
return StreamingResponse(_served_cached(), media_type="text/event-stream")
return JSONResponse(content=resp_obj)
# Endpoint selection (reserves a slot — must be released exactly once).
# Endpoint selection (reserves a slot — must be released exactly once). The
# release is owned by the per-branch finally (_bg_run / _stream / the
# non-streaming try) once we hand off; any failure during client/param
# construction (including CancelledError on client disconnect) must release
# it here or the usage counter leaks.
_affinity_key = _conversation_fingerprint(model, messages, None)
endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key)
oclient = _make_openai_client(endpoint, default_headers=default_headers,
api_key=config.api_keys.get(endpoint, "no-key"))
native = is_ext_openai_endpoint(endpoint)
try:
oclient = _make_openai_client(endpoint, default_headers=default_headers,
api_key=config.api_keys.get(endpoint, "no-key"))
native = is_ext_openai_endpoint(endpoint)
# Build backend params for both shapes.
send_params = {"messages": messages, "model": model}
_opt = {
"temperature": payload.get("temperature"),
"top_p": payload.get("top_p"),
"max_tokens": payload.get("max_output_tokens"),
"tools": tools_responses_to_chat(tools),
"tool_choice": payload.get("tool_choice"),
"response_format": _text_format_to_response_format(payload.get("text")),
}
send_params.update({k: v for k, v in _opt.items() if v is not None})
# Build backend params for both shapes.
send_params = {"messages": messages, "model": model}
_opt = {
"temperature": payload.get("temperature"),
"top_p": payload.get("top_p"),
"max_tokens": payload.get("max_output_tokens"),
"tools": tools_responses_to_chat(tools),
"tool_choice": payload.get("tool_choice"),
"response_format": _text_format_to_response_format(payload.get("text")),
}
send_params.update({k: v for k, v in _opt.items() if v is not None})
native_instructions, native_input = messages_to_responses_input(messages)
native_params = {"model": model, "input": native_input, "store": False}
_nopt = {
"instructions": native_instructions,
"temperature": payload.get("temperature"),
"top_p": payload.get("top_p"),
"max_output_tokens": payload.get("max_output_tokens"),
"tools": tools,
"tool_choice": payload.get("tool_choice"),
"text": payload.get("text"),
"reasoning": payload.get("reasoning"),
}
native_params.update({k: v for k, v in _nopt.items() if v is not None})
native_instructions, native_input = messages_to_responses_input(messages)
native_params = {"model": model, "input": native_input, "store": False}
_nopt = {
"instructions": native_instructions,
"temperature": payload.get("temperature"),
"top_p": payload.get("top_p"),
"max_output_tokens": payload.get("max_output_tokens"),
"tools": tools,
"tool_choice": payload.get("tool_choice"),
"text": payload.get("text"),
"reasoning": payload.get("reasoning"),
}
native_params.update({k: v for k, v in _nopt.items() if v is not None})
except BaseException:
await decrement_usage(endpoint, tracking_model)
raise
async def _persist(status, output_items=None, usage=None, error=None, insert=False):
if not store:
@ -275,30 +283,37 @@ async def openai_responses_proxy(request: Request):
# ---- background: run detached, return queued immediately --------------
if background:
await _persist("queued", insert=True)
# Once the task is created, _bg_run's finally owns the release. Guard the
# pre-task setup so a failure there (queued persist, task creation, or a
# client disconnect) still releases the reservation.
try:
await _persist("queued", insert=True)
async def _bg_run():
try:
await get_db().update_response_status(response_id, "in_progress")
output_items, usage = await _run_to_completion(
native=native, oclient=oclient, endpoint=endpoint, model=model,
tracking_model=tracking_model, send_params=send_params,
native_params=native_params)
await _track(usage)
await _persist("completed", output_items=output_items, usage=usage)
await _cache_store(output_items, usage)
except asyncio.CancelledError:
await get_db().update_response_status(response_id, "cancelled")
raise
except Exception as e:
await get_db().update_response_status(
response_id, "failed",
error={"message": str(e)[:500], "type": type(e).__name__})
finally:
await decrement_usage(endpoint, tracking_model)
_background_tasks.pop(response_id, None)
# Background worker for a detached request: drives the backend call to
# completion, records each status transition through update_response_status,
# and releases the usage reservation in ``finally``.
async def _bg_run():
try:
await get_db().update_response_status(response_id, "in_progress")
output_items, usage = await _run_to_completion(
native=native, oclient=oclient, endpoint=endpoint, model=model,
tracking_model=tracking_model, send_params=send_params,
native_params=native_params)
await _track(usage)
await _persist("completed", output_items=output_items, usage=usage)
await _cache_store(output_items, usage)
except asyncio.CancelledError:
# Record the cancellation, then re-raise so the task still ends cancelled.
await get_db().update_response_status(response_id, "cancelled")
raise
except Exception as e:
# Failures are written to the response record (message capped at 500
# characters, plus the exception class name) and are not re-raised.
await get_db().update_response_status(
response_id, "failed",
error={"message": str(e)[:500], "type": type(e).__name__})
finally:
# Runs on success, failure and cancellation alike: release the reservation
# and drop this response's entry from _background_tasks.
await decrement_usage(endpoint, tracking_model)
_background_tasks.pop(response_id, None)
task = asyncio.create_task(_bg_run())
task = asyncio.create_task(_bg_run())
except BaseException:
await decrement_usage(endpoint, tracking_model)
raise
_background_tasks[response_id] = task
queued = build_response_object(response_id=response_id, model=model, output_items=[],
status="queued", created_at=created_at,
@ -308,18 +323,25 @@ async def openai_responses_proxy(request: Request):
# ---- streaming sync ----------------------------------------------------
if stream:
if native:
source = await oclient.responses.create(stream=True, **native_params)
translator = _NativeStream(response_id)
else:
source = await create_chat_with_retries(
oclient, {**send_params, "stream": True,
"stream_options": {"include_usage": True}},
endpoint, model, tracking_model)
translator = ChatToResponsesStream(
response_id, model, created_at=created_at,
previous_response_id=previous_response_id, instructions=instructions,
metadata=metadata)
# _stream's finally owns the release once iteration starts. Establishing
# the source can fail (or be cancelled) before that — release here, since
# create_chat_with_retries no longer self-decrements.
try:
if native:
source = await oclient.responses.create(stream=True, **native_params)
translator = _NativeStream(response_id)
else:
source = await create_chat_with_retries(
oclient, {**send_params, "stream": True,
"stream_options": {"include_usage": True}},
endpoint, model, tracking_model)
translator = ChatToResponsesStream(
response_id, model, created_at=created_at,
previous_response_id=previous_response_id, instructions=instructions,
metadata=metadata)
except BaseException:
await decrement_usage(endpoint, tracking_model)
raise
async def _stream():
await _persist("in_progress", insert=True)