plano/tests/archgw/test_streaming.py

"""Mock-based streaming tests for all three API shapes.
Tests streaming for:
- OpenAI Chat Completions (both OpenAI and Anthropic clients)
- Anthropic Messages API (both native and cross-provider)
- OpenAI Responses API (passthrough and translated)
- Tool call streaming
- Thinking mode streaming
These tests require the gateway to be running with config_mock_llm.yaml
(started via docker-compose.mock.yaml).
"""

import json
import logging

import anthropic
import openai
from pytest_httpserver import HTTPServer
from pytest_httpserver.httpserver import HandlerType
from werkzeug.wrappers import Response

from conftest import (
    setup_openai_chat_mock,
    setup_anthropic_mock,
    make_openai_tool_call_stream,
)

logger = logging.getLogger(__name__)
LLM_GATEWAY_BASE = "http://localhost:12000"


# =============================================================================
# OPENAI CHAT COMPLETIONS STREAMING
# =============================================================================
def test_openai_chat_streaming_basic(httpserver: HTTPServer):
"""Basic OpenAI streaming: verify chunks arrive in order and reassemble correctly"""
setup_openai_chat_mock(
httpserver, content="The quick brown fox jumps over the lazy dog"
)
client = openai.OpenAI(api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1")
stream = client.chat.completions.create(
model="gpt-4o-mini",
max_tokens=100,
messages=[{"role": "user", "content": "Hello"}],
stream=True,
)
chunks = []
for chunk in stream:
        # The final chunk of an OpenAI stream can carry empty `choices`
        # (e.g. alongside usage), so guard before indexing.
        if chunk.choices and chunk.choices[0].delta.content:
            chunks.append(chunk.choices[0].delta.content)
full_text = "".join(chunks)
assert full_text == "The quick brown fox jumps over the lazy dog"
assert len(chunks) > 1, "Should have received multiple chunks"


def test_openai_chat_streaming_tool_calls(httpserver: HTTPServer):
"""OpenAI streaming with tool calls: verify tool call chunks are properly assembled"""
def handler(request):
body = json.loads(request.data)
model = body.get("model", "gpt-5-mini-2025-08-07")
return Response(
make_openai_tool_call_stream(
model=model, tool_name="echo_tool", tool_args='{"text":"hello"}'
),
status=200,
content_type="text/event-stream",
)
httpserver.expect_request(
"/v1/chat/completions",
method="POST",
handler_type=HandlerType.PERMANENT,
).respond_with_handler(handler)
client = openai.OpenAI(api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1")
stream = client.chat.completions.create(
model="gpt-4o-mini",
max_tokens=100,
messages=[{"role": "user", "content": "Call the echo tool"}],
tools=[
{
"type": "function",
"function": {
"name": "echo_tool",
"description": "Echo input",
"parameters": {
"type": "object",
"properties": {"text": {"type": "string"}},
"required": ["text"],
},
},
}
],
stream=True,
)
tool_calls = []
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.tool_calls:
for tc in chunk.choices[0].delta.tool_calls:
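                # Deltas arrive keyed by index; grow the accumulator list so
                # parallel tool calls each get a slot.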
while len(tool_calls) <= tc.index:
tool_calls.append(
{"id": "", "function": {"name": "", "arguments": ""}}
)
if tc.id:
tool_calls[tc.index]["id"] = tc.id
if tc.function:
if tc.function.name:
tool_calls[tc.index]["function"]["name"] = tc.function.name
if tc.function.arguments:
tool_calls[tc.index]["function"][
"arguments"
] += tc.function.arguments
assert len(tool_calls) > 0, "Should have received tool calls"
assert tool_calls[0]["function"]["name"] == "echo_tool"
assert tool_calls[0]["id"] == "call_mock_123"


# =============================================================================
# ANTHROPIC MESSAGES STREAMING
# =============================================================================
def test_anthropic_messages_streaming_basic(httpserver: HTTPServer):
"""Basic Anthropic streaming: verify text_stream yields chunks and final message is complete"""
setup_anthropic_mock(httpserver, content="Hello from streaming Claude!")
client = anthropic.Anthropic(api_key="test-key", base_url=LLM_GATEWAY_BASE)
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=100,
messages=[{"role": "user", "content": "Hello"}],
) as stream:
pieces = list(stream.text_stream)
full_text = "".join(pieces)
final = stream.get_final_message()
assert full_text == "Hello from streaming Claude!"
assert len(pieces) > 1, "Should have received multiple text chunks"
assert final is not None
assert final.content[0].text == "Hello from streaming Claude!"


def test_anthropic_messages_streaming_thinking(httpserver: HTTPServer):
"""Anthropic thinking mode streaming: verify thinking + text blocks"""
setup_anthropic_mock(httpserver, thinking=True)
client = anthropic.Anthropic(api_key="test-key", base_url=LLM_GATEWAY_BASE)
events_seen = {
"thinking_start": False,
"thinking_delta": False,
"text_delta": False,
}
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=2048,
thinking={"type": "enabled", "budget_tokens": 1024},
messages=[{"role": "user", "content": "What is 2+2?"}],
) as stream:
for event in stream:
if event.type == "content_block_start" and getattr(
event, "content_block", None
):
if getattr(event.content_block, "type", None) == "thinking":
events_seen["thinking_start"] = True
if event.type == "content_block_delta" and getattr(event, "delta", None):
if event.delta.type == "text_delta":
events_seen["text_delta"] = True
elif event.delta.type == "thinking_delta":
events_seen["thinking_delta"] = True
final = stream.get_final_message()
assert events_seen["thinking_start"], "No thinking block started"
assert events_seen["thinking_delta"], "No thinking deltas"
assert events_seen["text_delta"], "No text deltas"
block_types = [blk.type for blk in final.content]
assert "thinking" in block_types
assert "text" in block_types


# =============================================================================
# CROSS-PROVIDER STREAMING
# =============================================================================
def test_openai_client_streaming_anthropic_upstream(httpserver: HTTPServer):
"""OpenAI client streaming → Anthropic model → proxied via /v1/chat/completions"""
# Gateway routes OpenAI-format requests to /v1/chat/completions on upstream
setup_openai_chat_mock(httpserver, content="Cross-provider streaming works!")
client = openai.OpenAI(api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1")
stream = client.chat.completions.create(
model="claude-sonnet-4-20250514",
max_tokens=100,
messages=[{"role": "user", "content": "Hello"}],
stream=True,
)
chunks = []
for chunk in stream:
        # Guard against chunks with empty `choices`, as in the basic test.
        if chunk.choices and chunk.choices[0].delta.content:
            chunks.append(chunk.choices[0].delta.content)
assert "".join(chunks) == "Cross-provider streaming works!"


def test_anthropic_client_streaming_openai_upstream(httpserver: HTTPServer):
"""Anthropic client streaming → OpenAI model → OpenAI SSE → transformed to Anthropic SSE"""
setup_openai_chat_mock(httpserver, content="Reverse cross-provider streaming!")
client = anthropic.Anthropic(api_key="test-key", base_url=LLM_GATEWAY_BASE)
with client.messages.stream(
model="gpt-4o-mini",
max_tokens=100,
messages=[{"role": "user", "content": "Hello"}],
) as stream:
pieces = list(stream.text_stream)
full_text = "".join(pieces)
assert full_text == "Reverse cross-provider streaming!"


# =============================================================================
# RESPONSES API STREAMING
# =============================================================================
def test_responses_api_streaming_basic(httpserver: HTTPServer):
"""Responses API streaming: verify event types and content assembly"""
# Gateway translates Responses API to /v1/chat/completions on upstream
# for non-OpenAI models (OpenAI models pass through to /v1/responses which
# doesn't work with mocks)
setup_openai_chat_mock(httpserver, content="Responses API streaming works!")
client = openai.OpenAI(api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1")
stream = client.responses.create(
model="claude-sonnet-4-20250514",
input="Hello",
stream=True,
)
text_chunks = []
completed = False
for event in stream:
etype = getattr(event, "type", None)
if etype == "response.output_text.delta" and getattr(event, "delta", None):
text_chunks.append(event.delta)
if etype == "response.completed":
completed = True
    full_content = "".join(text_chunks)
    assert completed, "Should have received a response.completed event"
    assert len(text_chunks) > 0, "Should have received text delta events"
    assert len(full_content) > 0
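
# For reference, the Responses API streaming event names used above follow the
# standard OpenAI shape (response.created, response.output_item.added,
# response.output_text.delta, response.output_text.done, response.completed);
# the gateway's translated stream is assumed to emit at least the delta and
# completed events, which is what these tests assert.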


def test_responses_api_streaming_translated_upstream(httpserver: HTTPServer):
"""Responses API streaming with non-OpenAI model → translated to chat completions upstream"""
setup_openai_chat_mock(httpserver, content="Translated streaming response!")
client = openai.OpenAI(api_key="test-key", base_url=f"{LLM_GATEWAY_BASE}/v1")
stream = client.responses.create(
model="claude-sonnet-4-20250514",
input="Hello",
stream=True,
)
text_chunks = []
for event in stream:
if getattr(event, "type", None) == "response.output_text.delta" and getattr(
event, "delta", None
):
text_chunks.append(event.delta)
assert (
len(text_chunks) > 0
), "Should have received text delta events from translated stream"