From e49ff4bbf44064a044942925b4b42d154d748ec2 Mon Sep 17 00:00:00 2001 From: Salman Paracha Date: Tue, 16 Dec 2025 11:53:06 -0800 Subject: [PATCH] cleaned up logs and fixed issue with connectivity for llm gateway in weather forecast demo --- crates/brightstaff/src/main.rs | 6 +- crates/brightstaff/src/state/mod.rs | 2 +- .../src/state/{supabase.rs => postgresql.rs} | 10 +- .../arch_config_with_aliases.yaml | 10 - ...arch_config_memory_state_v1_responses.yaml | 25 ++ tests/e2e/run_e2e_tests.sh | 8 + tests/e2e/test_openai_responses_api_client.py | 201 ---------------- ..._openai_responses_api_client_with_state.py | 218 ++++++++++++++++++ 8 files changed, 260 insertions(+), 220 deletions(-) rename crates/brightstaff/src/state/{supabase.rs => postgresql.rs} (98%) create mode 100644 tests/e2e/arch_config_memory_state_v1_responses.yaml create mode 100644 tests/e2e/test_openai_responses_api_client_with_state.py diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 8217acc8..a67cb38c 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -3,9 +3,9 @@ use brightstaff::handlers::llm::llm_chat; use brightstaff::handlers::models::list_models; use brightstaff::handlers::function_calling::{function_calling_chat_handler}; use brightstaff::router::llm_router::RouterService; -use brightstaff::state::memory::MemoryConversationalStorage; use brightstaff::state::StateStorage; -use brightstaff::state::supabase::SupabaseConversationalStorage; +use brightstaff::state::postgresql::PostgreSQLConversationStorage; +use brightstaff::state::memory::MemoryConversationalStorage; use brightstaff::utils::tracing::init_tracer; use bytes::Bytes; use common::configuration::Configuration; @@ -123,7 +123,7 @@ async fn main() -> Result<(), Box> { debug!("Postgres connection string (full): {}", connection_string); info!("Initializing conversation state storage: Postgres"); Arc::new( - SupabaseConversationalStorage::new(connection_string.clone()) + PostgreSQLConversationStorage::new(connection_string.clone()) .await .expect("Failed to initialize Postgres state storage"), ) diff --git a/crates/brightstaff/src/state/mod.rs b/crates/brightstaff/src/state/mod.rs index efd9b0a5..99d408ff 100644 --- a/crates/brightstaff/src/state/mod.rs +++ b/crates/brightstaff/src/state/mod.rs @@ -8,7 +8,7 @@ use tracing::{debug}; pub mod memory; pub mod response_state_processor; -pub mod supabase; +pub mod postgresql; /// Represents the conversational state for a v1/responses request /// Contains the complete input/output history that can be restored diff --git a/crates/brightstaff/src/state/supabase.rs b/crates/brightstaff/src/state/postgresql.rs similarity index 98% rename from crates/brightstaff/src/state/supabase.rs rename to crates/brightstaff/src/state/postgresql.rs index 990b0a77..64141b5a 100644 --- a/crates/brightstaff/src/state/supabase.rs +++ b/crates/brightstaff/src/state/postgresql.rs @@ -8,12 +8,12 @@ use tracing::{debug, info, warn}; /// Supabase/PostgreSQL storage backend for conversation state #[derive(Clone)] -pub struct SupabaseConversationalStorage { +pub struct PostgreSQLConversationStorage { client: Arc, table_verified: Arc>, } -impl SupabaseConversationalStorage { +impl PostgreSQLConversationStorage { /// Creates a new Supabase storage instance with the given connection string pub async fn new(connection_string: String) -> Result { let (client, connection) = tokio_postgres::connect(&connection_string, NoTls) @@ -76,7 +76,7 @@ impl SupabaseConversationalStorage { } #[async_trait] -impl StateStorage for SupabaseConversationalStorage { +impl StateStorage for PostgreSQLConversationStorage { async fn put(&self, state: OpenAIConversationState) -> Result<(), StateStorageError> { self.ensure_ready().await?; @@ -251,9 +251,9 @@ mod tests { // Set TEST_DATABASE_URL environment variable to run integration tests // Example: TEST_DATABASE_URL=postgresql://user:pass@localhost/test_db - async fn get_test_storage() -> Option { + async fn get_test_storage() -> Option { if let Ok(db_url) = std::env::var("TEST_DATABASE_URL") { - match SupabaseConversationalStorage::new(db_url).await { + match PostgreSQLConversationStorage::new(db_url).await { Ok(storage) => Some(storage), Err(e) => { eprintln!("Failed to create test storage: {}", e); diff --git a/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml b/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml index a002c384..0aaaa537 100644 --- a/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml +++ b/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml @@ -92,13 +92,3 @@ model_aliases: tracing: random_sampling: 100 - -state_storage: - # Type: memory | postgres - type: postgres - - # Connection string for postgres type - # Environment variables are supported using $VAR_NAME or ${VAR_NAME} syntax - # Variables MUST be set before running config validation/rendering - # Example with environment variable substitution: - connection_string: "postgresql://postgres.saueycoonskiktmozyvp:$DB_PASSWORD@aws-0-us-west-2.pooler.supabase.com:5432/postgres" diff --git a/tests/e2e/arch_config_memory_state_v1_responses.yaml b/tests/e2e/arch_config_memory_state_v1_responses.yaml new file mode 100644 index 00000000..0986b8b7 --- /dev/null +++ b/tests/e2e/arch_config_memory_state_v1_responses.yaml @@ -0,0 +1,25 @@ +version: v0.1 + +listeners: + egress_traffic: + address: 0.0.0.0 + port: 12000 + message_format: openai + timeout: 30s + +llm_providers: + + # OpenAI Models + - model: openai/gpt-5-mini-2025-08-07 + access_key: $OPENAI_API_KEY + default: true + + # Anthropic Models + - model: anthropic/claude-sonnet-4-20250514 + access_key: $ANTHROPIC_API_KEY + +# State storage configuration for v1/responses API +# Manages conversation state for multi-turn conversations +state_storage_v1_responses: + # Type: memory | postgres + type: memory diff --git a/tests/e2e/run_e2e_tests.sh b/tests/e2e/run_e2e_tests.sh index f60f79bc..a6e66121 100644 --- a/tests/e2e/run_e2e_tests.sh +++ b/tests/e2e/run_e2e_tests.sh @@ -69,6 +69,14 @@ log running e2e tests for openai responses api client log ======================================== poetry run pytest test_openai_responses_api_client.py +log startup arch gateway with state storage for openai responses api client demo +archgw down +archgw up arch_config_memory_state_v1_responses.yaml + +log running e2e tests for openai responses api client +log ======================================== +poetry run pytest test_openai_responses_api_client_with_state.py + log shutting down the weather_forecast demo log ======================================= cd ../../demos/samples_python/weather_forecast diff --git a/tests/e2e/test_openai_responses_api_client.py b/tests/e2e/test_openai_responses_api_client.py index 7ccf1bb8..800db93d 100644 --- a/tests/e2e/test_openai_responses_api_client.py +++ b/tests/e2e/test_openai_responses_api_client.py @@ -628,204 +628,3 @@ def test_openai_responses_api_streaming_with_tools_upstream_anthropic(): assert ( full_text or tool_calls ), "Expected streamed text or tool call argument deltas from Responses tools stream" - - -def test_conversation_state_management_two_turn(): - """ - Test conversation state management across two turns: - 1. Send initial message to non-OpenAI model via v1/responses - 2. Capture response_id from first response - 3. Send second message with previous_response_id - 4. Verify model receives both messages in correct order - """ - base_url = LLM_GATEWAY_ENDPOINT.replace("/v1/chat/completions", "") - client = openai.OpenAI(api_key="test-key", base_url=f"{base_url}/v1") - - logger.info("\n" + "=" * 80) - logger.info("TEST: Conversation State Management - Two Turn Flow") - logger.info("=" * 80) - - # Turn 1: Send initial message to Anthropic (non-OpenAI model) - logger.info("\n[TURN 1] Sending initial message...") - resp1 = client.responses.create( - model="claude-sonnet-4-20250514", - input="My name is Alice and I like pizza.", - ) - - # Extract response_id from first response - response_id_1 = resp1.id - logger.info(f"[TURN 1] Received response_id: {response_id_1}") - logger.info(f"[TURN 1] Model response: {resp1.output_text}") - - assert response_id_1 is not None, "First response should have an id" - assert len(resp1.output_text) > 0, "First response should have content" - - # Turn 2: Send follow-up message with previous_response_id - # Ask the model to list all messages to verify state was combined - logger.info( - f"\n[TURN 2] Sending follow-up with previous_response_id={response_id_1}" - ) - resp2 = client.responses.create( - model="claude-sonnet-4-20250514", - input="Please list all the messages you have received in our conversation, numbering each one.", - previous_response_id=response_id_1, - ) - - response_id_2 = resp2.id - logger.info(f"[TURN 2] Received response_id: {response_id_2}") - logger.info(f"[TURN 2] Model response: {resp2.output_text}") - - assert response_id_2 is not None, "Second response should have an id" - assert response_id_2 != response_id_1, "Second response should have different id" - - # Verify the model received the conversation history - # The response should reference both the initial message and the follow-up - response_lower = resp2.output_text.lower() - - # Check if the model acknowledges receiving multiple messages - # Different models might format this differently, so we check for various indicators - has_conversation_context = ( - "alice" in response_lower - or "pizza" in response_lower # References the name from turn 1 - or "two" in response_lower # References the preference from turn 1 - or "2" in response_lower # Mentions number of messages - or "first" in response_lower # Numeric indicator - or "second" # References first message - in response_lower # References second message - ) - - logger.info( - f"\n[VALIDATION] Conversation context preserved: {has_conversation_context}" - ) - logger.info( - f"[VALIDATION] Response contains conversation markers: {has_conversation_context}" - ) - - print(f"\n{'='*80}") - print("Conversation State Test Results:") - print(f"Turn 1 Response ID: {response_id_1}") - print(f"Turn 2 Response ID: {response_id_2}") - print(f"Turn 1 Output: {resp1.output_text[:100]}...") - print(f"Turn 2 Output: {resp2.output_text}") - print(f"Conversation Context Preserved: {has_conversation_context}") - print(f"{'='*80}\n") - - assert has_conversation_context, ( - f"Model should have received conversation history. " - f"Response: {resp2.output_text}" - ) - - -def test_conversation_state_management_two_turn_streaming(): - """ - Test conversation state management across two turns with streaming: - 1. Send initial streaming message to non-OpenAI model via v1/responses - 2. Capture response_id from first response - 3. Send second streaming message with previous_response_id - 4. Verify model receives both messages in correct order - """ - base_url = LLM_GATEWAY_ENDPOINT.replace("/v1/chat/completions", "") - client = openai.OpenAI(api_key="test-key", base_url=f"{base_url}/v1") - - logger.info("\n" + "=" * 80) - logger.info("TEST: Conversation State Management - Two Turn Streaming Flow") - logger.info("=" * 80) - - # Turn 1: Send initial streaming message to Anthropic (non-OpenAI model) - logger.info("\n[TURN 1] Sending initial streaming message...") - stream1 = client.responses.create( - model="claude-sonnet-4-20250514", - input="My name is Alice and I like pizza.", - stream=True, - ) - - # Collect streamed content and capture response_id - text_chunks_1 = [] - response_id_1 = None - - for event in stream1: - if getattr(event, "type", None) == "response.output_text.delta" and getattr( - event, "delta", None - ): - text_chunks_1.append(event.delta) - - # Capture response_id from response.completed event - if getattr(event, "type", None) == "response.completed" and getattr( - event, "response", None - ): - response_id_1 = event.response.id - - output_1 = "".join(text_chunks_1) - logger.info(f"[TURN 1] Received response_id: {response_id_1}") - logger.info(f"[TURN 1] Model response: {output_1}") - - assert response_id_1 is not None, "First response should have an id" - assert len(output_1) > 0, "First response should have content" - - # Turn 2: Send follow-up streaming message with previous_response_id - logger.info( - f"\n[TURN 2] Sending follow-up streaming request with previous_response_id={response_id_1}" - ) - stream2 = client.responses.create( - model="claude-sonnet-4-20250514", - input="Please list all the messages you have received in our conversation, numbering each one.", - previous_response_id=response_id_1, - stream=True, - ) - - # Collect streamed content from second response - text_chunks_2 = [] - response_id_2 = None - - for event in stream2: - if getattr(event, "type", None) == "response.output_text.delta" and getattr( - event, "delta", None - ): - text_chunks_2.append(event.delta) - - # Capture response_id from response.completed event - if getattr(event, "type", None) == "response.completed" and getattr( - event, "response", None - ): - response_id_2 = event.response.id - - output_2 = "".join(text_chunks_2) - logger.info(f"[TURN 2] Received response_id: {response_id_2}") - logger.info(f"[TURN 2] Model response: {output_2}") - - assert response_id_2 is not None, "Second response should have an id" - assert response_id_2 != response_id_1, "Second response should have different id" - - # Verify the model received the conversation history - response_lower = output_2.lower() - - # Check if the model acknowledges receiving multiple messages - has_conversation_context = ( - "alice" in response_lower - or "pizza" in response_lower # References the name from turn 1 - or "two" in response_lower # References the preference from turn 1 - or "2" in response_lower # Mentions number of messages - or "first" in response_lower # Numeric indicator - or "second" # References first message - in response_lower # References second message - ) - - logger.info( - f"\n[VALIDATION] Conversation context preserved: {has_conversation_context}" - ) - logger.info( - f"[VALIDATION] Response contains conversation markers: {has_conversation_context}" - ) - - print(f"\n{'='*80}") - print("Streaming Conversation State Test Results:") - print(f"Turn 1 Response ID: {response_id_1}") - print(f"Turn 2 Response ID: {response_id_2}") - print(f"Turn 1 Output: {output_1[:100]}...") - print(f"Turn 2 Output: {output_2}") - print(f"Conversation Context Preserved: {has_conversation_context}") - print(f"{'='*80}\n") - - assert has_conversation_context, ( - f"Model should have received conversation history. " f"Response: {output_2}" - ) diff --git a/tests/e2e/test_openai_responses_api_client_with_state.py b/tests/e2e/test_openai_responses_api_client_with_state.py new file mode 100644 index 00000000..c23307e6 --- /dev/null +++ b/tests/e2e/test_openai_responses_api_client_with_state.py @@ -0,0 +1,218 @@ +import openai +import pytest +import os +import logging +import sys + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger(__name__) + +LLM_GATEWAY_ENDPOINT = os.getenv( + "LLM_GATEWAY_ENDPOINT", "http://localhost:12000/v1/chat/completions" +) + + +def test_conversation_state_management_two_turn(): + """ + Test conversation state management across two turns: + 1. Send initial message to non-OpenAI model via v1/responses + 2. Capture response_id from first response + 3. Send second message with previous_response_id + 4. Verify model receives both messages in correct order + """ + base_url = LLM_GATEWAY_ENDPOINT.replace("/v1/chat/completions", "") + client = openai.OpenAI(api_key="test-key", base_url=f"{base_url}/v1") + + logger.info("\n" + "=" * 80) + logger.info("TEST: Conversation State Management - Two Turn Flow") + logger.info("=" * 80) + + # Turn 1: Send initial message to Anthropic (non-OpenAI model) + logger.info("\n[TURN 1] Sending initial message...") + resp1 = client.responses.create( + model="claude-sonnet-4-20250514", + input="My name is Alice and I like pizza.", + ) + + # Extract response_id from first response + response_id_1 = resp1.id + logger.info(f"[TURN 1] Received response_id: {response_id_1}") + logger.info(f"[TURN 1] Model response: {resp1.output_text}") + + assert response_id_1 is not None, "First response should have an id" + assert len(resp1.output_text) > 0, "First response should have content" + + # Turn 2: Send follow-up message with previous_response_id + # Ask the model to list all messages to verify state was combined + logger.info( + f"\n[TURN 2] Sending follow-up with previous_response_id={response_id_1}" + ) + resp2 = client.responses.create( + model="claude-sonnet-4-20250514", + input="Please list all the messages you have received in our conversation, numbering each one.", + previous_response_id=response_id_1, + ) + + response_id_2 = resp2.id + logger.info(f"[TURN 2] Received response_id: {response_id_2}") + logger.info(f"[TURN 2] Model response: {resp2.output_text}") + + assert response_id_2 is not None, "Second response should have an id" + assert response_id_2 != response_id_1, "Second response should have different id" + + # Verify the model received the conversation history + # The response should reference both the initial message and the follow-up + response_lower = resp2.output_text.lower() + + # Check if the model acknowledges receiving multiple messages + # Different models might format this differently, so we check for various indicators + has_conversation_context = ( + "alice" in response_lower + or "pizza" in response_lower # References the name from turn 1 + or "two" in response_lower # References the preference from turn 1 + or "2" in response_lower # Mentions number of messages + or "first" in response_lower # Numeric indicator + or "second" # References first message + in response_lower # References second message + ) + + logger.info( + f"\n[VALIDATION] Conversation context preserved: {has_conversation_context}" + ) + logger.info( + f"[VALIDATION] Response contains conversation markers: {has_conversation_context}" + ) + + print(f"\n{'='*80}") + print("Conversation State Test Results:") + print(f"Turn 1 Response ID: {response_id_1}") + print(f"Turn 2 Response ID: {response_id_2}") + print(f"Turn 1 Output: {resp1.output_text[:100]}...") + print(f"Turn 2 Output: {resp2.output_text}") + print(f"Conversation Context Preserved: {has_conversation_context}") + print(f"{'='*80}\n") + + assert has_conversation_context, ( + f"Model should have received conversation history. " + f"Response: {resp2.output_text}" + ) + + +def test_conversation_state_management_two_turn_streaming(): + """ + Test conversation state management across two turns with streaming: + 1. Send initial streaming message to non-OpenAI model via v1/responses + 2. Capture response_id from first response + 3. Send second streaming message with previous_response_id + 4. Verify model receives both messages in correct order + """ + base_url = LLM_GATEWAY_ENDPOINT.replace("/v1/chat/completions", "") + client = openai.OpenAI(api_key="test-key", base_url=f"{base_url}/v1") + + logger.info("\n" + "=" * 80) + logger.info("TEST: Conversation State Management - Two Turn Streaming Flow") + logger.info("=" * 80) + + # Turn 1: Send initial streaming message to Anthropic (non-OpenAI model) + logger.info("\n[TURN 1] Sending initial streaming message...") + stream1 = client.responses.create( + model="claude-sonnet-4-20250514", + input="My name is Alice and I like pizza.", + stream=True, + ) + + # Collect streamed content and capture response_id + text_chunks_1 = [] + response_id_1 = None + + for event in stream1: + if getattr(event, "type", None) == "response.output_text.delta" and getattr( + event, "delta", None + ): + text_chunks_1.append(event.delta) + + # Capture response_id from response.completed event + if getattr(event, "type", None) == "response.completed" and getattr( + event, "response", None + ): + response_id_1 = event.response.id + + output_1 = "".join(text_chunks_1) + logger.info(f"[TURN 1] Received response_id: {response_id_1}") + logger.info(f"[TURN 1] Model response: {output_1}") + + assert response_id_1 is not None, "First response should have an id" + assert len(output_1) > 0, "First response should have content" + + # Turn 2: Send follow-up streaming message with previous_response_id + logger.info( + f"\n[TURN 2] Sending follow-up streaming request with previous_response_id={response_id_1}" + ) + stream2 = client.responses.create( + model="claude-sonnet-4-20250514", + input="Please list all the messages you have received in our conversation, numbering each one.", + previous_response_id=response_id_1, + stream=True, + ) + + # Collect streamed content from second response + text_chunks_2 = [] + response_id_2 = None + + for event in stream2: + if getattr(event, "type", None) == "response.output_text.delta" and getattr( + event, "delta", None + ): + text_chunks_2.append(event.delta) + + # Capture response_id from response.completed event + if getattr(event, "type", None) == "response.completed" and getattr( + event, "response", None + ): + response_id_2 = event.response.id + + output_2 = "".join(text_chunks_2) + logger.info(f"[TURN 2] Received response_id: {response_id_2}") + logger.info(f"[TURN 2] Model response: {output_2}") + + assert response_id_2 is not None, "Second response should have an id" + assert response_id_2 != response_id_1, "Second response should have different id" + + # Verify the model received the conversation history + response_lower = output_2.lower() + + # Check if the model acknowledges receiving multiple messages + has_conversation_context = ( + "alice" in response_lower + or "pizza" in response_lower # References the name from turn 1 + or "two" in response_lower # References the preference from turn 1 + or "2" in response_lower # Mentions number of messages + or "first" in response_lower # Numeric indicator + or "second" # References first message + in response_lower # References second message + ) + + logger.info( + f"\n[VALIDATION] Conversation context preserved: {has_conversation_context}" + ) + logger.info( + f"[VALIDATION] Response contains conversation markers: {has_conversation_context}" + ) + + print(f"\n{'='*80}") + print("Streaming Conversation State Test Results:") + print(f"Turn 1 Response ID: {response_id_1}") + print(f"Turn 2 Response ID: {response_id_2}") + print(f"Turn 1 Output: {output_1[:100]}...") + print(f"Turn 2 Output: {output_2}") + print(f"Conversation Context Preserved: {has_conversation_context}") + print(f"{'='*80}\n") + + assert has_conversation_context, ( + f"Model should have received conversation history. " f"Response: {output_2}" + )