added the state_storage_v1_responses flag, and use that to store state appropriately

This commit is contained in:
Salman Paracha 2025-12-16 11:18:19 -08:00
parent 142aaabc10
commit 1212b526b8
8 changed files with 134 additions and 10 deletions

View file

@ -331,6 +331,31 @@ properties:
model:
type: string
additionalProperties: false
state_storage_v1_responses:
type: object
properties:
type:
type: string
enum:
- memory
- postgres
connection_string:
type: string
description: Required when type is postgres. Supports environment variable substitution using $VAR or ${VAR} syntax.
additionalProperties: false
required:
- type
# Note: connection_string is conditionally required based on type
# If type is 'postgres', connection_string must be provided
# If type is 'memory', connection_string is not needed
allOf:
- if:
properties:
type:
const: postgres
then:
required:
- connection_string
prompt_guards:
type: object
properties:

View file

@ -2,7 +2,7 @@
nodaemon=true
[program:brightstaff]
command=sh -c "RUST_LOG=info /app/brightstaff 2>&1 | tee /var/log/brightstaff.log | while IFS= read -r line; do echo '[brightstaff]' \"$line\"; done"
command=sh -c "envsubst < /app/arch_config_rendered.yaml > /app/arch_config_rendered.env_sub.yaml && RUST_LOG=debug ARCH_CONFIG_PATH_RENDERED=/app/arch_config_rendered.env_sub.yaml /app/brightstaff 2>&1 | tee /var/log/brightstaff.log | while IFS= read -r line; do echo '[brightstaff]' \"$line\"; done"
stdout_logfile=/dev/stdout
redirect_stderr=true
stdout_logfile_maxbytes=0

View file

@ -148,6 +148,20 @@ def get_llm_provider_access_keys(arch_config_file):
if access_key is not None:
access_key_list.append(access_key)
# Extract environment variables from state_storage.connection_string
state_storage = arch_config_yaml.get("state_storage_v1_responses")
if state_storage:
connection_string = state_storage.get("connection_string")
if connection_string and isinstance(connection_string, str):
# Extract all $VAR and ${VAR} patterns from connection string
import re
# Match both $VAR and ${VAR} patterns
pattern = r"\$\{?([A-Z_][A-Z0-9_]*)\}?"
matches = re.findall(pattern, connection_string)
for var in matches:
access_key_list.append(f"${var}")
return access_key_list

View file

@ -37,7 +37,7 @@ pub async fn llm_chat(
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
llm_providers: Arc<RwLock<Vec<LlmProvider>>>,
trace_collector: Arc<TraceCollector>,
state_storage: Arc<dyn StateStorage>,
state_storage: Option<Arc<dyn StateStorage>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let request_path = request.uri().path().to_string();
@ -106,8 +106,9 @@ pub async fn llm_chat(
// === v1/responses state management: Determine upstream API and combine input if needed ===
// Do this BEFORE routing since routing consumes the request
// Only process state if state_storage is configured
let mut should_manage_state = false;
if is_responses_api_client {
if is_responses_api_client && state_storage.is_some() {
if let ProviderRequestType::ResponsesAPIRequest(ref mut responses_req) = client_request {
// Extract original input once
original_input_items = extract_input_items(&responses_req.input);
@ -130,7 +131,7 @@ pub async fn llm_chat(
// Retrieve and combine conversation history if previous_response_id exists
if let Some(ref prev_resp_id) = responses_req.previous_response_id {
match retrieve_and_combine_input(
state_storage.clone(),
state_storage.as_ref().unwrap().clone(),
prev_resp_id,
original_input_items, // Pass ownership instead of cloning
)
@ -267,8 +268,8 @@ pub async fn llm_chat(
);
// === v1/responses state management: Wrap with ResponsesStateProcessor ===
// Only wrap if we need to manage state (client is ResponsesAPI AND upstream is NOT ResponsesAPI)
let streaming_response = if should_manage_state && !original_input_items.is_empty() {
// Only wrap if we need to manage state (client is ResponsesAPI AND upstream is NOT ResponsesAPI AND state_storage is configured)
let streaming_response = if should_manage_state && !original_input_items.is_empty() && state_storage.is_some() {
// Extract Content-Encoding header to handle decompression for state parsing
let content_encoding = response_headers
.get("content-encoding")
@ -278,7 +279,7 @@ pub async fn llm_chat(
// Wrap with state management processor to store state after response completes
let state_processor = ResponsesStateProcessor::new(
base_processor,
state_storage,
state_storage.unwrap(),
original_input_items,
resolved_model.clone(),
model_name.clone(),

View file

@ -5,6 +5,7 @@ use brightstaff::handlers::function_calling::{function_calling_chat_handler};
use brightstaff::router::llm_router::RouterService;
use brightstaff::state::memory::MemoryConversationalStorage;
use brightstaff::state::StateStorage;
use brightstaff::state::supabase::SupabaseConversationalStorage;
use brightstaff::utils::tracing::init_tracer;
use bytes::Bytes;
use common::configuration::Configuration;
@ -104,9 +105,35 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let _flusher_handle = trace_collector.clone().start_background_flusher();
// Initialize conversation state storage for v1/responses
// TODO: Make this configurable (MEMORY vs SUPABASE) via arch_config.yaml
let state_storage: Arc<dyn StateStorage> = Arc::new(MemoryConversationalStorage::new());
info!("Initialized conversation state storage: Memory");
// Configurable via arch_config.yaml state_storage section
// If not configured, state management is disabled
// Environment variables are substituted by envsubst before config is read
let state_storage: Option<Arc<dyn StateStorage>> = if let Some(storage_config) = &arch_config.state_storage_v1_responses {
let storage: Arc<dyn StateStorage> = match storage_config.storage_type {
common::configuration::StateStorageType::Memory => {
info!("Initialized conversation state storage: Memory");
Arc::new(MemoryConversationalStorage::new())
}
common::configuration::StateStorageType::Postgres => {
let connection_string = storage_config
.connection_string
.as_ref()
.expect("connection_string is required for postgres state_storage");
debug!("Postgres connection string (full): {}", connection_string);
info!("Initializing conversation state storage: Postgres");
Arc::new(
SupabaseConversationalStorage::new(connection_string.clone())
.await
.expect("Failed to initialize Postgres state storage"),
)
}
};
Some(storage)
} else {
info!("No state_storage configured - conversation state management disabled");
None
};
loop {

View file

@ -41,6 +41,20 @@ pub struct Listener {
pub port: u16,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateStorageConfig {
#[serde(rename = "type")]
pub storage_type: StateStorageType,
pub connection_string: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum StateStorageType {
Memory,
Postgres,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Configuration {
pub version: String,
@ -58,6 +72,7 @@ pub struct Configuration {
pub routing: Option<Routing>,
pub agents: Option<Vec<Agent>>,
pub listeners: Vec<Listener>,
pub state_storage_v1_responses: Option<StateStorageConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]

View file

@ -92,3 +92,13 @@ model_aliases:
tracing:
random_sampling: 100
state_storage:
# Type: memory | postgres
type: postgres
# Connection string for postgres type
# Environment variables are supported using $VAR_NAME or ${VAR_NAME} syntax
# Variables MUST be set before running config validation/rendering
# Example with environment variable substitution:
connection_string: "postgresql://postgres.saueycoonskiktmozyvp:$DB_PASSWORD@aws-0-us-west-2.pooler.supabase.com:5432/postgres"

View file

@ -0,0 +1,32 @@
version: v0.1
listeners:
egress_traffic:
address: 0.0.0.0
port: 12000
message_format: openai
timeout: 30s
llm_providers:
# OpenAI Models
- model: openai/gpt-5-mini-2025-08-07
access_key: $OPENAI_API_KEY
default: true
# Anthropic Models
- model: anthropic/claude-sonnet-4-20250514
access_key: $ANTHROPIC_API_KEY
# State storage configuration for v1/responses API
# Manages conversation state for multi-turn conversations
state_storage:
# Type: memory | postgres
type: postgres
# Connection string for postgres type
# Environment variables are supported using $VAR_NAME or ${VAR_NAME} syntax
# Replace [USER] and [HOST] with your actual database credentials
# Variables like $DB_PASSWORD MUST be set before running config validation/rendering
# Example: Replace [USER] with 'myuser' and [HOST] with 'db.example.com:5432'
connection_string: "postgresql://[USER]:$DB_PASSWORD@[HOST]:5432/postgres"