plano/crates/brightstaff/src/state/mod.rs
Salman Paracha d5a273f740
enable state management for v1/responses (#631)
* first commit with tests to enable state mamangement via memory

* fixed logs to follow the conversational flow a bit better

* added support for supabase

* added the state_storage_v1_responses flag, and use that to store state appropriately

* cleaned up logs and fixed issue with connectivity for llm gateway in weather forecast demo

* fixed mixed inputs from openai v1/responses api (#632)

* fixed mixed inputs from openai v1/responses api

* removing tracing from model-alias-rouing

* handling additional input types from openairs

---------

Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-342.local>

* resolving PR comments

---------

Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-342.local>
2025-12-17 12:18:38 -08:00

147 lines
4.8 KiB
Rust

use async_trait::async_trait;
use hermesllm::apis::openai_responses::{InputItem, InputMessage, InputContent, MessageContent, MessageRole, InputParam};
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::fmt;
use std::sync::Arc;
use tracing::{debug};
pub mod memory;
pub mod response_state_processor;
pub mod postgresql;
/// Represents the conversational state for a v1/responses request
/// Contains the complete input/output history that can be restored
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OpenAIConversationState {
/// The response ID this state is associated with
pub response_id: String,
/// The complete input history (original input + accumulated outputs)
/// This is what gets prepended to new requests via previous_response_id
pub input_items: Vec<InputItem>,
/// Timestamp when this state was created
pub created_at: i64,
/// Model used for this response
pub model: String,
/// Provider that generated this response (e.g., "anthropic", "openai")
pub provider: String,
}
/// Error types for state storage operations
#[derive(Debug)]
pub enum StateStorageError {
/// State not found for given response_id
NotFound(String),
/// Storage backend error (network, database, etc.)
StorageError(String),
/// Serialization/deserialization error
SerializationError(String),
}
impl fmt::Display for StateStorageError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StateStorageError::NotFound(id) => write!(f, "Conversation state not found for response_id: {}", id),
StateStorageError::StorageError(msg) => write!(f, "Storage error: {}", msg),
StateStorageError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
}
}
}
impl Error for StateStorageError {}
/// Trait for conversation state storage backends
#[async_trait]
pub trait StateStorage: Send + Sync {
/// Store conversation state for a response
async fn put(&self, state: OpenAIConversationState) -> Result<(), StateStorageError>;
/// Retrieve conversation state by response_id
async fn get(&self, response_id: &str) -> Result<OpenAIConversationState, StateStorageError>;
/// Check if state exists for a response_id
async fn exists(&self, response_id: &str) -> Result<bool, StateStorageError>;
/// Delete state for a response_id (optional, for cleanup)
async fn delete(&self, response_id: &str) -> Result<(), StateStorageError>;
fn merge(
&self,
prev_state: &OpenAIConversationState,
current_input: Vec<InputItem>,
) -> Vec<InputItem> {
// Default implementation: prepend previous input, append current
let prev_count = prev_state.input_items.len();
let current_count = current_input.len();
let mut combined_input = prev_state.input_items.clone();
combined_input.extend(current_input);
debug!(
"PLANO | BRIGHTSTAFF | STATE_STORAGE | RESP_ID:{} | Merged state: prev_items={}, current_items={}, total_items={}, combined_json={}",
prev_state.response_id,
prev_count,
current_count,
combined_input.len(),
serde_json::to_string(&combined_input).unwrap_or_else(|_| "serialization_error".to_string())
);
combined_input
}
}
/// Storage backend type enum
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageBackend {
Memory,
Supabase,
}
impl StorageBackend {
pub fn from_str(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"memory" => Some(StorageBackend::Memory),
"supabase" => Some(StorageBackend::Supabase),
_ => None,
}
}
}
// === Utility functions for state management ===
/// Extract input items from InputParam, converting text to structured format
pub fn extract_input_items(input: &InputParam) -> Vec<InputItem> {
match input {
InputParam::Text(text) => {
vec![InputItem::Message(InputMessage {
role: MessageRole::User,
content: MessageContent::Items(vec![InputContent::InputText {
text: text.clone(),
}]),
})]
}
InputParam::Items(items) => items.clone(),
}
}
/// Retrieve previous conversation state and combine with current input
/// Returns combined input if previous state found, or original input if not found/error
pub async fn retrieve_and_combine_input(
storage: Arc<dyn StateStorage>,
previous_response_id: &str,
current_input: Vec<InputItem>,
) -> Result<Vec<InputItem>, StateStorageError> {
// First get the previous state
let prev_state = storage.get(previous_response_id).await?;
let combined_input = storage.merge(&prev_state, current_input);
Ok(combined_input)
}