460 lines
24 KiB
Python
460 lines
24 KiB
Python
"""Tests for the OpenAI Responses API support (api/responses.py + requests/responses.py).
|
|
|
|
Covers the pure translation layer, the translated (Ollama-style) and native
|
|
(external-OpenAI) backend paths, conversation storage / chaining, background mode,
|
|
and the retrieve / delete / cancel routes.
|
|
"""
|
|
import asyncio
|
|
from contextlib import ExitStack, contextmanager
|
|
from types import SimpleNamespace as NS
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import orjson
|
|
import pytest
|
|
|
|
import router
|
|
from api import responses as api_responses
|
|
from requests import responses as rt
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Pure translation unit tests (no app / no I/O)
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestTranslationInputToMessages:
    """Responses-API ``input`` -> chat ``messages`` translation (pure, no I/O)."""

    def test_string_input(self):
        """A bare string input turns into one user message."""
        translated = rt.responses_input_to_messages("hello")
        expected = [{"role": "user", "content": "hello"}]
        assert translated == expected

    def test_instructions_become_system(self):
        """``instructions`` is emitted as a system message ahead of the user turn."""
        translated = rt.responses_input_to_messages("hi", instructions="be brief")
        system_msg = {"role": "system", "content": "be brief"}
        user_msg = {"role": "user", "content": "hi"}
        assert translated[0] == system_msg
        assert translated[1] == user_msg

    def test_item_list_text_and_image(self):
        """Mixed text + image parts are mapped to chat-style content parts."""
        text_part = {"type": "input_text", "text": "describe"}
        image_part = {"type": "input_image", "image_url": "http://x/y.png"}
        message_item = {
            "type": "message",
            "role": "user",
            "content": [text_part, image_part],
        }

        first = rt.responses_input_to_messages([message_item])[0]

        assert first["role"] == "user"
        assert first["content"] == [
            {"type": "text", "text": "describe"},
            {"type": "image_url", "image_url": {"url": "http://x/y.png"}},
        ]

    def test_single_text_part_collapses_to_string(self):
        """A content list holding exactly one text part collapses to a plain string."""
        message_item = {
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "yo"}],
        }
        translated = rt.responses_input_to_messages([message_item])
        assert translated[0]["content"] == "yo"

    def test_function_call_roundtrip(self):
        """A function_call / function_call_output pair maps to assistant + tool messages."""
        call_item = {
            "type": "function_call",
            "call_id": "c1",
            "name": "get",
            "arguments": "{\"x\":1}",
        }
        output_item = {"type": "function_call_output", "call_id": "c1", "output": "42"}

        translated = rt.responses_input_to_messages([call_item, output_item])

        assistant_msg = translated[0]
        assert assistant_msg["role"] == "assistant"
        first_call = assistant_msg["tool_calls"][0]
        assert first_call["id"] == "c1"
        assert first_call["function"]["name"] == "get"
        assert translated[1] == {"role": "tool", "tool_call_id": "c1", "content": "42"}
|
|
|
|
|
|
class TestTranslationResponseDirection:
    """Chat-completions shapes -> Responses-API shapes (and the tools/input helpers)."""

    def test_chat_message_to_output_items_text(self):
        """Assistant text yields a single ``message`` item with one ``output_text`` part."""
        chat_message = {"role": "assistant", "content": "hi there"}
        output = rt.chat_message_to_output_items(chat_message)

        assert len(output) == 1
        only_item = output[0]
        assert only_item["type"] == "message"
        assert only_item["content"][0] == {
            "type": "output_text", "text": "hi there", "annotations": [],
        }

    def test_chat_message_to_output_items_tool_call(self):
        """An assistant tool call yields a ``function_call`` output item."""
        chat_message = {
            "role": "assistant",
            "content": None,
            "tool_calls": [{"id": "c9", "function": {"name": "f", "arguments": "{}"}}],
        }
        first_item = rt.chat_message_to_output_items(chat_message)[0]

        assert first_item["type"] == "function_call"
        assert first_item["call_id"] == "c9"
        assert first_item["name"] == "f"

    def test_usage_mapping(self):
        """prompt/completion token counts are renamed and a total is reported."""
        chat_usage = {"prompt_tokens": 7, "completion_tokens": 3}
        mapped = rt.usage_chat_to_responses(chat_usage)
        assert mapped == {"input_tokens": 7, "output_tokens": 3, "total_tokens": 10}

    def test_build_response_object_output_text(self):
        """The built response exposes object type, output_text and a completed status."""
        output = rt.chat_message_to_output_items({"role": "assistant", "content": "abc"})
        response = rt.build_response_object(
            response_id="resp_1", model="m", output_items=output,
        )

        assert response["object"] == "response"
        assert response["output_text"] == "abc"
        assert response["status"] == "completed"

    def test_tools_responses_to_chat(self):
        """Flat Responses-style tool definitions are nested under ``function``."""
        responses_tools = [{
            "type": "function",
            "name": "f",
            "description": "d",
            "parameters": {"type": "object"},
        }]
        expected = [{
            "type": "function",
            "function": {
                "name": "f",
                "description": "d",
                "parameters": {"type": "object"},
            },
        }]
        assert rt.tools_responses_to_chat(responses_tools) == expected

    def test_messages_to_responses_input(self):
        """System text becomes instructions; user/assistant turns become typed items."""
        history = [
            {"role": "system", "content": "sys"},
            {"role": "user", "content": "hi"},
            {"role": "assistant", "content": "yo"},
        ]
        instructions, input_items = rt.messages_to_responses_input(history)

        assert instructions == "sys"
        assert input_items[0] == {
            "role": "user", "content": [{"type": "input_text", "text": "hi"}],
        }
        assert input_items[1] == {
            "role": "assistant", "content": [{"type": "output_text", "text": "yo"}],
        }
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Fakes for backend generators
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
def _fake_completion(content="hello world", usage=(3, 5)):
    """Build a stand-in for a non-streaming chat completion.

    The result has one choice whose ``message.model_dump()`` returns an
    assistant message carrying *content*, and a ``usage.model_dump()`` that
    returns prompt / completion / total token counts derived from *usage*
    (a ``(prompt_tokens, completion_tokens)`` pair).
    """

    def _dumpable(payload):
        # Mock whose ``model_dump()`` hands back *payload*.
        stub = MagicMock()
        stub.model_dump.return_value = payload
        return stub

    message = _dumpable({"role": "assistant", "content": content})
    token_usage = _dumpable({
        "prompt_tokens": usage[0],
        "completion_tokens": usage[1],
        "total_tokens": sum(usage),
    })
    only_choice = NS(message=message)
    return NS(choices=[only_choice], usage=token_usage)
|
|
|
|
|
|
def _chunk(content=None, tool_calls=None):
    """Build one fake streaming chunk: a single choice with the given delta, no usage."""
    delta = NS(content=content, tool_calls=tool_calls)
    choice = NS(delta=delta, finish_reason=None)
    return NS(choices=[choice], usage=None)
|
|
|
|
|
|
def _usage_chunk(p, c):
    """Build a fake usage-only streaming chunk: no choices, just token counts."""
    token_usage = NS(prompt_tokens=p, completion_tokens=c)
    return NS(choices=[], usage=token_usage)
|
|
|
|
|
|
def _text_chunks():
    """Return an async generator of text deltas "Hel", "lo" followed by a usage chunk."""

    async def _stream():
        for piece in ("Hel", "lo"):
            yield _chunk(content=piece)
        # Usage arrives last, as its own chunk with no choices.
        yield _usage_chunk(3, 5)

    return _stream()
|
|
|
|
|
|
def _toolcall_chunks():
    """Return an async generator streaming one tool call split over two deltas.

    The first delta carries the call id, the function name and the opening
    part of the JSON arguments; the second carries only the remaining
    arguments. A usage chunk closes the stream.
    """
    opening = NS(index=0, id="call_1", function=NS(name="lookup", arguments='{"q":'))
    continuation = NS(index=0, id=None, function=NS(name=None, arguments='"hi"}'))

    async def _stream():
        for fragment in (opening, continuation):
            yield _chunk(tool_calls=[fragment])
        yield _usage_chunk(4, 2)

    return _stream()
|
|
|
|
|
|
class _FakeEvent:
    """Minimal stand-in for a streamed event object exposing ``model_dump()``."""

    def __init__(self, data):
        # Payload handed back verbatim by ``model_dump``.
        self._data = data

    def model_dump(self):
        """Return the payload this event was constructed with."""
        payload = self._data
        return payload
|
|
|
|
|
|
def _native_event_stream():
    """Return an async generator of fake native Responses events.

    Sequence: ``response.created`` -> one ``response.output_text.delta`` ->
    ``response.completed``; the created/completed events carry the upstream
    id ``resp_openai``.
    """
    created = {
        "type": "response.created",
        "response": {"id": "resp_openai", "status": "in_progress", "output": []},
    }
    delta = {
        "type": "response.output_text.delta",
        "item_id": "msg_1",
        "output_index": 0,
        "delta": "hi",
    }
    completed = {
        "type": "response.completed",
        "response": {
            "id": "resp_openai",
            "status": "completed",
            "output": [{
                "type": "message",
                "role": "assistant",
                "content": [{"type": "output_text", "text": "hi"}],
            }],
            "usage": {"input_tokens": 2, "output_tokens": 1, "total_tokens": 3},
        },
    }

    async def _stream():
        for payload in (created, delta, completed):
            yield _FakeEvent(payload)

    return _stream()
|
|
|
|
|
|
def _sse_events(text):
    """Split an SSE body into a list of ``(event_type, data_dict)`` tuples.

    Frames are separated by a blank line. Within a frame the ``event: `` line
    supplies the event type and the ``data: `` line is decoded as JSON; either
    element is ``None`` when the frame lacks the corresponding line. Blank
    frames are skipped.

    Line endings are normalised first: an SSE stream may legally use CRLF or
    bare CR as well as LF, and splitting a CRLF body on ``"\\n\\n"`` would
    otherwise merge every frame into one and keep only the last event.
    """
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    out = []
    for frame in normalized.strip().split("\n\n"):
        if not frame.strip():
            continue
        etype = data = None
        for line in frame.splitlines():
            if line.startswith("event: "):
                etype = line[len("event: "):]
            elif line.startswith("data: "):
                data = orjson.loads(line[len("data: "):])
        out.append((etype, data))
    return out
|
|
|
|
|
|
@contextmanager
def _enter(*cms):
    """Enter every given context manager, in order, for the duration of the block.

    Convenience for ``with _enter(*some_tuple, another_cm):`` where the set of
    managers is assembled dynamically; nothing is bound by ``as``.
    """
    with ExitStack() as active:
        for manager in cms:
            active.enter_context(manager)
        # All managers are live here; ExitStack unwinds them when the block ends.
        yield
|
|
|
|
|
|
def _patch_backend(native=False, endpoint="http://ollama:11434"):
    """Return the tuple of patches that stub endpoint selection and client creation.

    ``choose_endpoint`` resolves to ``(endpoint, "test-model:latest")``,
    ``decrement_usage`` becomes a no-op async mock, ``is_ext_openai_endpoint``
    returns *native*, ``_make_openai_client`` returns a fresh ``MagicMock`` and
    ``get_llm_cache`` returns ``None``. The patches are not started here; the
    caller enters them (e.g. via ``_enter``).
    """
    selection = AsyncMock(return_value=(endpoint, "test-model:latest"))
    patches = [
        patch.object(api_responses, "choose_endpoint", selection),
        patch.object(api_responses, "decrement_usage", AsyncMock()),
    ]
    # The remaining targets are plain callables stubbed by return value only.
    patches += [
        patch.object(api_responses, attr, return_value=value)
        for attr, value in (
            ("is_ext_openai_endpoint", native),
            ("_make_openai_client", MagicMock()),
            ("get_llm_cache", None),
        )
    ]
    return tuple(patches)
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Translated path (Ollama-style backend)
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestTranslatedPath:
    """``/v1/responses`` served through the translated (non-native) backend path."""

    async def test_nonstream(self, client):
        """A non-streaming request returns a response object with text, usage and a resp_ id."""
        backend = AsyncMock(return_value=_fake_completion("hello world"))
        request_json = {"model": "test-model", "input": "hi", "store": False}
        with _enter(*_patch_backend(native=False),
                    patch.object(api_responses, "create_chat_with_retries", backend)):
            resp = await client.post("/v1/responses", json=request_json)

        assert resp.status_code == 200
        payload = resp.json()
        assert payload["object"] == "response"
        assert payload["output_text"] == "hello world"
        assert payload["usage"] == {"input_tokens": 3, "output_tokens": 5, "total_tokens": 8}
        assert payload["id"].startswith("resp_")

    async def test_stream_event_sequence(self, client):
        """Streaming emits created -> text deltas -> completed, with usage on completion."""
        backend = AsyncMock(return_value=_text_chunks())
        request_json = {"model": "test-model", "input": "hi",
                        "stream": True, "store": False}
        with _enter(*_patch_backend(native=False),
                    patch.object(api_responses, "create_chat_with_retries", backend)):
            resp = await client.post("/v1/responses", json=request_json)

        assert resp.status_code == 200
        assert resp.headers["content-type"].startswith("text/event-stream")

        events = _sse_events(resp.content.decode())
        event_types = [etype for etype, _ in events]
        assert event_types[0] == "response.created"
        assert "response.output_text.delta" in event_types
        assert event_types[-1] == "response.completed"

        # concatenated deltas reconstruct the content
        text_pieces = [data["delta"] for etype, data in events
                       if etype == "response.output_text.delta"]
        assert "".join(text_pieces) == "Hello"

        # completed event carries usage
        completed_events = [data for etype, data in events
                            if etype == "response.completed"]
        assert completed_events[0]["response"]["usage"]["input_tokens"] == 3

    async def test_stream_tool_calls(self, client):
        """Streamed tool-call fragments surface as argument deltas and a final function_call."""
        backend = AsyncMock(return_value=_toolcall_chunks())
        request_json = {"model": "test-model", "input": "lookup hi",
                        "stream": True, "store": False}
        with _enter(*_patch_backend(native=False),
                    patch.object(api_responses, "create_chat_with_retries", backend)):
            resp = await client.post("/v1/responses", json=request_json)

        events = _sse_events(resp.content.decode())
        event_types = [etype for etype, _ in events]
        assert "response.function_call_arguments.delta" in event_types
        assert "response.function_call_arguments.done" in event_types

        argument_pieces = [data["delta"] for etype, data in events
                           if etype == "response.function_call_arguments.delta"]
        assert "".join(argument_pieces) == '{"q":"hi"}'

        completed_events = [data for etype, data in events
                            if etype == "response.completed"]
        function_calls = [item for item in completed_events[0]["response"]["output"]
                          if item["type"] == "function_call"]
        call = function_calls[0]
        assert call["name"] == "lookup"
        assert call["arguments"] == '{"q":"hi"}'
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Native path (external OpenAI backend)
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestNativePath:
    """``/v1/responses`` passed through to a native (external OpenAI) backend."""

    @staticmethod
    def _native_patches(oclient):
        """Patches that route requests to a fake external-OpenAI endpoint using *oclient*."""
        selection = AsyncMock(return_value=("https://api.openai.com/v1", "gpt"))
        return (
            patch.object(api_responses, "choose_endpoint", selection),
            patch.object(api_responses, "decrement_usage", AsyncMock()),
            patch.object(api_responses, "is_ext_openai_endpoint", return_value=True),
            patch.object(api_responses, "_make_openai_client", return_value=oclient),
            patch.object(api_responses, "get_llm_cache", return_value=None),
        )

    async def test_nonstream_passthrough_rewrites_id(self, client):
        """The upstream body is passed through but its id is replaced by a router id."""
        upstream_response = MagicMock()
        upstream_response.model_dump.return_value = {
            "id": "resp_openai",
            "status": "completed",
            "output": [{
                "type": "message",
                "role": "assistant",
                "content": [{"type": "output_text", "text": "native hi"}],
            }],
            "usage": {"input_tokens": 2, "output_tokens": 3, "total_tokens": 5},
        }
        oclient = MagicMock()
        oclient.responses.create = AsyncMock(return_value=upstream_response)

        with _enter(*self._native_patches(oclient)):
            resp = await client.post(
                "/v1/responses",
                json={"model": "gpt", "input": "hi", "store": False},
            )

        payload = resp.json()
        assert payload["output_text"] == "native hi"
        assert payload["id"].startswith("resp_") and payload["id"] != "resp_openai"
        # native call must not delegate state upstream
        upstream_kwargs = oclient.responses.create.call_args.kwargs
        assert upstream_kwargs["store"] is False

    async def test_stream_passthrough(self, client):
        """Streamed native events are forwarded with the response id rewritten."""
        oclient = MagicMock()
        oclient.responses.create = AsyncMock(return_value=_native_event_stream())

        with _enter(*self._native_patches(oclient)):
            resp = await client.post(
                "/v1/responses",
                json={"model": "gpt", "input": "hi", "stream": True, "store": False},
            )

        events = _sse_events(resp.content.decode())
        # the completed event's response id is rewritten to the router id
        completed_events = [data for etype, data in events
                            if etype == "response.completed"]
        final_id = completed_events[0]["response"]["id"]
        assert final_id.startswith("resp_")
        assert final_id != "resp_openai"
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Storage + chaining + retrieve/delete
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestStorageAndChaining:
    """Stored responses: retrieval, ``previous_response_id`` chaining and deletion."""

    async def test_store_and_retrieve(self, client):
        """A response created with ``store: True`` can be fetched back by id."""
        backend = AsyncMock(return_value=_fake_completion("remembered"))
        with _enter(*_patch_backend(native=False),
                    patch.object(api_responses, "create_chat_with_retries", backend)):
            created = await client.post(
                "/v1/responses",
                json={"model": "test-model", "input": "hi", "store": True},
            )
            response_id = created.json()["id"]
            fetched = await client.get(f"/v1/responses/{response_id}")

        assert fetched.status_code == 200
        assert fetched.json()["output_text"] == "remembered"

    async def test_previous_response_id_rehydrates_history(self, client):
        """A chained request replays the prior user and assistant turns to the backend."""
        # First turn
        first_backend = AsyncMock(return_value=_fake_completion("turn-one"))
        with _enter(*_patch_backend(native=False),
                    patch.object(api_responses, "create_chat_with_retries", first_backend)):
            first = await client.post(
                "/v1/responses",
                json={"model": "test-model", "input": "first?", "store": True},
            )
        first_id = first.json()["id"]

        # Second turn references the first — capture the messages sent to the backend
        capture = AsyncMock(return_value=_fake_completion("turn-two"))
        second_request = {
            "model": "test-model",
            "input": "second?",
            "previous_response_id": first_id,
            "store": True,
        }
        with _enter(*_patch_backend(native=False),
                    patch.object(api_responses, "create_chat_with_retries", capture)):
            await client.post("/v1/responses", json=second_request)

        backend_payload = capture.call_args.args[1]
        sent_contents = [message.get("content") for message in backend_payload["messages"]]
        assert "first?" in sent_contents  # prior user turn replayed
        assert "turn-one" in sent_contents  # prior assistant turn replayed
        assert "second?" in sent_contents  # current turn appended

    async def test_delete(self, client):
        """Deleting a stored response reports success and makes later lookups 404."""
        backend = AsyncMock(return_value=_fake_completion("bye"))
        with _enter(*_patch_backend(native=False),
                    patch.object(api_responses, "create_chat_with_retries", backend)):
            created = await client.post(
                "/v1/responses",
                json={"model": "test-model", "input": "hi", "store": True},
            )
            response_url = f"/v1/responses/{created.json()['id']}"
            deleted = await client.delete(response_url)
            after_delete = await client.get(response_url)

        assert deleted.status_code == 200
        assert deleted.json()["deleted"] is True
        assert after_delete.status_code == 404

    async def test_retrieve_missing_404(self, client):
        """Fetching an unknown response id yields 404."""
        missing = await client.get("/v1/responses/resp_missing")
        assert missing.status_code == 404
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Background mode
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestBackgroundMode:
    """Background (``background: True``) request handling and orphan cleanup."""

    async def test_background_requires_store(self, client):
        """``background: True`` combined with ``store: False`` is rejected with 400."""
        request_json = {
            "model": "test-model",
            "input": "hi",
            "background": True,
            "store": False,
        }
        resp = await client.post("/v1/responses", json=request_json)
        assert resp.status_code == 400

    async def test_background_lifecycle(self, client):
        """A background request starts ``queued`` and is polled until it completes."""
        terminal_states = ("completed", "failed", "cancelled")
        backend = AsyncMock(return_value=_fake_completion("bg-done"))
        request_json = {
            "model": "test-model",
            "input": "hi",
            "background": True,
            "store": True,
        }
        # Polling stays inside the patched scope so the background work still
        # sees the fake backend while it runs.
        with _enter(*_patch_backend(native=False),
                    patch.object(api_responses, "create_chat_with_retries", backend)):
            created = await client.post("/v1/responses", json=request_json)
            assert created.status_code == 200
            created_body = created.json()
            assert created_body["status"] == "queued"
            poll_url = f"/v1/responses/{created_body['id']}"

            # poll until terminal
            status = None
            for _ in range(100):
                await asyncio.sleep(0.01)
                got = await client.get(poll_url)
                status = got.json()["status"]
                if status in terminal_states:
                    break

            assert status == "completed"
            assert got.json()["output_text"] == "bg-done"

    async def test_fail_orphaned_responses(self, client):
        """An ``in_progress`` row is flipped to ``failed`` by ``fail_orphaned_responses``."""
        store = router.db
        await store.store_response(
            "resp_orphan",
            previous_response_id=None,
            model="m",
            status="in_progress",
            created_at=0,
            input_messages=[],
        )

        failed_count = await store.fail_orphaned_responses()
        assert failed_count >= 1

        orphan = await store.get_response("resp_orphan")
        assert orphan["status"] == "failed"
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Cache parity
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
class _FakeCache:
    """Cache double that always hits with a fixed payload and records each lookup."""

    def __init__(self, response_bytes):
        # Every ``get_chat`` call is logged here as a (route, model, messages) tuple.
        self.calls = []
        self._resp = response_bytes

    async def get_chat(self, route, model, messages):
        """Record the lookup arguments and return the canned payload."""
        lookup = (route, model, messages)
        self.calls.append(lookup)
        return self._resp
|
|
|
|
|
|
class TestCacheParity:
    """Cache hits must be served without reaching the backend, as JSON or as SSE."""

    @staticmethod
    def _cached_payload():
        """Serialized response object whose assistant text is ``from-cache``."""
        output_items = rt.chat_message_to_output_items(
            {"role": "assistant", "content": "from-cache"})
        response = rt.build_response_object(
            response_id="resp_cached", model="test-model", output_items=output_items)
        return orjson.dumps(response)

    @staticmethod
    def _cache_only_patches(fake):
        """Patches that install *fake* as the cache and make endpoint selection fail loudly."""
        unreachable = AsyncMock(side_effect=AssertionError("backend must not be reached"))
        return (
            patch.object(api_responses, "get_llm_cache", return_value=fake),
            patch.object(api_responses, "choose_endpoint", unreachable),
        )

    async def test_cache_hit_served_as_response(self, client):
        """A non-streaming cache hit returns the cached text via the responses route key."""
        fake = _FakeCache(self._cached_payload())
        request_json = {
            "model": "test-model",
            "input": "ping",
            "store": False,
            "nomyo": {"cache": True},
        }
        with _enter(*self._cache_only_patches(fake)):
            resp = await client.post("/v1/responses", json=request_json)

        assert resp.status_code == 200
        assert resp.json()["output_text"] == "from-cache"
        assert fake.calls and fake.calls[0][0] == "openai_responses"

    async def test_cache_hit_served_as_sse(self, client):
        """A streaming cache hit is replayed as SSE whose text deltas spell the cached text."""
        fake = _FakeCache(self._cached_payload())
        request_json = {
            "model": "test-model",
            "input": "ping",
            "stream": True,
            "store": False,
            "nomyo": {"cache": True},
        }
        with _enter(*self._cache_only_patches(fake)):
            resp = await client.post("/v1/responses", json=request_json)

        assert resp.headers["content-type"].startswith("text/event-stream")
        events = _sse_events(resp.content.decode())
        text_pieces = [data["delta"] for etype, data in events
                       if etype == "response.output_text.delta"]
        assert "".join(text_pieces) == "from-cache"
|