plano/tests/archgw/test_responses_api.py
Adil Hafeez aeef0c33a8 Fix HandlerType import and apply Black formatting
- Import HandlerType from pytest_httpserver.httpserver (not top-level)
- Apply Black formatting to all new test files

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 23:47:12 +00:00

394 lines
13 KiB
Python

"""Mock-based tests for the OpenAI Responses API (/v1/responses).
Tests passthrough to OpenAI, translation to chat completions for non-OpenAI
providers, tool calling, streaming, and multi-turn state management.
These tests require the gateway to be running with config_mock_llm.yaml
(started via docker-compose.mock.yaml).
"""
import json
import openai
import pytest
import logging
from pytest_httpserver import HTTPServer
from conftest import (
setup_openai_chat_mock,
setup_responses_api_mock,
)
# Module-level logger named after this test module.
logger = logging.getLogger(__name__)
# Base URL that every test client in this module prefixes to "/v1".
LLM_GATEWAY_BASE = "http://localhost:12000"
# =============================================================================
# PASSTHROUGH TESTS (OpenAI upstream → /v1/responses)
# =============================================================================
def test_responses_api_non_streaming_passthrough(httpserver: HTTPServer):
    """Non-streaming Responses API call for an OpenAI model returns the mocked text.

    Registers the Responses API mock with a known reply, sends one request
    through the gateway, and checks that the response has an id and that its
    ``output_text`` equals the mocked content.
    """
    # The helper's return value is not inspected here; only the reply is checked.
    setup_responses_api_mock(httpserver, content="Hello from Responses API!")
    client = openai.OpenAI(api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1")
    resp = client.responses.create(
        model="gpt-4o",
        input="Hello via responses passthrough",
    )
    assert resp is not None
    assert resp.id is not None
    assert resp.output_text == "Hello from Responses API!"
def test_responses_api_streaming_passthrough(httpserver: HTTPServer):
    """Streaming Responses API call for an OpenAI model yields text deltas.

    Registers the Responses API mock, opens a streaming request through the
    gateway, and checks that at least one non-empty
    ``response.output_text.delta`` event arrives.
    """
    setup_responses_api_mock(httpserver, content="Streaming responses API!")
    client = openai.OpenAI(api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1")
    stream = client.responses.create(
        model="gpt-4o",
        input="Write a haiku",
        stream=True,
    )
    # Drain the entire stream, keeping only non-empty text deltas. Other
    # event types (including "response.completed") are not asserted on here.
    text_chunks = [
        event.delta
        for event in stream
        if getattr(event, "type", None) == "response.output_text.delta"
        and getattr(event, "delta", None)
    ]
    full_content = "".join(text_chunks)
    assert len(text_chunks) > 0, "Should have received streaming text deltas"
    assert len(full_content) > 0, "Should have received content"
def test_responses_api_with_tools_passthrough(httpserver: HTTPServer):
    """Non-streaming Responses API request carrying a function tool (OpenAI model).

    Only checks that a response object with an id comes back; the content of
    the reply is not inspected.
    """
    setup_responses_api_mock(httpserver, content="Tool response")

    # Single function tool with one required string argument.
    echo_tool = {
        "type": "function",
        "name": "echo_tool",
        "description": "Echo back the provided input",
        "parameters": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    }

    gateway_client = openai.OpenAI(
        api_key="test-key",
        base_url=f"{LLM_GATEWAY_BASE}/v1",
        max_retries=0,
    )
    result = gateway_client.responses.create(
        model="openai/gpt-5-mini-2025-08-07",
        input="Call the echo tool",
        tools=[echo_tool],
    )

    assert result is not None
    assert result.id is not None
def test_responses_api_streaming_with_tools_passthrough(httpserver: HTTPServer):
    """Streaming Responses API request carrying a function tool (OpenAI model).

    Passes when the stream produces at least one non-empty text delta or
    function-call-arguments delta.
    """
    setup_responses_api_mock(httpserver, content="Streamed tool response")

    echo_tool = {
        "type": "function",
        "name": "echo_tool",
        "description": "Echo back the provided input",
        "parameters": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    }

    gateway_client = openai.OpenAI(
        api_key="test-key",
        base_url=f"{LLM_GATEWAY_BASE}/v1",
        max_retries=0,
    )
    event_stream = gateway_client.responses.create(
        model="openai/gpt-5-mini-2025-08-07",
        input="Call the echo tool",
        tools=[echo_tool],
        stream=True,
    )

    text_chunks, tool_calls = [], []
    # Route each delta to the list that matches its event type.
    sinks = {
        "response.output_text.delta": text_chunks,
        "response.function_call_arguments.delta": tool_calls,
    }
    for event in event_stream:
        sink = sinks.get(getattr(event, "type", None))
        if sink is None:
            continue
        delta = getattr(event, "delta", None)
        if delta:
            sink.append(delta)

    assert text_chunks or tool_calls, "Expected streamed text or tool call deltas"
# =============================================================================
# UPSTREAM TRANSLATION TESTS (non-OpenAI → /v1/chat/completions)
# =============================================================================
def test_responses_api_non_streaming_upstream_anthropic(httpserver: HTTPServer):
    """Non-streaming Responses API call for an Anthropic model returns a response.

    The upstream is served by the chat-completions mock (per this module's
    docstring, non-OpenAI providers are translated to chat completions). Only
    the presence of a response and its id is checked.
    """
    # The helper's return value is not inspected here; only the reply is checked.
    setup_openai_chat_mock(httpserver, content="Hello from Claude via Responses!")
    client = openai.OpenAI(api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1")
    resp = client.responses.create(
        model="claude-sonnet-4-20250514",
        input="Hello, translate this",
    )
    assert resp is not None
    assert resp.id is not None
def test_responses_api_streaming_upstream_anthropic(httpserver: HTTPServer):
    """Streaming Responses API call for an Anthropic model yields text deltas.

    The upstream is served by the chat-completions mock; the test passes when
    at least one non-empty ``response.output_text.delta`` event is received.
    """
    setup_openai_chat_mock(httpserver, content="Streaming from Claude via Responses!")
    gateway_client = openai.OpenAI(
        api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1"
    )
    event_stream = gateway_client.responses.create(
        model="claude-sonnet-4-20250514",
        input="Write a haiku",
        stream=True,
    )

    # Consume the full stream, keeping only non-empty text deltas.
    text_chunks = [
        event.delta
        for event in event_stream
        if getattr(event, "type", None) == "response.output_text.delta"
        and getattr(event, "delta", None)
    ]

    assert len(text_chunks) > 0, "Should have received streaming text deltas"
def test_responses_api_with_tools_upstream_anthropic(httpserver: HTTPServer):
    """Non-streaming Responses API request with a function tool, Anthropic model.

    The upstream is served by the chat-completions mock. Only the response id
    is checked.
    """
    setup_openai_chat_mock(httpserver, content="Tool response via Claude")

    echo_tool = {
        "type": "function",
        "name": "echo_tool",
        "description": "Echo back the provided input: hello_world",
        "parameters": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    }

    gateway_client = openai.OpenAI(
        api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1"
    )
    result = gateway_client.responses.create(
        model="claude-sonnet-4-20250514",
        input="Call the echo tool",
        tools=[echo_tool],
    )

    assert result.id is not None
def test_responses_api_streaming_with_tools_upstream_anthropic(httpserver: HTTPServer):
    """Streaming Responses API request with a function tool, Anthropic model.

    The upstream is served by the chat-completions mock. Passes when the
    stream produces at least one non-empty text delta or
    function-call-arguments delta.
    """
    setup_openai_chat_mock(httpserver, content="Streamed tool via Claude")

    echo_tool = {
        "type": "function",
        "name": "echo_tool",
        "description": "Echo back the provided input: hello_world",
        "parameters": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    }

    gateway_client = openai.OpenAI(
        api_key="test-key",
        base_url=f"{LLM_GATEWAY_BASE}/v1",
        max_retries=0,
    )
    event_stream = gateway_client.responses.create(
        model="claude-sonnet-4-20250514",
        input="Call the echo tool with hello_world",
        tools=[echo_tool],
        stream=True,
    )

    text_chunks = []
    tool_calls = []
    for event in event_stream:
        delta = getattr(event, "delta", None)
        if not delta:
            # Events with no (or empty) delta payload contribute nothing.
            continue
        kind = getattr(event, "type", None)
        if kind == "response.output_text.delta":
            text_chunks.append(delta)
        elif kind == "response.function_call_arguments.delta":
            tool_calls.append(delta)

    assert text_chunks or tool_calls, "Expected streamed text or tool call deltas"
# =============================================================================
# MIXED CONTENT TYPES
# =============================================================================
def test_responses_api_mixed_content_types(httpserver: HTTPServer):
    """Responses API input mixing a plain-string message and a content-part list.

    The developer message uses a string ``content`` while the user message
    uses a list of ``input_text`` parts. The test checks that a response with
    an id and non-empty ``output_text`` comes back.
    """
    setup_responses_api_mock(httpserver, content="Weather Seattle")

    developer_turn = {
        "role": "developer",
        "content": "Generate a short chat title based on the user's message.",
    }
    user_turn = {
        "role": "user",
        "content": [{"type": "input_text", "text": "What is the weather in Seattle"}],
    }

    gateway_client = openai.OpenAI(
        api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1"
    )
    result = gateway_client.responses.create(
        model="openai/gpt-5-mini-2025-08-07",
        input=[developer_turn, user_turn],
    )

    assert result is not None
    assert result.id is not None
    assert len(result.output_text) > 0
# =============================================================================
# STATE MANAGEMENT (multi-turn via previous_response_id)
# =============================================================================
def test_conversation_state_management_two_turn(httpserver: HTTPServer):
    """Two non-streaming turns linked through ``previous_response_id``.

    Turn 1 sends an initial message and records the response id.
    Turn 2 passes that id as ``previous_response_id``; the test then inspects
    the second upstream request captured by the mock and expects it to carry
    at least three messages (turn-1 user, turn-1 assistant, turn-2 user).
    """
    # An Anthropic model is used, so the upstream is the chat-completions
    # mock; OpenAI models would go to /v1/responses instead. State handling is
    # expected to happen in the gateway (brightstaff) regardless of upstream.
    captured = setup_openai_chat_mock(
        httpserver, content="I remember your name is Alice!"
    )
    gateway_client = openai.OpenAI(
        api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1"
    )

    # Turn 1: establish the conversation.
    first_turn = gateway_client.responses.create(
        model="claude-sonnet-4-20250514",
        input="My name is Alice and I like pizza.",
    )
    first_id = first_turn.id
    assert first_id is not None
    assert len(first_turn.output_text) > 0

    # Turn 2: continue from turn 1 via previous_response_id.
    second_turn = gateway_client.responses.create(
        model="claude-sonnet-4-20250514",
        input="What is my name?",
        previous_response_id=first_id,
    )
    second_id = second_turn.id
    assert second_id is not None
    assert second_id != first_id

    # Exactly two upstream requests are expected, one per turn.
    assert len(captured) == 2
    messages = captured[1].get("messages", [])
    # The second request should replay turn 1 (user + assistant) and add the
    # new user message.
    assert len(messages) >= 3, (
        f"Expected >= 3 messages in second turn, got {len(messages)}: {messages}"
    )
def test_conversation_state_management_two_turn_streaming(httpserver: HTTPServer):
    """Two streaming turns linked through ``previous_response_id``.

    Each turn must yield at least one text delta and a ``response.completed``
    event carrying a response id. The second upstream request captured by the
    mock is expected to contain at least three messages.
    """

    def _drain(event_stream):
        """Consume a stream; return (text deltas, id from response.completed)."""
        deltas = []
        completed_id = None
        for event in event_stream:
            kind = getattr(event, "type", None)
            if kind == "response.output_text.delta" and getattr(
                event, "delta", None
            ):
                deltas.append(event.delta)
            if kind == "response.completed" and getattr(event, "response", None):
                completed_id = event.response.id
        return deltas, completed_id

    captured = setup_openai_chat_mock(httpserver, content="Alice likes pizza!")
    gateway_client = openai.OpenAI(
        api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1"
    )

    # Turn 1: streaming. It is drained fully before turn 2 is issued.
    first_chunks, first_id = _drain(
        gateway_client.responses.create(
            model="claude-sonnet-4-20250514",
            input="My name is Alice and I like pizza.",
            stream=True,
        )
    )
    assert first_id is not None
    assert len(first_chunks) > 0

    # Turn 2: streaming, continuing from turn 1 via previous_response_id.
    second_chunks, second_id = _drain(
        gateway_client.responses.create(
            model="claude-sonnet-4-20250514",
            input="What do I like?",
            previous_response_id=first_id,
            stream=True,
        )
    )
    assert second_id is not None
    assert second_id != first_id
    assert len(second_chunks) > 0

    # The second upstream request should include the first turn's context.
    assert len(captured) == 2
    messages = captured[1].get("messages", [])
    assert len(messages) >= 3, (
        f"Expected >= 3 messages in second turn, got {len(messages)}"
    )