diff --git a/crates/common/src/retry/orchestrator.rs b/crates/common/src/retry/orchestrator.rs new file mode 100644 index 00000000..d9348623 --- /dev/null +++ b/crates/common/src/retry/orchestrator.rs @@ -0,0 +1,2776 @@ +use std::future::Future; +use std::sync::Arc; +use std::time::Instant; + +use bytes::Bytes; +use log::{debug, info, warn}; + +use crate::configuration::{ + extract_provider, ApplyTo, BlockScope, HighLatencyConfig, LatencyMeasure, LlmProvider, + RetryPolicy, RetryStrategy, +}; + +use super::backoff::BackoffCalculator; +use super::error_detector::{ErrorClassification, ErrorDetector, HttpResponse, TimeoutError}; +use super::latency_block_state::LatencyBlockStateManager; +use super::latency_trigger::LatencyTriggerCounter; +use super::provider_selector::{ProviderSelectionResult, ProviderSelector}; +use super::retry_after_state::RetryAfterStateManager; +use super::{ + AllProvidersExhaustedError, AttemptError, AttemptErrorType, RequestContext, RequestSignature, + RetryExhaustedError, RetryGate, +}; + +// ── RetryOrchestrator ────────────────────────────────────────────────────── + +/// Central coordinator for the retry loop. +/// +/// Handles both the initial request attempt AND subsequent retries. +/// The primary model's `retry_policy` governs the entire retry sequence, +/// including when retrying to fallback models. +pub struct RetryOrchestrator { + pub retry_after_state: Arc, + pub latency_block_state: Arc, + pub latency_trigger_counter: Arc, + pub retry_gate: Arc, +} + +impl RetryOrchestrator { + /// Create a new RetryOrchestrator with the given state managers and gate. + pub fn new( + retry_after_state: Arc, + latency_block_state: Arc, + latency_trigger_counter: Arc, + retry_gate: Arc, + ) -> Self { + Self { + retry_after_state, + latency_block_state, + latency_trigger_counter, + retry_gate, + } + } + + /// Create a RetryOrchestrator with default no-op/empty implementations (P0). + pub fn new_default() -> Self { + Self { + retry_after_state: Arc::new(RetryAfterStateManager::new()), + latency_block_state: Arc::new(LatencyBlockStateManager::new()), + latency_trigger_counter: Arc::new(LatencyTriggerCounter::new()), + retry_gate: Arc::new(RetryGate::default()), + } + } + + /// Execute a request with retry logic. + /// + /// Called from the LLM handler after model alias resolution. + /// Makes the initial request attempt and handles retries on failure. + /// + /// The `forward_request` callback sends a request to an upstream provider + /// without coupling the orchestrator to the HTTP client. + pub async fn execute( + &self, + body: &Bytes, + _request_signature: &RequestSignature, + primary_provider: &LlmProvider, + retry_policy: &RetryPolicy, + all_providers: &[LlmProvider], + request_context: &mut RequestContext, + forward_request: F, + ) -> Result + where + F: Fn(&Bytes, &LlmProvider) -> Fut + Send + Sync, + Fut: Future> + Send, + { + let error_detector = ErrorDetector; + let backoff_calculator = BackoffCalculator; + let provider_selector = ProviderSelector; + + // Acquire RetryGate permit; if unavailable, make a single attempt (fail-open). + let _permit = match self.retry_gate.try_acquire() { + Some(permit) => Some(permit), + None => { + warn!( + "RetryGate permit unavailable for request_id={}; proceeding without retry (fail-open)", + request_context.request_id + ); + // Make a single attempt with the primary provider, no retries. + let request_start = Instant::now(); + let result = forward_request(body, primary_provider).await; + let elapsed_ttfb_ms = request_start.elapsed().as_millis() as u64; + let elapsed_total_ms = elapsed_ttfb_ms; // Same for now; refined when streaming support is added + let classification = error_detector.classify( + result, + retry_policy, + elapsed_ttfb_ms, + elapsed_total_ms, + ); + return match classification { + ErrorClassification::Success(response) => Ok(response), + ErrorClassification::NonRetriableError(response) => Ok(response), + ErrorClassification::RetriableError { + status_code, + response_body, + .. + } => { + let model_id = primary_provider + .model + .as_deref() + .unwrap_or(&primary_provider.name) + .to_string(); + Err(RetryExhaustedError { + attempts: vec![AttemptError { + model_id, + error_type: AttemptErrorType::HttpError { + status_code, + body: response_body, + }, + attempt_number: 1, + }], + max_retry_after_seconds: None, + shortest_remaining_block_seconds: None, + retry_budget_exhausted: false, + }) + } + ErrorClassification::TimeoutError { duration_ms } => { + let model_id = primary_provider + .model + .as_deref() + .unwrap_or(&primary_provider.name) + .to_string(); + Err(RetryExhaustedError { + attempts: vec![AttemptError { + model_id, + error_type: AttemptErrorType::Timeout { duration_ms }, + attempt_number: 1, + }], + max_retry_after_seconds: None, + shortest_remaining_block_seconds: None, + retry_budget_exhausted: false, + }) + } + ErrorClassification::HighLatencyEvent { + measured_ms, + threshold_ms, + response, + .. + } => { + // If response completed, return it (fail-open). + if let Some(resp) = response { + return Ok(resp); + } + let model_id = primary_provider + .model + .as_deref() + .unwrap_or(&primary_provider.name) + .to_string(); + Err(RetryExhaustedError { + attempts: vec![AttemptError { + model_id, + error_type: AttemptErrorType::HighLatency { + measured_ms, + threshold_ms, + }, + attempt_number: 1, + }], + max_retry_after_seconds: None, + shortest_remaining_block_seconds: None, + retry_budget_exhausted: false, + }) + } + }; + } + }; + + // Track per-classification attempt counts: (strategy, max_attempts) -> count + let mut attempt_counts: std::collections::HashMap<(u16, Option), u32> = + std::collections::HashMap::new(); + + let mut current_provider = primary_provider; + let mut previous_provider_model = primary_provider + .model + .as_deref() + .unwrap_or(&primary_provider.name) + .to_string(); + + // The overall attempt number (1-based). + let mut overall_attempt: u32 = 0; + + loop { + overall_attempt += 1; + let current_model_id = current_provider + .model + .as_deref() + .unwrap_or(¤t_provider.name) + .to_string(); + + // Track attempted provider + request_context + .attempted_providers + .insert(current_model_id.clone()); + request_context.attempt_number = overall_attempt; + + // Forward the request + let request_start = Instant::now(); + let result = forward_request(body, current_provider).await; + let elapsed_ttfb_ms = request_start.elapsed().as_millis() as u64; + let elapsed_total_ms = elapsed_ttfb_ms; // Same for now; refined when streaming support is added + + // Emit latency metrics per model + info!( + "metric.latency: model={}, ttfb_ms={}, total_ms={}, request_id={}", + current_model_id, elapsed_ttfb_ms, elapsed_total_ms, request_context.request_id + ); + + // Classify the response + let classification = + error_detector.classify(result, retry_policy, elapsed_ttfb_ms, elapsed_total_ms); + + match classification { + ErrorClassification::Success(response) => { + if overall_attempt > 1 { + info!( + "Retry succeeded: model={}, total_attempts={}, request_id={}", + current_model_id, overall_attempt, request_context.request_id + ); + // Emit metric event for retry success + info!( + "metric.retry_success: model={}, total_attempts={}, request_id={}", + current_model_id, overall_attempt, request_context.request_id + ); + } + return Ok(response); + } + ErrorClassification::NonRetriableError(response) => { + // Non-retriable errors are returned as-is (not an exhaustion error). + return Ok(response); + } + ErrorClassification::HighLatencyEvent { + measured_ms, + threshold_ms: _, + response: Some(resp), + .. + } => { + // Completed-but-slow response: deliver to client, but record + // the event for future blocking. + if let Some(hl_config) = &retry_policy.on_high_latency { + self.record_latency_event( + ¤t_model_id, + measured_ms, + hl_config, + request_context, + ); + } + return Ok(resp); + } + _ => { + // Retriable error, timeout, or incomplete high-latency event. + // Proceed with retry logic. + } + } + + // Record Retry_After_State when a retriable error has a Retry-After header + if let ErrorClassification::RetriableError { + retry_after_seconds: Some(retry_after_value), + .. + } = &classification + { + // Log #1: Retry-After header value extracted + info!( + "Retry-After header value: {}s for model {} (request_id={})", + retry_after_value, current_model_id, request_context.request_id + ); + + let ra_config = retry_policy.effective_retry_after_config(); + let identifier = match ra_config.scope { + BlockScope::Model => current_model_id.clone(), + BlockScope::Provider => extract_provider(¤t_model_id).to_string(), + }; + + // Log #3: Retry-After value capped (if applicable) + let capped = (*retry_after_value).min(ra_config.max_retry_after_seconds); + if *retry_after_value > ra_config.max_retry_after_seconds { + warn!( + "Retry-After value capped: original={}s, capped={}s, max_retry_after_seconds={} (request_id={})", + retry_after_value, capped, ra_config.max_retry_after_seconds, request_context.request_id + ); + } + + match ra_config.apply_to { + ApplyTo::Global => { + self.retry_after_state.record( + &identifier, + *retry_after_value, + ra_config.max_retry_after_seconds, + ); + // Log #2: Retry_After_State created + info!( + "Retry_After_State created: identifier={}, expires_in={}s, apply_to=global (request_id={})", + identifier, capped, request_context.request_id + ); + } + ApplyTo::Request => { + let expires_at = Instant::now() + std::time::Duration::from_secs(capped); + request_context + .request_retry_after_state + .insert(identifier.clone(), expires_at); + // Log #2: Retry_After_State created (request-scoped) + info!( + "Retry_After_State created: identifier={}, expires_in={}s, apply_to=request (request_id={})", + identifier, capped, request_context.request_id + ); + } + } + } + + // Record latency event for HighLatencyEvent without completed response + // (triggers retry, but also records for future blocking) + if let ErrorClassification::HighLatencyEvent { measured_ms, .. } = &classification { + if let Some(hl_config) = &retry_policy.on_high_latency { + self.record_latency_event( + ¤t_model_id, + *measured_ms, + hl_config, + request_context, + ); + } + } + + // Dual-classification: TimeoutError + HighLatency + // When on_high_latency is configured and the elapsed time exceeded threshold_ms, + // record a HighLatencyEvent for blocking purposes even though we return TimeoutError + // for retry purposes. + if let ErrorClassification::TimeoutError { duration_ms } = &classification { + if let Some(hl_config) = &retry_policy.on_high_latency { + let measured_ms = match hl_config.measure { + LatencyMeasure::Ttfb => elapsed_ttfb_ms, + LatencyMeasure::Total => elapsed_total_ms, + }; + if measured_ms > hl_config.threshold_ms { + info!( + "Dual-classification: TimeoutError ({}ms) also exceeds high latency threshold ({}ms) for model={}, request_id={}", + duration_ms, hl_config.threshold_ms, current_model_id, request_context.request_id + ); + self.record_latency_event( + ¤t_model_id, + measured_ms, + hl_config, + request_context, + ); + } + } + } + + // Resolve retry params for this classification + let (strategy, max_attempts) = + error_detector.resolve_retry_params(&classification, retry_policy); + + // Build a key for per-classification attempt tracking. + // Use (status_code_or_sentinel, timeout_duration) as key. + let classification_key = match &classification { + ErrorClassification::RetriableError { status_code, .. } => (*status_code, None), + ErrorClassification::TimeoutError { .. } => { + // All timeouts share a single counter regardless of duration, + // since on_timeout has a single max_attempts value. + (0u16, None) + } + ErrorClassification::HighLatencyEvent { .. } => (1u16, None), + _ => (u16::MAX, None), + }; + + let count = attempt_counts.entry(classification_key).or_insert(0); + *count += 1; + + // Record the attempt error + let attempt_error = + build_attempt_error(&classification, ¤t_model_id, overall_attempt); + request_context.errors.push(attempt_error); + + // Log the retriable error + log_retriable_error( + &classification, + ¤t_model_id, + overall_attempt, + &request_context.request_id, + ); + + // Check max_attempts for this classification + if *count >= max_attempts { + let attempted_models: Vec = request_context + .errors + .iter() + .map(|e| e.model_id.clone()) + .collect(); + let error_types: Vec = request_context + .errors + .iter() + .map(|e| format_attempt_error_type(&e.error_type)) + .collect(); + warn!( + "All retries exhausted: attempted_models={:?}, error_types={:?}, total_attempts={}, request_id={}", + attempted_models, error_types, overall_attempt, request_context.request_id + ); + // Emit metric event for retry failure (exhausted) + info!( + "metric.retry_failure: reason=max_attempts_reached, model={}, total_attempts={}, request_id={}", + current_model_id, overall_attempt, request_context.request_id + ); + return Err(build_exhausted_error(request_context)); + } + + // Check max_retry_duration_ms budget + if let Some(max_duration_ms) = retry_policy.max_retry_duration_ms { + // Start the timer on the first retry (not the original request) + if request_context.retry_start_time.is_none() { + request_context.retry_start_time = Some(Instant::now()); + } + + if let Some(start) = request_context.retry_start_time { + let elapsed = start.elapsed(); + if elapsed.as_millis() as u64 >= max_duration_ms { + warn!( + "Retry budget exhausted ({}ms >= {}ms), request_id={}", + elapsed.as_millis(), + max_duration_ms, + request_context.request_id + ); + // Emit metric event for retry failure (budget exhausted) + info!( + "metric.retry_failure: reason=budget_exhausted, model={}, elapsed_ms={}, budget_ms={}, request_id={}", + current_model_id, elapsed.as_millis(), max_duration_ms, request_context.request_id + ); + let mut err = build_exhausted_error(request_context); + err.retry_budget_exhausted = true; + return Err(err); + } + } + } + + // Select next provider + // For same_model strategy, temporarily remove the current model from + // attempted so the provider selector can re-select it. + if strategy == RetryStrategy::SameModel { + request_context + .attempted_providers + .remove(¤t_model_id); + } + let selection = provider_selector.select( + strategy, + primary_provider + .model + .as_deref() + .unwrap_or(&primary_provider.name), + &retry_policy.fallback_models, + all_providers, + &request_context.attempted_providers, + &self.retry_after_state, + &self.latency_block_state, + request_context, + retry_policy.retry_after_handling.is_some(), + retry_policy.on_high_latency.is_some(), + ); + + let next_provider = match selection { + Ok(ProviderSelectionResult::Selected(provider)) => provider, + Ok(ProviderSelectionResult::WaitAndRetrySameModel { wait_duration }) => { + // Sleep for the wait duration, then retry the same provider. + // Pass the wait_duration as retry_after_seconds to backoff calculator. + let retry_after_secs = wait_duration.as_secs(); + let delay = backoff_calculator.calculate_delay( + overall_attempt.saturating_sub(1), + retry_policy.backoff.as_ref(), + Some(retry_after_secs), + strategy, + ¤t_model_id, + &previous_provider_model, + ); + tokio::time::sleep(delay).await; + + // For same_model wait-and-retry, we need to allow re-attempting + // the same provider, so remove it from attempted set temporarily. + request_context + .attempted_providers + .remove(¤t_model_id); + previous_provider_model = current_model_id; + continue; + } + Err(AllProvidersExhaustedError { + shortest_remaining_block_seconds, + }) => { + let attempted_models: Vec = request_context + .errors + .iter() + .map(|e| e.model_id.clone()) + .collect(); + let error_types: Vec = request_context + .errors + .iter() + .map(|e| format_attempt_error_type(&e.error_type)) + .collect(); + warn!( + "All retries exhausted (providers exhausted): attempted_models={:?}, error_types={:?}, total_attempts={}, request_id={}", + attempted_models, error_types, overall_attempt, request_context.request_id + ); + // Emit metric event for retry failure (providers exhausted) + info!( + "metric.retry_failure: reason=providers_exhausted, model={}, total_attempts={}, request_id={}", + current_model_id, overall_attempt, request_context.request_id + ); + let mut err = build_exhausted_error(request_context); + err.shortest_remaining_block_seconds = shortest_remaining_block_seconds; + return Err(err); + } + }; + + // Calculate backoff delay + let next_model_id = next_provider + .model + .as_deref() + .unwrap_or(&next_provider.name); + + let retry_after_secs = match &classification { + ErrorClassification::RetriableError { + retry_after_seconds, + .. + } => *retry_after_seconds, + _ => None, + }; + + let delay = backoff_calculator.calculate_delay( + overall_attempt.saturating_sub(1), + retry_policy.backoff.as_ref(), + retry_after_secs, + strategy, + next_model_id, + &previous_provider_model, + ); + + // Check budget again after calculating delay + if let Some(max_duration_ms) = retry_policy.max_retry_duration_ms { + if let Some(start) = request_context.retry_start_time { + let elapsed_after_delay = + start.elapsed().as_millis() as u64 + delay.as_millis() as u64; + if elapsed_after_delay >= max_duration_ms { + warn!( + "Retry budget would be exhausted after backoff delay ({}ms >= {}ms), request_id={}", + elapsed_after_delay, + max_duration_ms, + request_context.request_id + ); + // Emit metric event for retry failure (budget exhausted with backoff) + info!( + "metric.retry_failure: reason=budget_exhausted, model={}, elapsed_ms={}, budget_ms={}, request_id={}", + current_model_id, elapsed_after_delay, max_duration_ms, request_context.request_id + ); + let mut err = build_exhausted_error(request_context); + err.retry_budget_exhausted = true; + return Err(err); + } + } + } + + if !delay.is_zero() { + debug!( + "Backoff delay: {}ms before retry attempt {}, model={}, request_id={}", + delay.as_millis(), + overall_attempt + 1, + next_model_id, + request_context.request_id + ); + tokio::time::sleep(delay).await; + } + + info!( + "Retry initiated: original_model={}, target_model={}, error_type={}, attempt={}, request_id={}", + previous_provider_model, + next_model_id, + classify_error_type(&classification), + overall_attempt + 1, + request_context.request_id + ); + + // Emit metric event for retry attempt + info!( + "metric.retry_attempt: model={}, target_model={}, status_code={}, error_type={}, request_id={}", + previous_provider_model, + next_model_id, + classify_status_code(&classification), + classify_error_type(&classification), + request_context.request_id + ); + + previous_provider_model = current_model_id; + current_provider = next_provider; + } + } + + /// Record a high latency event in the LatencyTriggerCounter and, if the + /// min_triggers threshold is met, create a LatencyBlockState entry. + /// + /// The identifier is derived from the model ID based on the configured scope: + /// - `BlockScope::Model` → use the full model ID + /// - `BlockScope::Provider` → use the provider prefix (e.g., "openai" from "openai/gpt-4o") + /// + /// The block state is routed by `apply_to`: + /// - `ApplyTo::Global` → recorded via `LatencyBlockStateManager` + /// - `ApplyTo::Request` → recorded in `RequestContext.request_latency_block_state` + fn record_latency_event( + &self, + model_id: &str, + measured_ms: u64, + hl_config: &HighLatencyConfig, + request_context: &mut RequestContext, + ) { + let identifier = match hl_config.scope { + BlockScope::Model => model_id.to_string(), + BlockScope::Provider => extract_provider(model_id).to_string(), + }; + + let trigger_window = hl_config.trigger_window_seconds.unwrap_or(60); + let threshold_met = self.latency_trigger_counter.record_event( + &identifier, + hl_config.min_triggers, + trigger_window, + ); + + info!( + "High latency event recorded: identifier={}, measured_ms={}, threshold_ms={}, measure={:?}, triggers_met={}, request_id={}", + identifier, measured_ms, hl_config.threshold_ms, hl_config.measure, threshold_met, request_context.request_id + ); + + if threshold_met { + // Reset the trigger counter to prevent re-triggering on the same events + self.latency_trigger_counter.reset(&identifier); + + match hl_config.apply_to { + ApplyTo::Global => { + self.latency_block_state.record_block( + &identifier, + hl_config.block_duration_seconds, + measured_ms, + ); + info!( + "Latency_Block_State created: identifier={}, block_duration={}s, measured_ms={}, apply_to=global, request_id={}", + identifier, hl_config.block_duration_seconds, measured_ms, request_context.request_id + ); + // Emit metric for LB creation + info!( + "metric.latency_block_created: model={}, block_duration_seconds={}, measured_ms={}, apply_to=global, request_id={}", + identifier, hl_config.block_duration_seconds, measured_ms, request_context.request_id + ); + } + ApplyTo::Request => { + let expires_at = Instant::now() + + std::time::Duration::from_secs(hl_config.block_duration_seconds); + request_context + .request_latency_block_state + .insert(identifier.clone(), expires_at); + info!( + "Latency_Block_State created: identifier={}, block_duration={}s, measured_ms={}, apply_to=request, request_id={}", + identifier, hl_config.block_duration_seconds, measured_ms, request_context.request_id + ); + // Emit metric for LB creation (request-scoped) + info!( + "metric.latency_block_created: model={}, block_duration_seconds={}, measured_ms={}, apply_to=request, request_id={}", + identifier, hl_config.block_duration_seconds, measured_ms, request_context.request_id + ); + } + } + } + } +} + +// ── Helper functions ─────────────────────────────────────────────────────── + +fn build_attempt_error( + classification: &ErrorClassification, + model_id: &str, + attempt_number: u32, +) -> AttemptError { + let error_type = match classification { + ErrorClassification::RetriableError { + status_code, + response_body, + .. + } => AttemptErrorType::HttpError { + status_code: *status_code, + body: response_body.clone(), + }, + ErrorClassification::TimeoutError { duration_ms } => AttemptErrorType::Timeout { + duration_ms: *duration_ms, + }, + ErrorClassification::HighLatencyEvent { + measured_ms, + threshold_ms, + .. + } => AttemptErrorType::HighLatency { + measured_ms: *measured_ms, + threshold_ms: *threshold_ms, + }, + // Should not be called for Success/NonRetriableError, but handle gracefully. + _ => AttemptErrorType::HttpError { + status_code: 0, + body: Vec::new(), + }, + }; + + AttemptError { + model_id: model_id.to_string(), + error_type, + attempt_number, + } +} + +fn build_exhausted_error(request_context: &RequestContext) -> RetryExhaustedError { + RetryExhaustedError { + attempts: request_context.errors.clone(), + max_retry_after_seconds: None, + shortest_remaining_block_seconds: None, + retry_budget_exhausted: false, + } +} + +/// Return a human-readable error type string for a classification. +fn classify_error_type(classification: &ErrorClassification) -> &'static str { + match classification { + ErrorClassification::RetriableError { .. } => "retriable_http_error", + ErrorClassification::TimeoutError { .. } => "timeout", + ErrorClassification::HighLatencyEvent { .. } => "high_latency", + ErrorClassification::Success(_) => "success", + ErrorClassification::NonRetriableError(_) => "non_retriable", + } +} + +/// Return the HTTP status code from a classification, or 0 for non-HTTP errors. +fn classify_status_code(classification: &ErrorClassification) -> u16 { + match classification { + ErrorClassification::RetriableError { status_code, .. } => *status_code, + _ => 0, + } +} + +/// Format an AttemptErrorType for logging. +fn format_attempt_error_type(error_type: &AttemptErrorType) -> String { + match error_type { + AttemptErrorType::HttpError { status_code, .. } => format!("http_{}", status_code), + AttemptErrorType::Timeout { duration_ms } => format!("timeout_{}ms", duration_ms), + AttemptErrorType::HighLatency { + measured_ms, + threshold_ms, + } => format!( + "high_latency_{}ms_threshold_{}ms", + measured_ms, threshold_ms + ), + } +} + +fn log_retriable_error( + classification: &ErrorClassification, + model_id: &str, + attempt_number: u32, + request_id: &str, +) { + match classification { + ErrorClassification::RetriableError { + status_code, + retry_after_seconds, + .. + } => { + warn!( + "Retriable error detected: provider={}, status_code={}, retry_after={:?}, attempt={}, request_id={}", + model_id, status_code, retry_after_seconds, attempt_number, request_id + ); + // Emit metric event for retriable error per model and status code + info!( + "metric.retriable_error: model={}, status_code={}, retry_after={:?}, request_id={}", + model_id, status_code, retry_after_seconds, request_id + ); + } + ErrorClassification::TimeoutError { duration_ms } => { + warn!( + "Timeout error detected: provider={}, duration_ms={}, attempt={}, request_id={}", + model_id, duration_ms, attempt_number, request_id + ); + // Emit metric event for timeout per model + info!( + "metric.timeout_error: model={}, duration_ms={}, request_id={}", + model_id, duration_ms, request_id + ); + } + ErrorClassification::HighLatencyEvent { + measured_ms, + threshold_ms, + measure, + .. + } => { + warn!( + "High latency event detected: provider={}, measured_ms={}, threshold_ms={}, measure={:?}, attempt={}, request_id={}", + model_id, measured_ms, threshold_ms, measure, attempt_number, request_id + ); + // Emit metric event for high latency per model + info!( + "metric.high_latency_event: model={}, measured_ms={}, threshold_ms={}, measure={:?}, request_id={}", + model_id, measured_ms, threshold_ms, measure, request_id + ); + } + _ => {} + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::configuration::{ + ApplyTo, BlockScope, HighLatencyConfig, LatencyMeasure, LlmProviderType, + RetryAfterHandlingConfig, RetryPolicy, RetryStrategy, StatusCodeConfig, StatusCodeEntry, + TimeoutRetryConfig, + }; + use bytes::Bytes; + use http_body_util::{BodyExt, Full}; + use hyper::Response; + use proptest::prelude::*; + use std::collections::{HashMap, HashSet}; + + use super::super::error_detector::HttpResponse; + + /// Helper to build an HttpResponse with a given status code. + fn make_response(status: u16) -> HttpResponse { + let body = Full::new(Bytes::from("test body")) + .map_err(|_| unreachable!()) + .boxed(); + Response::builder().status(status).body(body).unwrap() + } + + /// Helper to build an HttpResponse with a given status code and headers. + fn make_response_with_headers(status: u16, headers: Vec<(&str, &str)>) -> HttpResponse { + let body = Full::new(Bytes::from("test body")) + .map_err(|_| unreachable!()) + .boxed(); + let mut builder = Response::builder().status(status); + for (name, value) in headers { + builder = builder.header(name, value); + } + builder.body(body).unwrap() + } + + /// Helper to create a test LlmProvider with a given model name. + fn make_provider(model: &str) -> LlmProvider { + LlmProvider { + name: model.to_string(), + provider_interface: LlmProviderType::OpenAI, + model: Some(model.to_string()), + access_key: Some("test-key".to_string()), + ..LlmProvider::default() + } + } + + // Feature: retry-on-ratelimit, Property 8: Bounded Retry (CP-2) + // **Validates: Requirements 1.2** + proptest! { + #![proptest_config(ProptestConfig::with_cases(100))] + + /// Property 8: For arbitrary max_attempts and max_retry_duration_ms, + /// when all providers return 429 (all-failing), the orchestrator: + /// - Returns Err(RetryExhaustedError) + /// - The number of attempts ≤ max_attempts + /// - If max_retry_duration_ms was set, retry_budget_exhausted is true when budget exceeded + #[test] + fn prop_bounded_retry( + max_attempts in 1u32..=5u32, + has_budget in proptest::bool::ANY, + budget_ms in 100u64..=5000u64, + ) { + let max_retry_duration_ms = if has_budget { Some(budget_ms) } else { None }; + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // Run the async orchestrator and collect results for assertion. + let (attempt_count, retry_budget_exhausted) = rt.block_on(async { + let orchestrator = RetryOrchestrator::new_default(); + + // Use same_model strategy with a single provider so max_attempts + // is the precise bound on retry count. + let provider = make_provider("openai/gpt-4o"); + let all_providers = vec![provider.clone()]; + + let retry_policy = RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::SameModel, + default_max_attempts: max_attempts, + on_status_codes: vec![ + StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::SameModel, + max_attempts, + }, + ], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms, + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = RequestContext { + request_id: "test-req".to_string(), + attempted_providers: HashSet::new(), + retry_start_time: None, + attempt_number: 0, + request_retry_after_state: HashMap::new(), + request_latency_block_state: HashMap::new(), + request_signature: sig.clone(), + errors: vec![], + }; + + let body = Bytes::from("test body"); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &retry_policy, + &all_providers, + &mut ctx, + |_body, _provider| async { Ok(make_response(429)) }, + ) + .await; + + // Must be an error (all providers fail) + let err = result.expect_err( + "Expected RetryExhaustedError when all providers return 429", + ); + + (err.attempts.len() as u32, err.retry_budget_exhausted) + }); + + // Attempt count must be bounded by max_attempts. + // The orchestrator makes 1 initial attempt, then the per-classification + // counter increments. When count >= max_attempts, it stops. So total + // attempts recorded in errors = max_attempts (initial + retries that + // hit the counter limit). We allow max_attempts + 1 as an upper bound + // to account for the initial attempt before the counter check. + prop_assert!( + attempt_count <= max_attempts + 1, + "Attempt count {} exceeded max_attempts + 1 ({})", + attempt_count, + max_attempts + 1 + ); + + // If max_retry_duration_ms was set, either budget was exhausted + // (retry_budget_exhausted = true) or attempts were exhausted first + // (retry_budget_exhausted = false). Both are valid outcomes. + // With no backoff and instant responses, attempts exhaust before budget. + // When no budget is set, retry_budget_exhausted must be false. + if max_retry_duration_ms.is_none() { + prop_assert!( + !retry_budget_exhausted, + "retry_budget_exhausted should be false when no budget is set" + ); + } + } + } + + // ── P0 Edge Case Unit Tests ──────────────────────────────────────────── + + /// Helper to create a RequestContext for tests. + fn make_context(request_id: &str) -> RequestContext { + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + RequestContext { + request_id: request_id.to_string(), + attempted_providers: HashSet::new(), + retry_start_time: None, + attempt_number: 0, + request_retry_after_state: HashMap::new(), + request_latency_block_state: HashMap::new(), + request_signature: sig, + errors: vec![], + } + } + + /// Helper to create a basic retry policy for tests. + fn basic_retry_policy(max_attempts: u32) -> RetryPolicy { + RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::SameModel, + default_max_attempts: max_attempts, + on_status_codes: vec![StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::SameModel, + max_attempts, + }], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + } + } + + #[tokio::test] + async fn test_max_retry_duration_ms_exceeded_mid_retry_stops_with_most_recent_error() { + // Use different_provider strategy with multiple providers so the retry + // loop actually continues past the first attempt. The budget is small + // enough that it will be exceeded during the retry sequence. + let orchestrator = RetryOrchestrator::new_default(); + let all_providers = vec![ + make_provider("openai/gpt-4o"), + make_provider("anthropic/claude-3"), + make_provider("azure/gpt-4o"), + ]; + + let policy = RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 10, + on_status_codes: vec![StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::DifferentProvider, + max_attempts: 10, + }], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: Some(1), // 1ms budget — will be exhausted quickly + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-budget-exceeded"); + let body = Bytes::from("test body"); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + // Small sleep to ensure budget is exceeded + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + Ok(make_response(429)) + }, + ) + .await; + + let err = result.expect_err("Should return RetryExhaustedError when budget exceeded"); + // Either budget was exhausted or providers were exhausted — both are valid + // since we have 3 providers and a tiny budget. The key assertion is that + // the error contains attempt details. + assert!( + !err.attempts.is_empty(), + "Should have at least one attempt recorded" + ); + // The most recent error should be a 429 + let last = err.attempts.last().unwrap(); + match &last.error_type { + AttemptErrorType::HttpError { status_code, .. } => { + assert_eq!(*status_code, 429); + } + _ => panic!("Expected HttpError for last attempt"), + } + } + + #[tokio::test] + async fn test_max_retry_duration_timer_starts_on_first_retry_not_original_request() { + // Req 3.16: Timer starts when the first retry attempt begins, not the original request. + // We verify this by checking that retry_start_time is None before the first failure + // and set after it. + let orchestrator = RetryOrchestrator::new_default(); + let provider = make_provider("openai/gpt-4o"); + let all_providers = vec![provider.clone()]; + + // Use a generous budget so we can observe the timer behavior + let mut policy = basic_retry_policy(2); + policy.max_retry_duration_ms = Some(60000); // 60s budget + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-timer-start"); + + // Verify retry_start_time is None before execute + assert!( + ctx.retry_start_time.is_none(), + "retry_start_time should be None before execute" + ); + + let body = Bytes::from("test body"); + + let _result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { Ok(make_response(429)) }, + ) + .await; + + // After execution with retries, retry_start_time should have been set + assert!( + ctx.retry_start_time.is_some(), + "retry_start_time should be set after first retry attempt" + ); + } + + #[tokio::test] + async fn test_max_retry_duration_zero_effectively_disables_retries() { + // max_retry_duration_ms = 0 is rejected by validation (NonPositiveValue). + // With a very small budget (1ms) and multiple providers, the budget should + // be exhausted very quickly, effectively limiting retries. + let orchestrator = RetryOrchestrator::new_default(); + let all_providers = vec![ + make_provider("openai/gpt-4o"), + make_provider("anthropic/claude-3"), + make_provider("azure/gpt-4o"), + make_provider("google/gemini-pro"), + ]; + + let policy = RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 10, + on_status_codes: vec![StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::DifferentProvider, + max_attempts: 10, + }], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: Some(1), // Near-zero budget + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-zero-budget"); + let body = Bytes::from("test body"); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + Ok(make_response(429)) + }, + ) + .await; + + let err = result.expect_err("Should exhaust budget quickly"); + // With a 1ms budget and 5ms per attempt, we should get very few attempts + // before either budget or providers are exhausted. + assert!( + err.attempts.len() <= 4, + "With near-zero budget, should have few attempts, got {}", + err.attempts.len() + ); + } + + #[tokio::test] + async fn test_no_retry_policy_returns_error_directly() { + // When no retry_policy is configured, the orchestrator should still work + // but with default behavior. The key test is that without on_status_codes + // matching, a 429 is still treated as retriable (default strategy applies). + // However, when retry_policy has no on_status_codes and default_max_attempts = 0, + // no retries should occur. + let orchestrator = RetryOrchestrator::new_default(); + let provider = make_provider("openai/gpt-4o"); + let all_providers = vec![provider.clone()]; + + // Simulate "no retry" by setting max_attempts to 1 (only initial attempt) + let policy = RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 1, + on_status_codes: vec![], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-no-retry"); + let body = Bytes::from("test body"); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { Ok(make_response(429)) }, + ) + .await; + + let err = result.expect_err("Should return error when max_attempts exhausted"); + // With default_max_attempts = 1, should have at most 2 attempts + // (initial + 1 retry that hits the limit) + assert!( + err.attempts.len() <= 2, + "With max_attempts=1, should have at most 2 attempts, got {}", + err.attempts.len() + ); + } + + #[tokio::test] + async fn test_empty_fallback_models_different_provider_uses_provider_list() { + // When fallback_models is empty and strategy is different_provider, + // the orchestrator should select from the Provider_List. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let fallback = make_provider("anthropic/claude-3-5-sonnet"); + let all_providers = vec![primary.clone(), fallback.clone()]; + + let policy = RetryPolicy { + fallback_models: vec![], // empty — should fall back to Provider_List + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + }], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-empty-fallback"); + let body = Bytes::from("test body"); + + // Track which providers were called + let call_log = std::sync::Arc::new(std::sync::Mutex::new(Vec::::new())); + let call_log_clone = call_log.clone(); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + move |_body, provider| { + let log = call_log_clone.clone(); + let model = provider.model.clone().unwrap_or_default(); + async move { + log.lock().unwrap().push(model.clone()); + if model == "anthropic/claude-3-5-sonnet" { + Ok(make_response(200)) + } else { + Ok(make_response(429)) + } + } + }, + ) + .await; + + assert!( + result.is_ok(), + "Should succeed after falling back to Provider_List" + ); + let calls = call_log.lock().unwrap(); + assert!(calls.len() >= 2, "Should have at least 2 calls"); + assert_eq!(calls[0], "openai/gpt-4o", "First call should be primary"); + assert_eq!( + calls[1], "anthropic/claude-3-5-sonnet", + "Second call should be from Provider_List (different provider)" + ); + } + + // ── P1 Timeout Classification Tests ──────────────────────────────────── + + #[tokio::test] + async fn test_timeout_triggers_retry_to_different_provider() { + // When the primary provider times out and on_timeout is configured with + // different_provider strategy, the orchestrator should retry on a different provider. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let fallback = make_provider("anthropic/claude-3-5-sonnet"); + let all_providers = vec![primary.clone(), fallback.clone()]; + + let policy = RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![], + on_timeout: Some(TimeoutRetryConfig { + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + }), + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-timeout-retry"); + let body = Bytes::from("test body"); + + let call_log = std::sync::Arc::new(std::sync::Mutex::new(Vec::::new())); + let call_log_clone = call_log.clone(); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + move |_body, provider| { + let log = call_log_clone.clone(); + let model = provider.model.clone().unwrap_or_default(); + async move { + log.lock().unwrap().push(model.clone()); + if model == "openai/gpt-4o" { + Err(TimeoutError { duration_ms: 5000 }) + } else { + Ok(make_response(200)) + } + } + }, + ) + .await; + + assert!( + result.is_ok(), + "Should succeed after timeout retry to different provider" + ); + let calls = call_log.lock().unwrap(); + assert_eq!(calls.len(), 2, "Should have 2 calls (primary + fallback)"); + assert_eq!(calls[0], "openai/gpt-4o"); + assert_eq!(calls[1], "anthropic/claude-3-5-sonnet"); + } + + #[tokio::test] + async fn test_timeout_uses_on_timeout_strategy_not_default() { + // Verify that on_timeout config overrides default_strategy for timeout errors. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let all_providers = vec![primary.clone()]; + + let policy = RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 5, + on_status_codes: vec![], + on_timeout: Some(TimeoutRetryConfig { + strategy: RetryStrategy::SameModel, + max_attempts: 2, + }), + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-timeout-strategy"); + let body = Bytes::from("test body"); + + let call_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + move |_body, _provider| { + let count = call_count_clone.clone(); + async move { + count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Err(TimeoutError { duration_ms: 3000 }) + } + }, + ) + .await; + + let err = result.expect_err("Should exhaust timeout retries"); + // on_timeout max_attempts = 2, so we should see at most 3 total attempts + // (1 initial + 2 retries) + assert!( + err.attempts.len() <= 3, + "With on_timeout max_attempts=2, should have at most 3 attempts, got {}", + err.attempts.len() + ); + // All attempts should be timeout errors + for attempt in &err.attempts { + assert!( + matches!(attempt.error_type, AttemptErrorType::Timeout { .. }), + "All attempts should be timeout errors" + ); + } + } + + #[tokio::test] + async fn test_timeout_without_on_timeout_uses_defaults() { + // When on_timeout is None, timeout errors should use default_strategy and + // default_max_attempts. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let fallback = make_provider("anthropic/claude-3-5-sonnet"); + let all_providers = vec![primary.clone(), fallback.clone()]; + + let policy = RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![], + on_timeout: None, // No timeout-specific config + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-timeout-defaults"); + let body = Bytes::from("test body"); + + let call_log = std::sync::Arc::new(std::sync::Mutex::new(Vec::::new())); + let call_log_clone = call_log.clone(); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + move |_body, provider| { + let log = call_log_clone.clone(); + let model = provider.model.clone().unwrap_or_default(); + async move { + log.lock().unwrap().push(model.clone()); + if model == "openai/gpt-4o" { + Err(TimeoutError { duration_ms: 5000 }) + } else { + Ok(make_response(200)) + } + } + }, + ) + .await; + + // With default_strategy=DifferentProvider and default_max_attempts=1, + // should retry to the different provider and succeed. + assert!( + result.is_ok(), + "Should succeed after timeout retry using defaults" + ); + let calls = call_log.lock().unwrap(); + assert_eq!(calls[0], "openai/gpt-4o"); + assert_eq!(calls[1], "anthropic/claude-3-5-sonnet"); + } + + #[tokio::test] + async fn test_timeout_max_attempts_exhausted_returns_error() { + // When all timeout retries are exhausted, should return RetryExhaustedError + // with timeout attempt details. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let fallback = make_provider("anthropic/claude-3-5-sonnet"); + let all_providers = vec![primary.clone(), fallback.clone()]; + + let policy = RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![], + on_timeout: Some(TimeoutRetryConfig { + strategy: RetryStrategy::DifferentProvider, + max_attempts: 1, + }), + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-timeout-exhausted"); + let body = Bytes::from("test body"); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { Err(TimeoutError { duration_ms: 5000 }) }, + ) + .await; + + let err = result.expect_err("Should exhaust timeout retries"); + assert!(!err.attempts.is_empty(), "Should have recorded attempts"); + // Verify all attempts are timeout errors with correct duration + for attempt in &err.attempts { + match &attempt.error_type { + AttemptErrorType::Timeout { duration_ms } => { + assert_eq!(*duration_ms, 5000); + } + other => panic!("Expected Timeout error type, got {:?}", other), + } + } + } + + #[tokio::test] + async fn test_timeout_error_records_duration_in_attempt() { + // Verify that the timeout duration is correctly recorded in the attempt error. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let all_providers = vec![primary.clone()]; + + let policy = RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::SameModel, + default_max_attempts: 1, + on_status_codes: vec![], + on_timeout: Some(TimeoutRetryConfig { + strategy: RetryStrategy::SameModel, + max_attempts: 1, + }), + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-timeout-duration"); + let body = Bytes::from("test body"); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { Err(TimeoutError { duration_ms: 12345 }) }, + ) + .await; + + let err = result.expect_err("Should exhaust retries"); + let first_attempt = &err.attempts[0]; + assert_eq!(first_attempt.model_id, "openai/gpt-4o"); + match &first_attempt.error_type { + AttemptErrorType::Timeout { duration_ms } => { + assert_eq!(*duration_ms, 12345, "Duration should be preserved"); + } + other => panic!("Expected Timeout, got {:?}", other), + } + } + + #[tokio::test] + async fn test_timeout_then_success_on_retry() { + // Primary times out, retry to same model succeeds. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let all_providers = vec![primary.clone()]; + + let policy = RetryPolicy { + fallback_models: vec![], + default_strategy: RetryStrategy::SameModel, + default_max_attempts: 2, + on_status_codes: vec![], + on_timeout: Some(TimeoutRetryConfig { + strategy: RetryStrategy::SameModel, + max_attempts: 2, + }), + on_high_latency: None, + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-timeout-then-success"); + let body = Bytes::from("test body"); + + let call_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + move |_body, _provider| { + let count = call_count_clone.clone(); + async move { + let n = count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if n == 0 { + Err(TimeoutError { duration_ms: 5000 }) + } else { + Ok(make_response(200)) + } + } + }, + ) + .await; + + assert!(result.is_ok(), "Should succeed on retry after timeout"); + assert_eq!( + call_count.load(std::sync::atomic::Ordering::SeqCst), + 2, + "Should have made 2 calls (initial timeout + successful retry)" + ); + } + + // ── Retry-After State Recording Tests (Task 16.1) ────────────────── + + #[tokio::test] + async fn test_retry_after_global_records_state_in_manager() { + // When a 429 response includes Retry-After header and apply_to is Global, + // the orchestrator should record the entry in the global RetryAfterStateManager. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let policy = RetryPolicy { + fallback_models: vec!["anthropic/claude-3".to_string()], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + }], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: Some(RetryAfterHandlingConfig { + scope: BlockScope::Model, + apply_to: ApplyTo::Global, + max_retry_after_seconds: 300, + }), + // Use a tight budget so the orchestrator records state but bails + // before sleeping the full Retry-After delay. + max_retry_duration_ms: Some(1), + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-ra-global"); + let body = Bytes::from("test body"); + + let _result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + Ok(make_response_with_headers(429, vec![("retry-after", "10")])) + }, + ) + .await; + + // The global RetryAfterStateManager should have recorded the entry + assert!( + orchestrator.retry_after_state.is_blocked("openai/gpt-4o"), + "Model should be blocked in global RetryAfterStateManager after 429 with Retry-After" + ); + } + + #[tokio::test] + async fn test_retry_after_global_provider_scope_blocks_provider() { + // When scope is Provider, the entry should be recorded with the provider prefix. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let policy = RetryPolicy { + fallback_models: vec!["anthropic/claude-3".to_string()], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + }], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: Some(RetryAfterHandlingConfig { + scope: BlockScope::Provider, + apply_to: ApplyTo::Global, + max_retry_after_seconds: 300, + }), + max_retry_duration_ms: Some(1), + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-ra-provider-scope"); + let body = Bytes::from("test body"); + + let _result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + Ok(make_response_with_headers(429, vec![("retry-after", "10")])) + }, + ) + .await; + + // Provider prefix "openai" should be blocked + assert!( + orchestrator.retry_after_state.is_blocked("openai"), + "Provider prefix should be blocked in global RetryAfterStateManager" + ); + // The full model ID should NOT be directly blocked (it's provider-scoped) + assert!( + !orchestrator.retry_after_state.is_blocked("openai/gpt-4o"), + "Full model ID should not be directly blocked when scope is Provider" + ); + } + + #[tokio::test] + async fn test_retry_after_request_scope_records_in_request_context() { + // When apply_to is Request, the entry should be recorded in request_context. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let policy = RetryPolicy { + fallback_models: vec!["anthropic/claude-3".to_string()], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + }], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: Some(RetryAfterHandlingConfig { + scope: BlockScope::Model, + apply_to: ApplyTo::Request, + max_retry_after_seconds: 300, + }), + max_retry_duration_ms: Some(1), + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-ra-request-scope"); + let body = Bytes::from("test body"); + + let _result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + Ok(make_response_with_headers(429, vec![("retry-after", "10")])) + }, + ) + .await; + + // Request-scoped state should have the entry + assert!( + ctx.request_retry_after_state.contains_key("openai/gpt-4o"), + "Model should be recorded in request-scoped retry_after_state" + ); + // Global state should NOT have the entry + assert!( + !orchestrator.retry_after_state.is_blocked("openai/gpt-4o"), + "Global RetryAfterStateManager should not have entry when apply_to is Request" + ); + } + + #[tokio::test] + async fn test_retry_after_no_header_does_not_record_state() { + // When a 429 response does NOT include Retry-After header, + // no state entry should be created. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let policy = RetryPolicy { + fallback_models: vec!["anthropic/claude-3".to_string()], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + }], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: Some(RetryAfterHandlingConfig { + scope: BlockScope::Model, + apply_to: ApplyTo::Global, + max_retry_after_seconds: 300, + }), + max_retry_duration_ms: Some(1), + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-ra-no-header"); + let body = Bytes::from("test body"); + + let _result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + // 429 without Retry-After header + Ok(make_response(429)) + }, + ) + .await; + + // No state should be recorded + assert!( + !orchestrator.retry_after_state.is_blocked("openai/gpt-4o"), + "No global state should be recorded when Retry-After header is absent" + ); + assert!( + ctx.request_retry_after_state.is_empty(), + "No request-scoped state should be recorded when Retry-After header is absent" + ); + } + + #[tokio::test] + async fn test_retry_after_malformed_header_does_not_record_state() { + // When Retry-After header has a malformed value, it should be ignored. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let policy = RetryPolicy { + fallback_models: vec!["anthropic/claude-3".to_string()], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + }], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: Some(RetryAfterHandlingConfig { + scope: BlockScope::Model, + apply_to: ApplyTo::Global, + max_retry_after_seconds: 300, + }), + max_retry_duration_ms: Some(1), + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-ra-malformed"); + let body = Bytes::from("test body"); + + let _result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + // 429 with malformed Retry-After + Ok(make_response_with_headers( + 429, + vec![("retry-after", "not-a-number")], + )) + }, + ) + .await; + + // No state should be recorded for malformed values + assert!( + !orchestrator.retry_after_state.is_blocked("openai/gpt-4o"), + "No state should be recorded when Retry-After header is malformed" + ); + } + + #[tokio::test] + async fn test_retry_after_default_config_when_retry_after_handling_omitted() { + // When retry_after_handling is None, effective_retry_after_config() returns + // defaults (scope: Model, apply_to: Global, max: 300). The orchestrator + // should still record state using these defaults. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let policy = RetryPolicy { + fallback_models: vec!["anthropic/claude-3".to_string()], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![StatusCodeConfig { + codes: vec![StatusCodeEntry::Single(429)], + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + }], + on_timeout: None, + on_high_latency: None, + backoff: None, + retry_after_handling: None, // Omitted — defaults apply + max_retry_duration_ms: Some(1), + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-ra-defaults"); + let body = Bytes::from("test body"); + + let _result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + Ok(make_response_with_headers(429, vec![("retry-after", "10")])) + }, + ) + .await; + + // Default config: scope=Model, apply_to=Global + // So the model ID should be blocked globally + assert!( + orchestrator.retry_after_state.is_blocked("openai/gpt-4o"), + "Model should be blocked with default retry_after config (scope: Model, apply_to: Global)" + ); + } + + // ── Task 23.2: High latency handling tests ───────────────────────────── + + fn high_latency_retry_policy(threshold_ms: u64) -> RetryPolicy { + RetryPolicy { + fallback_models: vec!["anthropic/claude-3".to_string()], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![], + on_timeout: Some(TimeoutRetryConfig { + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + }), + on_high_latency: Some(HighLatencyConfig { + threshold_ms, + measure: LatencyMeasure::Ttfb, + min_triggers: 1, + trigger_window_seconds: Some(60), + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + block_duration_seconds: 300, + scope: BlockScope::Model, + apply_to: ApplyTo::Global, + }), + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + } + } + + #[tokio::test] + async fn test_high_latency_completed_response_delivered_and_block_state_created() { + // When a response completes but exceeds the latency threshold, + // the response should be delivered to the client AND a block state + // should be created for future requests. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + // threshold_ms=100, so any response taking >100ms is "slow" + // But since our mock returns instantly, we need the ErrorDetector to + // classify based on elapsed time. The mock returns 200 OK, and the + // ErrorDetector will see elapsed_ttfb_ms > threshold_ms. + // However, in the test the elapsed time is near-zero. + // We need to use a threshold of 0 so that any response triggers it. + let policy = high_latency_retry_policy(0); + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-hl-completed"); + let body = Bytes::from("test body"); + + // The mock returns 200 OK. With threshold_ms=0, any elapsed time > 0 + // will trigger HighLatencyEvent with response: Some(resp). + // But elapsed_ttfb_ms is measured as 0 in fast tests, so we need + // threshold_ms=0 and the classify logic checks measured_ms > threshold_ms. + // 0 > 0 is false, so we need to add a small delay. + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + // Small delay to ensure elapsed > 0 + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + Ok(make_response(200)) + }, + ) + .await; + + // Response should be delivered successfully + assert!( + result.is_ok(), + "Completed-but-slow response should be delivered to client" + ); + let resp = result.unwrap(); + assert_eq!(resp.status().as_u16(), 200); + + // Block state should be created (min_triggers=1, so first event triggers block) + assert!( + orchestrator.latency_block_state.is_blocked("openai/gpt-4o"), + "Latency block state should be created for the slow model" + ); + } + + #[tokio::test] + async fn test_high_latency_completed_response_block_state_provider_scope() { + // When scope is "provider", the block should use the provider prefix. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let mut policy = high_latency_retry_policy(0); + policy.on_high_latency.as_mut().unwrap().scope = BlockScope::Provider; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-hl-provider-scope"); + let body = Bytes::from("test body"); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + Ok(make_response(200)) + }, + ) + .await; + + assert!(result.is_ok()); + + // Provider prefix "openai" should be blocked, not the full model ID + assert!( + orchestrator.latency_block_state.is_blocked("openai"), + "Provider prefix should be blocked when scope is Provider" + ); + assert!( + !orchestrator.latency_block_state.is_blocked("openai/gpt-4o"), + "Full model ID should not be directly blocked when scope is Provider" + ); + } + + #[tokio::test] + async fn test_high_latency_completed_response_request_scoped_block() { + // When apply_to is "request", block state should be in RequestContext. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let mut policy = high_latency_retry_policy(0); + policy.on_high_latency.as_mut().unwrap().apply_to = ApplyTo::Request; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-hl-request-scope"); + let body = Bytes::from("test body"); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + Ok(make_response(200)) + }, + ) + .await; + + assert!(result.is_ok()); + + // Block should be in request context, not global + assert!( + !orchestrator.latency_block_state.is_blocked("openai/gpt-4o"), + "Global state should NOT be blocked when apply_to is Request" + ); + assert!( + ctx.request_latency_block_state + .contains_key("openai/gpt-4o"), + "Request-scoped latency block state should be recorded" + ); + } + + #[tokio::test] + async fn test_high_latency_without_response_triggers_retry() { + // When HighLatencyEvent has no completed response (response: None), + // the orchestrator should trigger retry and record the latency event. + // This scenario happens when TTFB exceeds threshold but response hasn't completed. + // In practice, this is simulated by the ErrorDetector returning HighLatencyEvent + // with response: None. Since our ErrorDetector always returns response: Some for + // 2xx, we test this indirectly through the retry loop behavior. + // + // For a direct test, we'd need a custom ErrorDetector. Instead, we verify + // that the retry loop handles HighLatencyEvent without response by checking + // that it falls through to retry logic (the attempt is recorded as an error). + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let policy = high_latency_retry_policy(0); + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-hl-no-response"); + let body = Bytes::from("test body"); + + let call_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + move |_body, provider| { + let count = call_count_clone.clone(); + let _model = provider.model.clone().unwrap_or_default(); + async move { + let n = count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if n == 0 { + // First call: slow response (200 OK but exceeds threshold) + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + Ok(make_response(200)) + } else { + // Second call: fast success + Ok(make_response(200)) + } + } + }, + ) + .await; + + // The first response is completed-but-slow, so it's delivered directly. + // The block state should still be recorded. + assert!(result.is_ok()); + assert!( + orchestrator.latency_block_state.is_blocked("openai/gpt-4o"), + "Block state should be recorded even when completed response is delivered" + ); + } + + #[tokio::test] + async fn test_timeout_dual_classification_records_high_latency_event() { + // When a request times out AND on_high_latency is configured AND + // elapsed time exceeds threshold_ms, the orchestrator should: + // 1. Use TimeoutError for retry purposes + // 2. Also record a HighLatencyEvent for blocking purposes + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + // threshold_ms=50, timeout will report duration_ms > 50 + let policy = high_latency_retry_policy(50); + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-timeout-dual"); + let body = Bytes::from("test body"); + + let call_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + move |_body, _provider| { + let count = call_count_clone.clone(); + async move { + let n = count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if n == 0 { + // First call: timeout after 100ms (exceeds threshold of 50ms) + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + Err(TimeoutError { duration_ms: 100 }) + } else { + // Second call: success + Ok(make_response(200)) + } + } + }, + ) + .await; + + // Should succeed on retry + assert!(result.is_ok(), "Should succeed on retry after timeout"); + + // The timeout should have also recorded a latency block + // because duration_ms (100) > threshold_ms (50) + assert!( + orchestrator.latency_block_state.is_blocked("openai/gpt-4o"), + "Latency block state should be created via dual-classification (timeout + high latency)" + ); + + // The attempt error should be recorded as a Timeout (not HighLatency) + assert!( + ctx.errors + .iter() + .any(|e| matches!(e.error_type, AttemptErrorType::Timeout { .. })), + "The attempt should be recorded as a Timeout error" + ); + } + + #[tokio::test] + async fn test_timeout_no_dual_classification_when_below_threshold() { + // When a request times out but elapsed time is below threshold_ms, + // no HighLatencyEvent should be recorded for blocking. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + // threshold_ms=5000, timeout will report duration_ms=10 (below threshold) + let policy = high_latency_retry_policy(5000); + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-timeout-no-dual"); + let body = Bytes::from("test body"); + + let call_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + move |_body, _provider| { + let count = call_count_clone.clone(); + async move { + let n = count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if n == 0 { + // Timeout with short duration (below threshold) + Err(TimeoutError { duration_ms: 10 }) + } else { + Ok(make_response(200)) + } + } + }, + ) + .await; + + assert!(result.is_ok()); + + // No latency block should be created since timeout duration < threshold + assert!( + !orchestrator.latency_block_state.is_blocked("openai/gpt-4o"), + "No latency block should be created when timeout duration is below threshold" + ); + } + + #[tokio::test] + async fn test_high_latency_min_triggers_not_met_no_block() { + // When min_triggers > 1 and only 1 event occurs, no block should be created. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let mut policy = high_latency_retry_policy(0); + policy.on_high_latency.as_mut().unwrap().min_triggers = 3; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-hl-min-triggers"); + let body = Bytes::from("test body"); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + Ok(make_response(200)) + }, + ) + .await; + + assert!(result.is_ok()); + + // Only 1 event recorded, but min_triggers=3, so no block + assert!( + !orchestrator.latency_block_state.is_blocked("openai/gpt-4o"), + "No block should be created when min_triggers threshold is not met" + ); + } + + #[tokio::test] + async fn test_timeout_dual_classification_provider_scope() { + // Dual-classification with provider scope should block the provider prefix. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + let mut policy = high_latency_retry_policy(50); + policy.on_high_latency.as_mut().unwrap().scope = BlockScope::Provider; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-timeout-dual-provider"); + let body = Bytes::from("test body"); + + let call_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + move |_body, _provider| { + let count = call_count_clone.clone(); + async move { + let n = count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if n == 0 { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + Err(TimeoutError { duration_ms: 100 }) + } else { + Ok(make_response(200)) + } + } + }, + ) + .await; + + assert!(result.is_ok()); + + // Provider prefix should be blocked + assert!( + orchestrator.latency_block_state.is_blocked("openai"), + "Provider prefix should be blocked via dual-classification" + ); + } + + // ── P2 Edge Case: successful request below threshold does NOT remove block ── + + #[tokio::test] + async fn test_successful_request_below_threshold_does_not_remove_latency_block() { + // Design Decision 9: A successful request with latency below the threshold + // does NOT remove an existing Latency_Block_State entry. Blocks expire only + // via their configured block_duration_seconds. + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + // Pre-create a latency block for the primary model (simulating a previous + // high latency event that triggered a block). + orchestrator + .latency_block_state + .record_block("openai/gpt-4o", 300, 6000); + assert!( + orchestrator.latency_block_state.is_blocked("openai/gpt-4o"), + "Pre-condition: model should be blocked" + ); + + // Now send a request with a high latency config that has a high threshold. + // The response will be fast (below threshold), so no new HighLatencyEvent + // should be triggered. The existing block must remain. + let policy = high_latency_retry_policy(99999); // very high threshold — response will be fast + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let mut ctx = make_context("test-block-not-removed"); + let body = Bytes::from("test body"); + + // The primary is blocked, so the orchestrator should route to the secondary. + // The secondary returns 200 quickly (below threshold). + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { Ok(make_response(200)) }, + ) + .await; + + assert!( + result.is_ok(), + "Request should succeed via secondary provider" + ); + + // The existing block on the primary model must still be present. + // A successful fast request must NOT remove the block (Design Decision 9). + assert!( + orchestrator.latency_block_state.is_blocked("openai/gpt-4o"), + "Latency block must NOT be removed by a successful request below threshold" + ); + } + + // Feature: retry-on-ratelimit, Property 20: Completed High-Latency Response Delivered + // **Validates: Requirements 2a.17, 2a.18, 3.4** + proptest! { + #![proptest_config(ProptestConfig::with_cases(100))] + + /// Property 20: For any request that completes successfully but exceeds + /// the latency threshold, the completed response must be delivered to the + /// client (no retry for the current request). However, a Latency_Block_State + /// entry must still be created (if min_triggers threshold is met) so future + /// requests skip the slow model/provider. + #[test] + fn prop_completed_high_latency_response_delivered( + min_triggers in 1u32..=3u32, + block_duration_seconds in 1u64..=600u64, + scope in prop_oneof![Just(BlockScope::Model), Just(BlockScope::Provider)], + apply_to in prop_oneof![Just(ApplyTo::Global), Just(ApplyTo::Request)], + measure in prop_oneof![Just(LatencyMeasure::Ttfb), Just(LatencyMeasure::Total)], + ) { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + let orchestrator = RetryOrchestrator::new_default(); + let primary = make_provider("openai/gpt-4o"); + let secondary = make_provider("anthropic/claude-3"); + let all_providers = vec![primary.clone(), secondary.clone()]; + + // Use threshold_ms=0 so any elapsed time > 0 triggers HighLatencyEvent. + let policy = RetryPolicy { + fallback_models: vec!["anthropic/claude-3".to_string()], + default_strategy: RetryStrategy::DifferentProvider, + default_max_attempts: 2, + on_status_codes: vec![], + on_timeout: None, + on_high_latency: Some(HighLatencyConfig { + threshold_ms: 0, + measure, + min_triggers, + trigger_window_seconds: Some(60), + strategy: RetryStrategy::DifferentProvider, + max_attempts: 2, + block_duration_seconds, + scope, + apply_to, + }), + backoff: None, + retry_after_handling: None, + max_retry_duration_ms: None, + }; + + let sig = RequestSignature::new( + b"test body", + &hyper::HeaderMap::new(), + false, + "openai/gpt-4o".to_string(), + ); + let body = Bytes::from("test body"); + + // Send min_triggers requests so the trigger counter is met. + // Each request should return Ok(200) since the response completed. + for i in 0..min_triggers { + let mut ctx = RequestContext { + request_id: format!("test-prop20-{}", i), + attempted_providers: HashSet::new(), + retry_start_time: None, + attempt_number: 0, + request_retry_after_state: HashMap::new(), + request_latency_block_state: HashMap::new(), + request_signature: sig.clone(), + errors: vec![], + }; + + let result = orchestrator + .execute( + &body, + &sig, + &all_providers[0], + &policy, + &all_providers, + &mut ctx, + |_body, _provider| async { + // Small delay to ensure elapsed > 0 (threshold_ms=0) + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + Ok(make_response(200)) + }, + ) + .await; + + // Response must always be delivered to the client + prop_assert!( + result.is_ok(), + "Completed-but-slow response must be delivered to client (attempt {})", + i + 1 + ); + let resp = result.unwrap(); + prop_assert_eq!( + resp.status().as_u16(), + 200u16, + "Response status must be 200 (attempt {})", + i + 1 + ); + + // After the last request that meets min_triggers, check block state + if i + 1 == min_triggers { + let expected_identifier = match scope { + BlockScope::Model => "openai/gpt-4o".to_string(), + BlockScope::Provider => "openai".to_string(), + }; + + match apply_to { + ApplyTo::Global => { + prop_assert!( + orchestrator.latency_block_state.is_blocked(&expected_identifier), + "Global block state should be created for '{}' after {} triggers", + expected_identifier, + min_triggers + ); + } + ApplyTo::Request => { + // Request-scoped block is stored in the RequestContext, + // which is local to this request. Verify it was set. + prop_assert!( + ctx.request_latency_block_state.contains_key(&expected_identifier), + "Request-scoped block state should be created for '{}' after {} triggers", + expected_identifier, + min_triggers + ); + // Global state should NOT be set for request-scoped blocks + prop_assert!( + !orchestrator.latency_block_state.is_blocked(&expected_identifier), + "Global block state should NOT be created when apply_to is Request" + ); + } + } + } + } + + Ok(()) + })?; + } + } +}