diff --git a/arch/arch_config_schema.yaml b/arch/arch_config_schema.yaml index f481b389..f1690ad4 100644 --- a/arch/arch_config_schema.yaml +++ b/arch/arch_config_schema.yaml @@ -331,6 +331,31 @@ properties: model: type: string additionalProperties: false + state_storage_v1_responses: + type: object + properties: + type: + type: string + enum: + - memory + - postgres + connection_string: + type: string + description: Required when type is postgres. Supports environment variable substitution using $VAR or ${VAR} syntax. + additionalProperties: false + required: + - type + # Note: connection_string is conditionally required based on type + # If type is 'postgres', connection_string must be provided + # If type is 'memory', connection_string is not needed + allOf: + - if: + properties: + type: + const: postgres + then: + required: + - connection_string prompt_guards: type: object properties: diff --git a/arch/supervisord.conf b/arch/supervisord.conf index 2b715a1e..9761f779 100644 --- a/arch/supervisord.conf +++ b/arch/supervisord.conf @@ -2,7 +2,7 @@ nodaemon=true [program:brightstaff] -command=sh -c "RUST_LOG=info /app/brightstaff 2>&1 | tee /var/log/brightstaff.log | while IFS= read -r line; do echo '[brightstaff]' \"$line\"; done" +command=sh -c "envsubst < /app/arch_config_rendered.yaml > /app/arch_config_rendered.env_sub.yaml && RUST_LOG=info ARCH_CONFIG_PATH_RENDERED=/app/arch_config_rendered.env_sub.yaml /app/brightstaff 2>&1 | tee /var/log/brightstaff.log | while IFS= read -r line; do echo '[brightstaff]' \"$line\"; done" stdout_logfile=/dev/stdout redirect_stderr=true stdout_logfile_maxbytes=0 diff --git a/arch/tools/cli/utils.py b/arch/tools/cli/utils.py index 2f29b16e..21dd5af4 100644 --- a/arch/tools/cli/utils.py +++ b/arch/tools/cli/utils.py @@ -148,6 +148,20 @@ def get_llm_provider_access_keys(arch_config_file): if access_key is not None: access_key_list.append(access_key) + # Extract environment variables from state_storage_v1_responses.connection_string + 
state_storage = arch_config_yaml.get("state_storage_v1_responses") + if state_storage: + connection_string = state_storage.get("connection_string") + if connection_string and isinstance(connection_string, str): + # Extract all $VAR and ${VAR} patterns from connection string + import re + + # Match both $VAR and ${VAR} patterns + pattern = r"\$\{?([A-Z_][A-Z0-9_]*)\}?" + matches = re.findall(pattern, connection_string) + for var in matches: + access_key_list.append(f"${var}") + return access_key_list diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index 5af6f112..5c5bcf01 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -37,7 +37,7 @@ pub async fn llm_chat( model_aliases: Arc>>, llm_providers: Arc>>, trace_collector: Arc, - state_storage: Arc, + state_storage: Option>, ) -> Result>, hyper::Error> { let request_path = request.uri().path().to_string(); @@ -106,8 +106,9 @@ pub async fn llm_chat( // === v1/responses state management: Determine upstream API and combine input if needed === // Do this BEFORE routing since routing consumes the request + // Only process state if state_storage is configured let mut should_manage_state = false; - if is_responses_api_client { + if is_responses_api_client && state_storage.is_some() { if let ProviderRequestType::ResponsesAPIRequest(ref mut responses_req) = client_request { // Extract original input once original_input_items = extract_input_items(&responses_req.input); @@ -130,7 +131,7 @@ pub async fn llm_chat( // Retrieve and combine conversation history if previous_response_id exists if let Some(ref prev_resp_id) = responses_req.previous_response_id { match retrieve_and_combine_input( - state_storage.clone(), + state_storage.as_ref().unwrap().clone(), prev_resp_id, original_input_items, // Pass ownership instead of cloning ) @@ -267,8 +268,8 @@ pub async fn llm_chat( ); // === v1/responses state management: Wrap with 
ResponsesStateProcessor === - // Only wrap if we need to manage state (client is ResponsesAPI AND upstream is NOT ResponsesAPI) - let streaming_response = if should_manage_state && !original_input_items.is_empty() { + // Only wrap if we need to manage state (client is ResponsesAPI AND upstream is NOT ResponsesAPI AND state_storage is configured) + let streaming_response = if should_manage_state && !original_input_items.is_empty() && state_storage.is_some() { // Extract Content-Encoding header to handle decompression for state parsing let content_encoding = response_headers .get("content-encoding") @@ -278,7 +279,7 @@ pub async fn llm_chat( // Wrap with state management processor to store state after response completes let state_processor = ResponsesStateProcessor::new( base_processor, - state_storage, + state_storage.unwrap(), original_input_items, resolved_model.clone(), model_name.clone(), diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 78be13a5..8217acc8 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -5,6 +5,7 @@ use brightstaff::handlers::function_calling::{function_calling_chat_handler}; use brightstaff::router::llm_router::RouterService; use brightstaff::state::memory::MemoryConversationalStorage; use brightstaff::state::StateStorage; +use brightstaff::state::supabase::SupabaseConversationalStorage; use brightstaff::utils::tracing::init_tracer; use bytes::Bytes; use common::configuration::Configuration; @@ -104,9 +105,35 @@ async fn main() -> Result<(), Box> { let _flusher_handle = trace_collector.clone().start_background_flusher(); // Initialize conversation state storage for v1/responses - // TODO: Make this configurable (MEMORY vs SUPABASE) via arch_config.yaml - let state_storage: Arc = Arc::new(MemoryConversationalStorage::new()); - info!("Initialized conversation state storage: Memory"); + // Configurable via arch_config.yaml state_storage section + // If not configured, state 
management is disabled + // Environment variables are substituted by envsubst before config is read + let state_storage: Option> = if let Some(storage_config) = &arch_config.state_storage_v1_responses { + let storage: Arc = match storage_config.storage_type { + common::configuration::StateStorageType::Memory => { + info!("Initialized conversation state storage: Memory"); + Arc::new(MemoryConversationalStorage::new()) + } + common::configuration::StateStorageType::Postgres => { + let connection_string = storage_config + .connection_string + .as_ref() + .expect("connection_string is required for postgres state_storage"); + + debug!("Postgres connection string configured (value redacted: contains credentials)"); + info!("Initializing conversation state storage: Postgres"); + Arc::new( + SupabaseConversationalStorage::new(connection_string.clone()) + .await + .expect("Failed to initialize Postgres state storage"), + ) + } + }; + Some(storage) + } else { + info!("No state_storage configured - conversation state management disabled"); + None + }; loop { diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index 27f8ebd9..ff039c3c 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -41,6 +41,20 @@ pub struct Listener { pub port: u16, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateStorageConfig { + #[serde(rename = "type")] + pub storage_type: StateStorageType, + pub connection_string: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum StateStorageType { + Memory, + Postgres, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Configuration { pub version: String, @@ -58,6 +72,7 @@ pub struct Configuration { pub routing: Option, pub agents: Option>, pub listeners: Vec, + pub state_storage_v1_responses: Option, } #[derive(Debug, Clone, Serialize, Deserialize, Default)] diff --git 
a/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml b/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml index 0aaaa537..a002c384 100644 --- a/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml +++ b/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml @@ -92,3 +92,13 @@ model_aliases: tracing: random_sampling: 100 + +state_storage_v1_responses: + # Type: memory | postgres + type: postgres + + # Connection string for postgres type + # Environment variables are supported using $VAR_NAME or ${VAR_NAME} syntax + # Variables MUST be set before running config validation/rendering + # Example with environment variable substitution: + connection_string: "postgresql://postgres.saueycoonskiktmozyvp:$DB_PASSWORD@aws-0-us-west-2.pooler.supabase.com:5432/postgres" diff --git a/docs/source/resources/includes/arch_config_state_storage_example.yaml b/docs/source/resources/includes/arch_config_state_storage_example.yaml new file mode 100644 index 00000000..27a417c0 --- /dev/null +++ b/docs/source/resources/includes/arch_config_state_storage_example.yaml @@ -0,0 +1,32 @@ +version: v0.1 + +listeners: + egress_traffic: + address: 0.0.0.0 + port: 12000 + message_format: openai + timeout: 30s + +llm_providers: + + # OpenAI Models + - model: openai/gpt-5-mini-2025-08-07 + access_key: $OPENAI_API_KEY + default: true + + # Anthropic Models + - model: anthropic/claude-sonnet-4-20250514 + access_key: $ANTHROPIC_API_KEY + +# State storage configuration for v1/responses API +# Manages conversation state for multi-turn conversations +state_storage_v1_responses: + # Type: memory | postgres + type: postgres + + # Connection string for postgres type + # Environment variables are supported using $VAR_NAME or ${VAR_NAME} syntax + # Replace [USER] and [HOST] with your actual database credentials + # Variables like $DB_PASSWORD MUST be set before running config validation/rendering + # Example: Replace [USER] with 'myuser' and [HOST] with 'db.example.com' (the port :5432 is already in the template) + 
connection_string: "postgresql://[USER]:$DB_PASSWORD@[HOST]:5432/postgres"