From 142aaabc10bee77d41ff12a0eb56305877f3d820 Mon Sep 17 00:00:00 2001 From: Salman Paracha Date: Mon, 15 Dec 2025 13:53:29 -0800 Subject: [PATCH] added support for supabase --- crates/Cargo.lock | 178 ++++++++- crates/brightstaff/Cargo.toml | 1 + crates/brightstaff/src/state/supabase.rs | 477 ++++++++++++++++------- docs/db_setup/README.md | 109 ++++++ docs/db_setup/conversation_states.sql | 31 ++ 5 files changed, 660 insertions(+), 136 deletions(-) create mode 100644 docs/db_setup/README.md create mode 100644 docs/db_setup/conversation_states.sql diff --git a/crates/Cargo.lock b/crates/Cargo.lock index 69e76fe3..01b15dc5 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -338,6 +338,7 @@ dependencies = [ "thiserror 2.0.12", "time", "tokio", + "tokio-postgres", "tokio-stream", "tracing", "tracing-opentelemetry", @@ -362,6 +363,12 @@ version = "3.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db76d6187cd04dff33004d8e6c9cc4e05cd330500379d2394209271b4aeee" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.10.1" @@ -606,6 +613,7 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", + "subtle", ] [[package]] @@ -693,6 +701,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fancy-regex" version = "0.12.0" @@ -998,6 +1012,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "0.2.12" @@ -1432,6 +1455,17 @@ version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +[[package]] +name = "libredox" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +dependencies = [ + "bitflags", + "libc", + "redox_syscall", +] + [[package]] name = "linux-raw-sys" version = "0.9.4" @@ -1504,6 +1538,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "md5" version = "0.7.0" @@ -1849,6 +1893,24 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "phf" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.10" @@ -1893,6 +1955,37 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" +[[package]] +name = "postgres-protocol" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbef655056b916eb868048276cfd5d6a7dea4f81560dfd047f97c8c6fe3fcfd4" +dependencies = [ + "base64 0.22.1", + "byteorder", + "bytes", + "fallible-iterator", + "hmac", + "md-5", + "memchr", + "rand 0.9.2", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol", + "serde", + "serde_json", +] + [[package]] name = "potential_utf" version = "0.1.2" @@ -2122,9 +2215,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.12" +version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ "bitflags", ] @@ -2675,6 +2768,12 @@ version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "slab" version = "0.4.9" @@ -2715,6 +2814,17 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.11.1" @@ -2973,6 +3083,32 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-postgres" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c95d533c83082bb6490e0189acaa0bbeef9084e60471b696ca6988cd0541fb0" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures-channel", + "futures-util", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "rand 0.9.2", + "socket2", + "tokio", + "tokio-util", + "whoami", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -3208,12 +3344,33 @@ version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-normalization" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd4f6878c9cb28d874b009da9e8d183b5abc80117c40bbd187a1fde336be6e8" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-properties" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d" + [[package]] name = "unsafe-libyaml" version = "0.2.11" @@ -3309,6 +3466,12 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -3413,6 +3576,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "whoami" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" +dependencies = [ + "libredox", + "wasite", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/crates/brightstaff/Cargo.toml b/crates/brightstaff/Cargo.toml index 6d5012a7..233a4da3 100644 --- a/crates/brightstaff/Cargo.toml +++ b/crates/brightstaff/Cargo.toml @@ -33,6 +33,7 @@ serde_with = "3.13.0" serde_yaml = "0.9.34" thiserror = "2.0.12" tokio = { version = "1.44.2", features = ["full"] } +tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] } tokio-stream = "0.1" time = { version = "0.3", features = ["formatting", "macros"] } tracing = "0.1" diff --git a/crates/brightstaff/src/state/supabase.rs b/crates/brightstaff/src/state/supabase.rs index 71c032ad..990b0a77 100644 --- a/crates/brightstaff/src/state/supabase.rs +++ b/crates/brightstaff/src/state/supabase.rs @@ -1,188 +1,387 @@ use super::{OpenAIConversationState, StateStorage, StateStorageError}; use async_trait::async_trait; -use tracing::{debug, warn}; +use serde_json; +use std::sync::Arc; +use tokio::sync::OnceCell; +use tokio_postgres::{Client, NoTls}; +use tracing::{debug, info, warn}; /// Supabase/PostgreSQL storage backend for conversation state -/// This is a placeholder implementation that can be extended with actual PostgreSQL logic #[derive(Clone)] pub struct SupabaseConversationalStorage { - // Connection pool or client would go here - // e.g., sqlx::PgPool or tokio_postgres::Client - _connection_string: String, + client: Arc, + table_verified: Arc>, } impl SupabaseConversationalStorage { - pub fn new(connection_string: String) -> Self { - Self { - _connection_string: connection_string, - } + /// Creates a new Supabase storage instance with the given connection string + pub async fn new(connection_string: String) -> Result { + let (client, connection) = tokio_postgres::connect(&connection_string, NoTls) + .await + .map_err(|e| { + StateStorageError::StorageError(format!("Failed to connect to database: {}", e)) + })?; + + // Spawn the connection to run in the background + tokio::spawn(async move { + if let Err(e) = connection.await { + warn!("Database connection error: {}", e); + } + }); + + Ok(Self { + client: Arc::new(client), + table_verified: Arc::new(OnceCell::new()), + }) + } + + /// Ensures the conversation_states table exists (checks once, caches result) + async fn ensure_ready(&self) -> Result<(), StateStorageError> { + self.table_verified + .get_or_try_init(|| async { + let row = self + .client + .query_one( + "SELECT EXISTS ( + SELECT FROM pg_tables + WHERE tablename = 'conversation_states' + )", + &[], + ) + .await + .map_err(|e| { + StateStorageError::StorageError(format!( + "Failed to verify table existence: {}", + e + )) + })?; + + let exists: bool = row.get(0); + + if !exists { + return Err(StateStorageError::StorageError( + "Table 'conversation_states' does not exist. \ + Please run the setup SQL from docs/db_setup/conversation_states.sql" + .to_string(), + )); + } + + info!("Conversation state storage table verified"); + Ok(()) + }) + .await?; + + Ok(()) } } #[async_trait] impl StateStorage for SupabaseConversationalStorage { async fn put(&self, state: OpenAIConversationState) -> Result<(), StateStorageError> { - warn!( - "Supabase storage not yet implemented - would store response_id: {}", - state.response_id - ); + self.ensure_ready().await?; - // TODO: Implement PostgreSQL storage - // SQL: INSERT INTO conversation_states (response_id, input_items, created_at, model, provider) - // VALUES ($1, $2, $3, $4, $5) - // ON CONFLICT (response_id) DO UPDATE SET ... + // Serialize input_items to JSONB + let input_items_json = serde_json::to_value(&state.input_items).map_err(|e| { + StateStorageError::StorageError(format!("Failed to serialize input_items: {}", e)) + })?; - Err(StateStorageError::StorageError( - "Supabase storage not yet implemented".to_string(), - )) + // Upsert the conversation state + self.client + .execute( + r#" + INSERT INTO conversation_states + (response_id, input_items, created_at, model, provider, updated_at) + VALUES ($1, $2, $3, $4, $5, NOW()) + ON CONFLICT (response_id) + DO UPDATE SET + input_items = EXCLUDED.input_items, + model = EXCLUDED.model, + provider = EXCLUDED.provider, + updated_at = NOW() + "#, + &[ + &state.response_id, + &input_items_json, + &state.created_at, + &state.model, + &state.provider, + ], + ) + .await + .map_err(|e| { + StateStorageError::StorageError(format!( + "Failed to store conversation state for {}: {}", + state.response_id, e + )) + })?; + + debug!("Stored conversation state for {}", state.response_id); + Ok(()) } async fn get(&self, response_id: &str) -> Result { - warn!( - "Supabase storage not yet implemented - would retrieve response_id: {}", - response_id - ); + self.ensure_ready().await?; - // TODO: Implement PostgreSQL retrieval - // SQL: SELECT * FROM conversation_states WHERE response_id = $1 + let row = self + .client + .query_opt( + r#" + SELECT response_id, input_items, created_at, model, provider + FROM conversation_states + WHERE response_id = $1 + "#, + &[&response_id], + ) + .await + .map_err(|e| { + StateStorageError::StorageError(format!( + "Failed to fetch conversation state for {}: {}", + response_id, e + )) + })?; - Err(StateStorageError::StorageError( - "Supabase storage not yet implemented".to_string(), - )) + match row { + Some(row) => { + let response_id: String = row.get("response_id"); + let input_items_json: serde_json::Value = row.get("input_items"); + let created_at: i64 = row.get("created_at"); + let model: String = row.get("model"); + let provider: String = row.get("provider"); + + // Deserialize input_items from JSONB + let input_items = + serde_json::from_value(input_items_json).map_err(|e| { + StateStorageError::StorageError(format!( + "Failed to deserialize input_items: {}", + e + )) + })?; + + Ok(OpenAIConversationState { + response_id, + input_items, + created_at, + model, + provider, + }) + } + None => Err(StateStorageError::NotFound(format!( + "Conversation state not found for response_id: {}", + response_id + ))), + } } async fn exists(&self, response_id: &str) -> Result { - debug!("Checking existence for response_id: {}", response_id); + self.ensure_ready().await?; - // TODO: Implement PostgreSQL existence check - // SQL: SELECT EXISTS(SELECT 1 FROM conversation_states WHERE response_id = $1) + let row = self + .client + .query_one( + "SELECT EXISTS(SELECT 1 FROM conversation_states WHERE response_id = $1)", + &[&response_id], + ) + .await + .map_err(|e| { + StateStorageError::StorageError(format!( + "Failed to check existence for {}: {}", + response_id, e + )) + })?; - Err(StateStorageError::StorageError( - "Supabase storage not yet implemented".to_string(), - )) + let exists: bool = row.get(0); + Ok(exists) } async fn delete(&self, response_id: &str) -> Result<(), StateStorageError> { - debug!("Deleting response_id: {}", response_id); + self.ensure_ready().await?; - // TODO: Implement PostgreSQL deletion - // SQL: DELETE FROM conversation_states WHERE response_id = $1 + let rows_affected = self + .client + .execute( + "DELETE FROM conversation_states WHERE response_id = $1", + &[&response_id], + ) + .await + .map_err(|e| { + StateStorageError::StorageError(format!( + "Failed to delete conversation state for {}: {}", + response_id, e + )) + })?; - Err(StateStorageError::StorageError( - "Supabase storage not yet implemented".to_string(), - )) + if rows_affected == 0 { + return Err(StateStorageError::NotFound(format!( + "Conversation state not found for response_id: {}", + response_id + ))); + } + + debug!("Deleted conversation state for {}", response_id); + Ok(()) } } /* -Suggested PostgreSQL schema: - -CREATE TABLE conversation_states ( - response_id TEXT PRIMARY KEY, - input_items JSONB NOT NULL, - created_at BIGINT NOT NULL, - model TEXT NOT NULL, - provider TEXT NOT NULL, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -); - -CREATE INDEX idx_conversation_states_created_at ON conversation_states(created_at); -CREATE INDEX idx_conversation_states_provider ON conversation_states(provider); +PostgreSQL schema is maintained in docs/db_setup/conversation_states.sql +Run that SQL file against your database before using this storage backend. */ #[cfg(test)] mod tests { use super::*; - use hermesllm::apis::openai_responses::{InputItem, InputMessage, MessageRole, InputContent}; + use hermesllm::apis::openai_responses::{InputContent, InputItem, InputMessage, MessageRole}; fn create_test_state(response_id: &str) -> OpenAIConversationState { OpenAIConversationState { response_id: response_id.to_string(), - input_items: vec![ - InputItem::Message(InputMessage { - role: MessageRole::User, - content: vec![InputContent::InputText { - text: "Test message".to_string(), - }], - }), - ], + input_items: vec![InputItem::Message(InputMessage { + role: MessageRole::User, + content: vec![InputContent::InputText { + text: "Test message".to_string(), + }], + })], created_at: 1234567890, model: "gpt-4".to_string(), provider: "openai".to_string(), } } - // These tests validate the current "not implemented" behavior - // Once the Supabase implementation is complete with actual PostgreSQL integration, - // these should be replaced with comprehensive tests similar to memory.rs + // Note: These tests require a running PostgreSQL database + // Set TEST_DATABASE_URL environment variable to run integration tests + // Example: TEST_DATABASE_URL=postgresql://user:pass@localhost/test_db - #[tokio::test] - async fn test_supabase_put_returns_not_implemented() { - let storage = SupabaseConversationalStorage::new("mock_connection_string".to_string()); - let state = create_test_state("resp_001"); - - let result = storage.put(state).await; - assert!(result.is_err()); - - match result.unwrap_err() { - StateStorageError::StorageError(msg) => { - assert!(msg.contains("not yet implemented")); + async fn get_test_storage() -> Option { + if let Ok(db_url) = std::env::var("TEST_DATABASE_URL") { + match SupabaseConversationalStorage::new(db_url).await { + Ok(storage) => Some(storage), + Err(e) => { + eprintln!("Failed to create test storage: {}", e); + None + } } - _ => panic!("Expected StorageError"), + } else { + eprintln!("TEST_DATABASE_URL not set, skipping Supabase integration tests"); + None } } #[tokio::test] - async fn test_supabase_get_returns_not_implemented() { - let storage = SupabaseConversationalStorage::new("mock_connection_string".to_string()); + async fn test_supabase_put_and_get_success() { + let Some(storage) = get_test_storage().await else { + return; + }; - let result = storage.get("resp_002").await; - assert!(result.is_err()); + let state = create_test_state("test_resp_001"); + storage.put(state.clone()).await.unwrap(); - match result.unwrap_err() { - StateStorageError::StorageError(msg) => { - assert!(msg.contains("not yet implemented")); - } - _ => panic!("Expected StorageError"), - } + let retrieved = storage.get("test_resp_001").await.unwrap(); + assert_eq!(retrieved.response_id, "test_resp_001"); + assert_eq!(retrieved.input_items.len(), 1); + assert_eq!(retrieved.model, "gpt-4"); + assert_eq!(retrieved.provider, "openai"); + + // Cleanup + let _ = storage.delete("test_resp_001").await; } #[tokio::test] - async fn test_supabase_exists_returns_not_implemented() { - let storage = SupabaseConversationalStorage::new("mock_connection_string".to_string()); + async fn test_supabase_put_overwrites_existing() { + let Some(storage) = get_test_storage().await else { + return; + }; - let result = storage.exists("resp_003").await; - assert!(result.is_err()); + let state1 = create_test_state("test_resp_002"); + storage.put(state1).await.unwrap(); - match result.unwrap_err() { - StateStorageError::StorageError(msg) => { - assert!(msg.contains("not yet implemented")); - } - _ => panic!("Expected StorageError"), - } + let mut state2 = create_test_state("test_resp_002"); + state2.model = "gpt-4-turbo".to_string(); + state2.input_items.push(InputItem::Message(InputMessage { + role: MessageRole::Assistant, + content: vec![InputContent::InputText { + text: "Response".to_string(), + }], + })); + storage.put(state2).await.unwrap(); + + let retrieved = storage.get("test_resp_002").await.unwrap(); + assert_eq!(retrieved.model, "gpt-4-turbo"); + assert_eq!(retrieved.input_items.len(), 2); + + // Cleanup + let _ = storage.delete("test_resp_002").await; } #[tokio::test] - async fn test_supabase_delete_returns_not_implemented() { - let storage = SupabaseConversationalStorage::new("mock_connection_string".to_string()); + async fn test_supabase_get_not_found() { + let Some(storage) = get_test_storage().await else { + return; + }; - let result = storage.delete("resp_004").await; + let result = storage.get("nonexistent_id").await; assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), StateStorageError::NotFound(_))); + } - match result.unwrap_err() { - StateStorageError::StorageError(msg) => { - assert!(msg.contains("not yet implemented")); - } - _ => panic!("Expected StorageError"), - } + #[tokio::test] + async fn test_supabase_exists_returns_false() { + let Some(storage) = get_test_storage().await else { + return; + }; + + let exists = storage.exists("nonexistent_id").await.unwrap(); + assert!(!exists); + } + + #[tokio::test] + async fn test_supabase_exists_returns_true_after_put() { + let Some(storage) = get_test_storage().await else { + return; + }; + + let state = create_test_state("test_resp_003"); + storage.put(state).await.unwrap(); + + let exists = storage.exists("test_resp_003").await.unwrap(); + assert!(exists); + + // Cleanup + let _ = storage.delete("test_resp_003").await; + } + + #[tokio::test] + async fn test_supabase_delete_success() { + let Some(storage) = get_test_storage().await else { + return; + }; + + let state = create_test_state("test_resp_004"); + storage.put(state).await.unwrap(); + + storage.delete("test_resp_004").await.unwrap(); + + let exists = storage.exists("test_resp_004").await.unwrap(); + assert!(!exists); + } + + #[tokio::test] + async fn test_supabase_delete_not_found() { + let Some(storage) = get_test_storage().await else { + return; + }; + + let result = storage.delete("nonexistent_id").await; + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), StateStorageError::NotFound(_))); } #[tokio::test] async fn test_supabase_merge_works() { - // merge() is implemented in the trait default, so it should work even without DB - let storage = SupabaseConversationalStorage::new("mock_connection_string".to_string()); + let Some(storage) = get_test_storage().await else { + return; + }; - let prev_state = create_test_state("resp_005"); + let prev_state = create_test_state("test_resp_005"); let current_input = vec![InputItem::Message(InputMessage { role: MessageRole::User, content: vec![InputContent::InputText { @@ -196,28 +395,38 @@ mod tests { assert_eq!(merged.len(), 2); } - /* TODO: Add comprehensive tests when SupabaseConversationalStorage is implemented - * - * Once the actual PostgreSQL integration is complete, add tests similar to those - * in memory.rs, including: - * - * - test_supabase_put_and_get_success: Store and retrieve state - * - test_supabase_put_overwrites_existing: Verify upsert behavior - * - test_supabase_get_not_found: Check NotFound error handling - * - test_supabase_exists_returns_false: Test non-existent ID - * - test_supabase_exists_returns_true_after_put: Verify existence after insert - * - test_supabase_delete_success: Delete and verify removal - * - test_supabase_delete_not_found: Delete non-existent ID - * - test_supabase_merge_various_scenarios: Test merge with different input combinations - * - test_supabase_concurrent_access: Test with multiple concurrent operations - * - test_supabase_serialization: Verify JSON serialization of input_items - * - test_supabase_connection_failure: Handle connection errors - * - test_supabase_invalid_data: Handle malformed JSON in database - * - * Test setup would require: - * - Test database setup/teardown (perhaps using testcontainers-rs or docker) - * - Connection pool initialization - * - Table creation before tests - * - Data cleanup between tests - */ + #[tokio::test] + async fn test_supabase_table_verification() { + let Some(storage) = get_test_storage().await else { + return; + }; + + // This should trigger table verification + let result = storage.ensure_ready().await; + assert!(result.is_ok(), "Table verification should succeed"); + + // Second call should use cached result + let result2 = storage.ensure_ready().await; + assert!(result2.is_ok(), "Cached verification should succeed"); + } + + #[tokio::test] + #[ignore] // Run manually with: cargo test test_verify_data_in_supabase -- --ignored + async fn test_verify_data_in_supabase() { + let Some(storage) = get_test_storage().await else { + return; + }; + + // Create a test record that persists + let state = create_test_state("manual_test_verification"); + storage.put(state).await.unwrap(); + + println!("✅ Data written to Supabase!"); + println!("Check your Supabase dashboard:"); + println!(" SELECT * FROM conversation_states WHERE response_id = 'manual_test_verification';"); + println!("\nTo cleanup, run:"); + println!(" DELETE FROM conversation_states WHERE response_id = 'manual_test_verification';"); + + // DON'T cleanup - leave it for manual verification + } } diff --git a/docs/db_setup/README.md b/docs/db_setup/README.md new file mode 100644 index 00000000..34aff973 --- /dev/null +++ b/docs/db_setup/README.md @@ -0,0 +1,109 @@ +# Database Setup for Conversation State Storage + +This directory contains SQL scripts needed to set up database tables for storing conversation state when using the OpenAI Responses API. + +## Prerequisites + +- PostgreSQL database (Supabase or self-hosted) +- Database connection credentials +- `psql` CLI tool or database admin access + +## Setup Instructions + +### Option 1: Using psql + +```bash +psql $DATABASE_URL -f docs/db_setup/conversation_states.sql +``` + +### Option 2: Using Supabase Dashboard + +1. Log in to your Supabase project dashboard +2. Navigate to the SQL Editor +3. Copy and paste the contents of `conversation_states.sql` +4. Run the query + +### Option 3: Direct Database Connection + +Connect to your PostgreSQL database using your preferred client and execute the SQL from `conversation_states.sql`. + +## Verification + +After running the setup, verify the table was created: + +```sql +SELECT tablename FROM pg_tables WHERE tablename = 'conversation_states'; +``` + +You should see `conversation_states` in the results. + +## Configuration + +After setting up the database table, configure your application to use Supabase storage by setting the appropriate environment variable or configuration parameter with your database connection string. + +### Supabase Connection String + +**Important:** Supabase requires different connection strings depending on your network: + +- **IPv4 Networks (Most Common)**: Use the **Session Pooler** connection string (port 5432): + ``` + postgresql://postgres.[PROJECT-REF]:[PASSWORD]@aws-0-[REGION].pooler.supabase.com:5432/postgres + ``` + +- **IPv6 Networks**: Use the direct connection (port 5432): + ``` + postgresql://postgres:[PASSWORD]@db.[PROJECT-REF].supabase.co:5432/postgres + ``` + +**How to get your connection string:** +1. Go to your Supabase project dashboard +2. Settings → Database → Connection Pooling +3. Copy the **Session mode** connection string +4. Replace `[YOUR-PASSWORD]` with your actual database password +5. URL-encode special characters in the password (e.g., `#` becomes `%23`) + +**Example:** +```bash +# If your password is "MyPass#123", encode it as "MyPass%23123" +export DATABASE_URL="postgresql://postgres.myproject:MyPass%23123@aws-0-us-west-2.pooler.supabase.com:5432/postgres" +``` + +### Testing the Connection + +To test your connection string works: +```bash +export TEST_DATABASE_URL="your-connection-string-here" +cd crates/brightstaff +cargo test supabase -- --nocapture +``` + +## Table Schema + +The `conversation_states` table stores: +- `response_id` (TEXT, PRIMARY KEY): Unique identifier for each conversation +- `input_items` (JSONB): Array of conversation messages and context +- `created_at` (BIGINT): Unix timestamp when conversation started +- `model` (TEXT): Model name used for the conversation +- `provider` (TEXT): LLM provider name +- `updated_at` (TIMESTAMP): Last update time (auto-managed) + +## Maintenance + +### Cleanup Old Conversations + +To prevent unbounded growth, consider periodically cleaning up old conversation states: + +```sql +-- Delete conversations older than 7 days +DELETE FROM conversation_states +WHERE updated_at < NOW() - INTERVAL '7 days'; +``` + +You can automate this with a cron job or database trigger. + +## Troubleshooting + +If you encounter errors on first use: +- **"Table 'conversation_states' does not exist"**: Run the setup SQL +- **Connection errors**: Verify your DATABASE_URL is correct +- **Permission errors**: Ensure your database user has CREATE TABLE privileges diff --git a/docs/db_setup/conversation_states.sql b/docs/db_setup/conversation_states.sql new file mode 100644 index 00000000..26272423 --- /dev/null +++ b/docs/db_setup/conversation_states.sql @@ -0,0 +1,31 @@ +-- Conversation State Storage Table +-- This table stores conversational context for the OpenAI Responses API +-- Run this SQL against your PostgreSQL/Supabase database before enabling conversation state storage + +CREATE TABLE IF NOT EXISTS conversation_states ( + response_id TEXT PRIMARY KEY, + input_items JSONB NOT NULL, + created_at BIGINT NOT NULL, + model TEXT NOT NULL, + provider TEXT NOT NULL, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +-- Indexes for common query patterns +CREATE INDEX IF NOT EXISTS idx_conversation_states_created_at + ON conversation_states(created_at); + +CREATE INDEX IF NOT EXISTS idx_conversation_states_provider + ON conversation_states(provider); + +-- Optional: Add a policy for automatic cleanup of old conversations +-- Uncomment and adjust the retention period as needed +-- CREATE INDEX IF NOT EXISTS idx_conversation_states_updated_at +-- ON conversation_states(updated_at); + +COMMENT ON TABLE conversation_states IS 'Stores conversation history for OpenAI Responses API continuity'; +COMMENT ON COLUMN conversation_states.response_id IS 'Unique identifier for the conversation state'; +COMMENT ON COLUMN conversation_states.input_items IS 'JSONB array of conversation messages and context'; +COMMENT ON COLUMN conversation_states.created_at IS 'Unix timestamp (seconds) when the conversation started'; +COMMENT ON COLUMN conversation_states.model IS 'Model name used for this conversation'; +COMMENT ON COLUMN conversation_states.provider IS 'LLM provider (e.g., openai, anthropic, bedrock)';