"""Tests for the OpenAI Responses API support (api/responses.py + requests/responses.py). Covers the pure translation layer, the translated (Ollama-style) and native (external-OpenAI) backend paths, conversation storage / chaining, background mode, and the retrieve / delete / cancel routes. """ import asyncio from contextlib import ExitStack, contextmanager from types import SimpleNamespace as NS from unittest.mock import AsyncMock, MagicMock, patch import orjson import pytest import router from api import responses as api_responses from requests import responses as rt # ────────────────────────────────────────────────────────────────────────────── # Pure translation unit tests (no app / no I/O) # ────────────────────────────────────────────────────────────────────────────── class TestTranslationInputToMessages: def test_string_input(self): msgs = rt.responses_input_to_messages("hello") assert msgs == [{"role": "user", "content": "hello"}] def test_instructions_become_system(self): msgs = rt.responses_input_to_messages("hi", instructions="be brief") assert msgs[0] == {"role": "system", "content": "be brief"} assert msgs[1] == {"role": "user", "content": "hi"} def test_item_list_text_and_image(self): items = [{ "type": "message", "role": "user", "content": [ {"type": "input_text", "text": "describe"}, {"type": "input_image", "image_url": "http://x/y.png"}, ], }] msgs = rt.responses_input_to_messages(items) assert msgs[0]["role"] == "user" assert msgs[0]["content"] == [ {"type": "text", "text": "describe"}, {"type": "image_url", "image_url": {"url": "http://x/y.png"}}, ] def test_single_text_part_collapses_to_string(self): items = [{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "yo"}]}] assert rt.responses_input_to_messages(items)[0]["content"] == "yo" def test_function_call_roundtrip(self): items = [ {"type": "function_call", "call_id": "c1", "name": "get", "arguments": "{\"x\":1}"}, {"type": "function_call_output", "call_id": "c1", "output": "42"}, ] msgs = rt.responses_input_to_messages(items) assert msgs[0]["role"] == "assistant" assert msgs[0]["tool_calls"][0]["id"] == "c1" assert msgs[0]["tool_calls"][0]["function"]["name"] == "get" assert msgs[1] == {"role": "tool", "tool_call_id": "c1", "content": "42"} class TestTranslationResponseDirection: def test_chat_message_to_output_items_text(self): items = rt.chat_message_to_output_items({"role": "assistant", "content": "hi there"}) assert len(items) == 1 assert items[0]["type"] == "message" assert items[0]["content"][0] == {"type": "output_text", "text": "hi there", "annotations": []} def test_chat_message_to_output_items_tool_call(self): items = rt.chat_message_to_output_items({ "role": "assistant", "content": None, "tool_calls": [{"id": "c9", "function": {"name": "f", "arguments": "{}"}}], }) assert items[0]["type"] == "function_call" assert items[0]["call_id"] == "c9" assert items[0]["name"] == "f" def test_usage_mapping(self): u = rt.usage_chat_to_responses({"prompt_tokens": 7, "completion_tokens": 3}) assert u == {"input_tokens": 7, "output_tokens": 3, "total_tokens": 10} def test_build_response_object_output_text(self): items = rt.chat_message_to_output_items({"role": "assistant", "content": "abc"}) obj = rt.build_response_object(response_id="resp_1", model="m", output_items=items) assert obj["object"] == "response" assert obj["output_text"] == "abc" assert obj["status"] == "completed" def test_tools_responses_to_chat(self): tools = [{"type": "function", "name": "f", "description": "d", "parameters": {"type": "object"}}] chat_tools = rt.tools_responses_to_chat(tools) assert chat_tools == [{"type": "function", "function": {"name": "f", "description": "d", "parameters": {"type": "object"}}}] def test_messages_to_responses_input(self): instr, items = rt.messages_to_responses_input([ {"role": "system", "content": "sys"}, {"role": "user", "content": "hi"}, {"role": "assistant", "content": "yo"}, ]) assert instr == "sys" assert items[0] == {"role": "user", "content": [{"type": "input_text", "text": "hi"}]} assert items[1] == {"role": "assistant", "content": [{"type": "output_text", "text": "yo"}]} # ────────────────────────────────────────────────────────────────────────────── # Fakes for backend generators # ────────────────────────────────────────────────────────────────────────────── def _fake_completion(content="hello world", usage=(3, 5)): msg = MagicMock() msg.model_dump.return_value = {"role": "assistant", "content": content} usage_obj = MagicMock() usage_obj.model_dump.return_value = { "prompt_tokens": usage[0], "completion_tokens": usage[1], "total_tokens": sum(usage)} return NS(choices=[NS(message=msg)], usage=usage_obj) def _chunk(content=None, tool_calls=None): return NS(choices=[NS(delta=NS(content=content, tool_calls=tool_calls), finish_reason=None)], usage=None) def _usage_chunk(p, c): return NS(choices=[], usage=NS(prompt_tokens=p, completion_tokens=c)) def _text_chunks(): async def _gen(): yield _chunk(content="Hel") yield _chunk(content="lo") yield _usage_chunk(3, 5) return _gen() def _toolcall_chunks(): tc0 = NS(index=0, id="call_1", function=NS(name="lookup", arguments='{"q":')) tc1 = NS(index=0, id=None, function=NS(name=None, arguments='"hi"}')) async def _gen(): yield _chunk(tool_calls=[tc0]) yield _chunk(tool_calls=[tc1]) yield _usage_chunk(4, 2) return _gen() class _FakeEvent: def __init__(self, data): self._data = data def model_dump(self): return self._data def _native_event_stream(): async def _gen(): yield _FakeEvent({"type": "response.created", "response": {"id": "resp_openai", "status": "in_progress", "output": []}}) yield _FakeEvent({"type": "response.output_text.delta", "item_id": "msg_1", "output_index": 0, "delta": "hi"}) yield _FakeEvent({"type": "response.completed", "response": { "id": "resp_openai", "status": "completed", "output": [{"type": "message", "role": "assistant", "content": [{"type": "output_text", "text": "hi"}]}], "usage": {"input_tokens": 2, "output_tokens": 1, "total_tokens": 3}}}) return _gen() def _sse_events(text): """Split an SSE body into a list of (event_type, data_dict).""" out = [] for frame in text.strip().split("\n\n"): if not frame.strip(): continue etype = data = None for line in frame.splitlines(): if line.startswith("event: "): etype = line[len("event: "):] elif line.startswith("data: "): data = orjson.loads(line[len("data: "):]) out.append((etype, data)) return out @contextmanager def _enter(*cms): """Enter a variable number of context managers (works with *unpacked tuples).""" with ExitStack() as stack: for cm in cms: stack.enter_context(cm) yield def _patch_backend(native=False, endpoint="http://ollama:11434"): """Context managers patching endpoint selection + client construction.""" return ( patch.object(api_responses, "choose_endpoint", AsyncMock(return_value=(endpoint, "test-model:latest"))), patch.object(api_responses, "decrement_usage", AsyncMock()), patch.object(api_responses, "is_ext_openai_endpoint", return_value=native), patch.object(api_responses, "_make_openai_client", return_value=MagicMock()), patch.object(api_responses, "get_llm_cache", return_value=None), ) # ────────────────────────────────────────────────────────────────────────────── # Translated path (Ollama-style backend) # ────────────────────────────────────────────────────────────────────────────── class TestTranslatedPath: async def test_nonstream(self, client): with _enter(*_patch_backend(native=False), patch.object(api_responses, "create_chat_with_retries", AsyncMock(return_value=_fake_completion("hello world")))): resp = await client.post("/v1/responses", json={"model": "test-model", "input": "hi", "store": False}) assert resp.status_code == 200 body = resp.json() assert body["object"] == "response" assert body["output_text"] == "hello world" assert body["usage"] == {"input_tokens": 3, "output_tokens": 5, "total_tokens": 8} assert body["id"].startswith("resp_") async def test_stream_event_sequence(self, client): with _enter(*_patch_backend(native=False), patch.object(api_responses, "create_chat_with_retries", AsyncMock(return_value=_text_chunks()))): resp = await client.post("/v1/responses", json={"model": "test-model", "input": "hi", "stream": True, "store": False}) assert resp.status_code == 200 assert resp.headers["content-type"].startswith("text/event-stream") events = _sse_events(resp.content.decode()) types = [e[0] for e in events] assert types[0] == "response.created" assert "response.output_text.delta" in types assert types[-1] == "response.completed" # concatenated deltas reconstruct the content deltas = "".join(d["delta"] for t, d in events if t == "response.output_text.delta") assert deltas == "Hello" # completed event carries usage completed = [d for t, d in events if t == "response.completed"][0] assert completed["response"]["usage"]["input_tokens"] == 3 async def test_stream_tool_calls(self, client): with _enter(*_patch_backend(native=False), patch.object(api_responses, "create_chat_with_retries", AsyncMock(return_value=_toolcall_chunks()))): resp = await client.post("/v1/responses", json={"model": "test-model", "input": "lookup hi", "stream": True, "store": False}) events = _sse_events(resp.content.decode()) types = [e[0] for e in events] assert "response.function_call_arguments.delta" in types assert "response.function_call_arguments.done" in types args = "".join(d["delta"] for t, d in events if t == "response.function_call_arguments.delta") assert args == '{"q":"hi"}' completed = [d for t, d in events if t == "response.completed"][0] fc = [i for i in completed["response"]["output"] if i["type"] == "function_call"][0] assert fc["name"] == "lookup" assert fc["arguments"] == '{"q":"hi"}' # ────────────────────────────────────────────────────────────────────────────── # Native path (external OpenAI backend) # ────────────────────────────────────────────────────────────────────────────── class TestNativePath: async def test_nonstream_passthrough_rewrites_id(self, client): oclient = MagicMock() resp_obj = MagicMock() resp_obj.model_dump.return_value = { "id": "resp_openai", "status": "completed", "output": [{"type": "message", "role": "assistant", "content": [{"type": "output_text", "text": "native hi"}]}], "usage": {"input_tokens": 2, "output_tokens": 3, "total_tokens": 5}} oclient.responses.create = AsyncMock(return_value=resp_obj) with (patch.object(api_responses, "choose_endpoint", AsyncMock(return_value=("https://api.openai.com/v1", "gpt"))), patch.object(api_responses, "decrement_usage", AsyncMock()), patch.object(api_responses, "is_ext_openai_endpoint", return_value=True), patch.object(api_responses, "_make_openai_client", return_value=oclient), patch.object(api_responses, "get_llm_cache", return_value=None)): resp = await client.post("/v1/responses", json={"model": "gpt", "input": "hi", "store": False}) body = resp.json() assert body["output_text"] == "native hi" assert body["id"].startswith("resp_") and body["id"] != "resp_openai" # native call must not delegate state upstream assert oclient.responses.create.call_args.kwargs["store"] is False async def test_stream_passthrough(self, client): oclient = MagicMock() oclient.responses.create = AsyncMock(return_value=_native_event_stream()) with (patch.object(api_responses, "choose_endpoint", AsyncMock(return_value=("https://api.openai.com/v1", "gpt"))), patch.object(api_responses, "decrement_usage", AsyncMock()), patch.object(api_responses, "is_ext_openai_endpoint", return_value=True), patch.object(api_responses, "_make_openai_client", return_value=oclient), patch.object(api_responses, "get_llm_cache", return_value=None)): resp = await client.post("/v1/responses", json={"model": "gpt", "input": "hi", "stream": True, "store": False}) events = _sse_events(resp.content.decode()) # the completed event's response id is rewritten to the router id completed = [d for t, d in events if t == "response.completed"][0] assert completed["response"]["id"].startswith("resp_") assert completed["response"]["id"] != "resp_openai" # ────────────────────────────────────────────────────────────────────────────── # Storage + chaining + retrieve/delete # ────────────────────────────────────────────────────────────────────────────── class TestStorageAndChaining: async def test_store_and_retrieve(self, client): with _enter(*_patch_backend(native=False), patch.object(api_responses, "create_chat_with_retries", AsyncMock(return_value=_fake_completion("remembered")))): created = await client.post("/v1/responses", json={"model": "test-model", "input": "hi", "store": True}) rid = created.json()["id"] got = await client.get(f"/v1/responses/{rid}") assert got.status_code == 200 assert got.json()["output_text"] == "remembered" async def test_previous_response_id_rehydrates_history(self, client): # First turn with _enter(*_patch_backend(native=False), patch.object(api_responses, "create_chat_with_retries", AsyncMock(return_value=_fake_completion("turn-one")))): first = await client.post("/v1/responses", json={"model": "test-model", "input": "first?", "store": True}) rid = first.json()["id"] # Second turn references the first — capture the messages sent to the backend capture = AsyncMock(return_value=_fake_completion("turn-two")) with _enter(*_patch_backend(native=False), patch.object(api_responses, "create_chat_with_retries", capture)): await client.post("/v1/responses", json={"model": "test-model", "input": "second?", "previous_response_id": rid, "store": True}) sent_messages = capture.call_args.args[1]["messages"] contents = [m.get("content") for m in sent_messages] assert "first?" in contents # prior user turn replayed assert "turn-one" in contents # prior assistant turn replayed assert "second?" in contents # current turn appended async def test_delete(self, client): with _enter(*_patch_backend(native=False), patch.object(api_responses, "create_chat_with_retries", AsyncMock(return_value=_fake_completion("bye")))): created = await client.post("/v1/responses", json={"model": "test-model", "input": "hi", "store": True}) rid = created.json()["id"] deleted = await client.delete(f"/v1/responses/{rid}") assert deleted.status_code == 200 assert deleted.json()["deleted"] is True assert (await client.get(f"/v1/responses/{rid}")).status_code == 404 async def test_retrieve_missing_404(self, client): assert (await client.get("/v1/responses/resp_missing")).status_code == 404 # ────────────────────────────────────────────────────────────────────────────── # Background mode # ────────────────────────────────────────────────────────────────────────────── class TestBackgroundMode: async def test_background_requires_store(self, client): resp = await client.post("/v1/responses", json={"model": "test-model", "input": "hi", "background": True, "store": False}) assert resp.status_code == 400 async def test_background_lifecycle(self, client): with _enter(*_patch_backend(native=False), patch.object(api_responses, "create_chat_with_retries", AsyncMock(return_value=_fake_completion("bg-done")))): created = await client.post("/v1/responses", json={"model": "test-model", "input": "hi", "background": True, "store": True}) assert created.status_code == 200 assert created.json()["status"] == "queued" rid = created.json()["id"] # poll until terminal status = None for _ in range(100): await asyncio.sleep(0.01) got = await client.get(f"/v1/responses/{rid}") status = got.json()["status"] if status in ("completed", "failed", "cancelled"): break assert status == "completed" assert got.json()["output_text"] == "bg-done" async def test_fail_orphaned_responses(self, client): db = router.db await db.store_response("resp_orphan", previous_response_id=None, model="m", status="in_progress", created_at=0, input_messages=[]) n = await db.fail_orphaned_responses() assert n >= 1 row = await db.get_response("resp_orphan") assert row["status"] == "failed" # ────────────────────────────────────────────────────────────────────────────── # Cache parity # ────────────────────────────────────────────────────────────────────────────── class _FakeCache: def __init__(self, response_bytes): self._resp = response_bytes self.calls = [] async def get_chat(self, route, model, messages): self.calls.append((route, model, messages)) return self._resp class TestCacheParity: async def test_cache_hit_served_as_response(self, client): cached = orjson.dumps(rt.build_response_object( response_id="resp_cached", model="test-model", output_items=rt.chat_message_to_output_items( {"role": "assistant", "content": "from-cache"}))) fake = _FakeCache(cached) with (patch.object(api_responses, "get_llm_cache", return_value=fake), patch.object(api_responses, "choose_endpoint", AsyncMock(side_effect=AssertionError("backend must not be reached")))): resp = await client.post("/v1/responses", json={"model": "test-model", "input": "ping", "store": False, "nomyo": {"cache": True}}) assert resp.status_code == 200 assert resp.json()["output_text"] == "from-cache" assert fake.calls and fake.calls[0][0] == "openai_responses" async def test_cache_hit_served_as_sse(self, client): cached = orjson.dumps(rt.build_response_object( response_id="resp_cached", model="test-model", output_items=rt.chat_message_to_output_items( {"role": "assistant", "content": "from-cache"}))) fake = _FakeCache(cached) with (patch.object(api_responses, "get_llm_cache", return_value=fake), patch.object(api_responses, "choose_endpoint", AsyncMock(side_effect=AssertionError("backend must not be reached")))): resp = await client.post("/v1/responses", json={"model": "test-model", "input": "ping", "stream": True, "store": False, "nomyo": {"cache": True}}) assert resp.headers["content-type"].startswith("text/event-stream") events = _sse_events(resp.content.decode()) deltas = "".join(d["delta"] for t, d in events if t == "response.output_text.delta") assert deltas == "from-cache"