mirror of
https://github.com/katanemo/plano.git
synced 2026-06-08 14:55:14 +02:00
retry: add state managers for latency blocking and retry-after
Add three state management components: - LatencyBlockStateManager: tracks providers blocked due to high latency with configurable block duration and scope - LatencyTriggerCounter: counts consecutive latency threshold breaches before triggering provider blocking - RetryAfterStateManager: honors Retry-After headers with per-provider/model/endpoint blocking scope Signed-off-by: Troy Mitchell <i@troy-y.org>
This commit is contained in:
parent
47a3e8a8e6
commit
46b6324819
3 changed files with 1115 additions and 0 deletions
375
crates/common/src/retry/latency_block_state.rs
Normal file
375
crates/common/src/retry/latency_block_state.rs
Normal file
|
|
@ -0,0 +1,375 @@
|
|||
use std::time::{Duration, Instant};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use log::info;
|
||||
|
||||
use crate::configuration::{extract_provider, BlockScope};
|
||||
|
||||
/// Thread-safe global state manager for latency-based blocking.
|
||||
///
|
||||
/// Blocks expire only via `block_duration_seconds` — successful requests
|
||||
/// do NOT remove existing blocks. There is no `remove_block()` method.
|
||||
///
|
||||
/// This manager handles ONLY global state (`apply_to: "global"`).
|
||||
/// Request-scoped state (`apply_to: "request"`) is stored in
|
||||
/// `RequestContext.request_latency_block_state` and managed by the orchestrator.
|
||||
///
|
||||
/// Entries use max-expiration semantics: if a new block is recorded for an
|
||||
/// identifier that already has an entry, the expiration is updated only if
|
||||
/// the new expiration is later than the existing one.
|
||||
pub struct LatencyBlockStateManager {
|
||||
/// Global state: identifier (model ID or provider prefix) -> (expiration timestamp, measured_latency_ms)
|
||||
global_state: DashMap<String, (Instant, u64)>,
|
||||
}
|
||||
|
||||
impl LatencyBlockStateManager {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
global_state: DashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a latency block after min_triggers threshold is met.
|
||||
///
|
||||
/// If an entry already exists for the identifier, updates only if the new
|
||||
/// expiration is later than the existing one (max-expiration semantics).
|
||||
/// The `measured_latency_ms` is always updated to the latest value when
|
||||
/// the expiration is extended.
|
||||
pub fn record_block(
|
||||
&self,
|
||||
identifier: &str,
|
||||
block_duration_seconds: u64,
|
||||
measured_latency_ms: u64,
|
||||
) {
|
||||
let new_expiration = Instant::now() + Duration::from_secs(block_duration_seconds);
|
||||
|
||||
self.global_state
|
||||
.entry(identifier.to_string())
|
||||
.and_modify(|existing| {
|
||||
if new_expiration > existing.0 {
|
||||
existing.0 = new_expiration;
|
||||
existing.1 = measured_latency_ms;
|
||||
}
|
||||
})
|
||||
.or_insert((new_expiration, measured_latency_ms));
|
||||
}
|
||||
|
||||
/// Check if an identifier is currently blocked.
|
||||
///
|
||||
/// Lazily cleans up expired entries.
|
||||
pub fn is_blocked(&self, identifier: &str) -> bool {
|
||||
if let Some(entry) = self.global_state.get(identifier) {
|
||||
if Instant::now() < entry.0 {
|
||||
return true;
|
||||
}
|
||||
// Entry expired — drop the read guard before removing
|
||||
drop(entry);
|
||||
self.global_state.remove(identifier);
|
||||
info!("Latency_Block_State expired: identifier={}", identifier);
|
||||
info!("metric.latency_block_expired: model={}", identifier);
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Get remaining block duration for an identifier, if blocked.
|
||||
///
|
||||
/// Returns `None` if the identifier is not blocked or the entry has expired.
|
||||
/// Lazily cleans up expired entries.
|
||||
pub fn remaining_block_duration(&self, identifier: &str) -> Option<Duration> {
|
||||
if let Some(entry) = self.global_state.get(identifier) {
|
||||
let now = Instant::now();
|
||||
if now < entry.0 {
|
||||
return Some(entry.0 - now);
|
||||
}
|
||||
// Entry expired — drop the read guard before removing
|
||||
drop(entry);
|
||||
self.global_state.remove(identifier);
|
||||
info!("Latency_Block_State expired: identifier={}", identifier);
|
||||
info!("metric.latency_block_expired: model={}", identifier);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Check if a model is blocked, considering scope (model or provider).
|
||||
///
|
||||
/// - `BlockScope::Model`: checks if the exact `model_id` is blocked.
|
||||
/// - `BlockScope::Provider`: extracts the provider prefix from `model_id`
|
||||
/// and checks if that prefix is blocked.
|
||||
pub fn is_model_blocked(&self, model_id: &str, scope: BlockScope) -> bool {
|
||||
match scope {
|
||||
BlockScope::Model => self.is_blocked(model_id),
|
||||
BlockScope::Provider => {
|
||||
let provider = extract_provider(model_id);
|
||||
self.is_blocked(provider)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LatencyBlockStateManager {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn test_new_manager_has_no_blocks() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
assert!(!mgr.is_blocked("openai/gpt-4o"));
|
||||
assert!(mgr.remaining_block_duration("openai/gpt-4o").is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_block_and_is_blocked() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 60, 5500);
|
||||
assert!(mgr.is_blocked("openai/gpt-4o"));
|
||||
assert!(!mgr.is_blocked("anthropic/claude"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remaining_block_duration() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 10, 5000);
|
||||
let remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
assert!(remaining <= Duration::from_secs(11));
|
||||
assert!(remaining > Duration::from_secs(8));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_expired_entry_cleaned_up_on_is_blocked() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 0, 5000);
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
assert!(!mgr.is_blocked("openai/gpt-4o"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_expired_entry_cleaned_up_on_remaining() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 0, 5000);
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
assert!(mgr.remaining_block_duration("openai/gpt-4o").is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_max_expiration_semantics_longer_wins() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 10, 5000);
|
||||
let first_remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
|
||||
mgr.record_block("openai/gpt-4o", 60, 6000);
|
||||
let second_remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
assert!(second_remaining > first_remaining);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_max_expiration_semantics_shorter_does_not_overwrite() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 60, 5000);
|
||||
let first_remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
|
||||
mgr.record_block("openai/gpt-4o", 5, 6000);
|
||||
let second_remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
// Should still be close to the original 60s
|
||||
assert!(second_remaining > Duration::from_secs(50));
|
||||
let diff = if first_remaining > second_remaining {
|
||||
first_remaining - second_remaining
|
||||
} else {
|
||||
second_remaining - first_remaining
|
||||
};
|
||||
assert!(diff < Duration::from_secs(2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_model_blocked_model_scope() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 60, 5000);
|
||||
|
||||
assert!(mgr.is_model_blocked("openai/gpt-4o", BlockScope::Model));
|
||||
assert!(!mgr.is_model_blocked("openai/gpt-4o-mini", BlockScope::Model));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_model_blocked_provider_scope() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai", 60, 5000);
|
||||
|
||||
assert!(mgr.is_model_blocked("openai/gpt-4o", BlockScope::Provider));
|
||||
assert!(mgr.is_model_blocked("openai/gpt-4o-mini", BlockScope::Provider));
|
||||
assert!(!mgr.is_model_blocked("anthropic/claude", BlockScope::Provider));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multiple_identifiers_independent() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 60, 5000);
|
||||
mgr.record_block("anthropic/claude", 30, 4000);
|
||||
|
||||
assert!(mgr.is_blocked("openai/gpt-4o"));
|
||||
assert!(mgr.is_blocked("anthropic/claude"));
|
||||
assert!(!mgr.is_blocked("azure/gpt-4o"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_block_stores_measured_latency() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 60, 5500);
|
||||
|
||||
// Verify the entry exists and has the correct latency
|
||||
let entry = mgr.global_state.get("openai/gpt-4o").unwrap();
|
||||
assert_eq!(entry.1, 5500);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_latency_updated_when_expiration_extended() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 10, 5000);
|
||||
|
||||
// Extend with longer duration and different latency
|
||||
mgr.record_block("openai/gpt-4o", 60, 7000);
|
||||
|
||||
let entry = mgr.global_state.get("openai/gpt-4o").unwrap();
|
||||
assert_eq!(entry.1, 7000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_latency_not_updated_when_expiration_not_extended() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 60, 5000);
|
||||
|
||||
// Shorter duration — should NOT update
|
||||
mgr.record_block("openai/gpt-4o", 5, 9000);
|
||||
|
||||
let entry = mgr.global_state.get("openai/gpt-4o").unwrap();
|
||||
// Latency should remain 5000 since expiration wasn't extended
|
||||
assert_eq!(entry.1, 5000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_zero_duration_block_expires_immediately() {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
mgr.record_block("openai/gpt-4o", 0, 5000);
|
||||
thread::sleep(Duration::from_millis(5));
|
||||
assert!(!mgr.is_blocked("openai/gpt-4o"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_trait() {
|
||||
let mgr = LatencyBlockStateManager::default();
|
||||
assert!(!mgr.is_blocked("anything"));
|
||||
}
|
||||
|
||||
// --- Property-based tests ---
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
fn arb_identifier() -> impl Strategy<Value = String> {
|
||||
prop_oneof![
|
||||
"[a-z]{3,8}/[a-z0-9\\-]{3,12}".prop_map(|s| s),
|
||||
"[a-z]{3,8}".prop_map(|s| s),
|
||||
]
|
||||
}
|
||||
|
||||
/// A single block recording: (block_duration_seconds, measured_latency_ms)
|
||||
fn arb_block_recording() -> impl Strategy<Value = (u64, u64)> {
|
||||
(1u64..=600, 100u64..=30_000)
|
||||
}
|
||||
|
||||
// Feature: retry-on-ratelimit, Property 22: Latency Block State Max Expiration Update
|
||||
// **Validates: Requirements 14.15**
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(100))]
|
||||
|
||||
/// Property 22 – Case 1: After recording multiple blocks for the same identifier
|
||||
/// with different durations, the remaining block duration reflects the maximum
|
||||
/// duration recorded (max-expiration semantics).
|
||||
#[test]
|
||||
fn prop_latency_block_max_expiration_update(
|
||||
identifier in arb_identifier(),
|
||||
recordings in prop::collection::vec(arb_block_recording(), 2..=10),
|
||||
) {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
|
||||
for &(duration, latency) in &recordings {
|
||||
mgr.record_block(&identifier, duration, latency);
|
||||
}
|
||||
|
||||
let max_duration = recordings.iter().map(|&(d, _)| d).max().unwrap();
|
||||
|
||||
// The identifier should still be blocked
|
||||
let remaining = mgr.remaining_block_duration(&identifier);
|
||||
prop_assert!(
|
||||
remaining.is_some(),
|
||||
"Identifier {} should be blocked after {} recordings (max_duration={}s)",
|
||||
identifier, recordings.len(), max_duration
|
||||
);
|
||||
|
||||
let remaining_secs = remaining.unwrap().as_secs();
|
||||
|
||||
// Remaining should be close to max_duration (allow 2s tolerance for execution time)
|
||||
prop_assert!(
|
||||
remaining_secs >= max_duration.saturating_sub(2),
|
||||
"Remaining {}s should reflect the max duration ({}s), not a smaller value. Recordings: {:?}",
|
||||
remaining_secs, max_duration, recordings
|
||||
);
|
||||
|
||||
prop_assert!(
|
||||
remaining_secs <= max_duration + 1,
|
||||
"Remaining {}s should not exceed max duration {}s + tolerance. Recordings: {:?}",
|
||||
remaining_secs, max_duration, recordings
|
||||
);
|
||||
}
|
||||
|
||||
/// Property 22 – Case 2: measured_latency_ms is updated when expiration is extended
|
||||
/// but NOT when a shorter duration is recorded.
|
||||
#[test]
|
||||
fn prop_latency_block_measured_latency_update_semantics(
|
||||
identifier in arb_identifier(),
|
||||
first_duration in 10u64..=300,
|
||||
first_latency in 100u64..=30_000,
|
||||
extra_duration in 1u64..=300,
|
||||
longer_latency in 100u64..=30_000,
|
||||
shorter_duration in 1u64..=9,
|
||||
shorter_latency in 100u64..=30_000,
|
||||
) {
|
||||
let mgr = LatencyBlockStateManager::new();
|
||||
|
||||
// Record initial block
|
||||
mgr.record_block(&identifier, first_duration, first_latency);
|
||||
{
|
||||
let entry = mgr.global_state.get(&identifier).unwrap();
|
||||
prop_assert_eq!(entry.1, first_latency);
|
||||
}
|
||||
|
||||
// Record a longer duration — latency SHOULD be updated
|
||||
let longer_duration = first_duration + extra_duration;
|
||||
mgr.record_block(&identifier, longer_duration, longer_latency);
|
||||
{
|
||||
let entry = mgr.global_state.get(&identifier).unwrap();
|
||||
prop_assert_eq!(
|
||||
entry.1, longer_latency,
|
||||
"Latency should be updated to {} when expiration is extended (duration {} > {})",
|
||||
longer_latency, longer_duration, first_duration
|
||||
);
|
||||
}
|
||||
|
||||
// Record a shorter duration — latency should NOT be updated
|
||||
mgr.record_block(&identifier, shorter_duration, shorter_latency);
|
||||
{
|
||||
let entry = mgr.global_state.get(&identifier).unwrap();
|
||||
prop_assert_eq!(
|
||||
entry.1, longer_latency,
|
||||
"Latency should remain {} (not {}) when shorter duration {} < {} doesn't extend expiration",
|
||||
longer_latency, shorter_latency, shorter_duration, longer_duration
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
230
crates/common/src/retry/latency_trigger.rs
Normal file
230
crates/common/src/retry/latency_trigger.rs
Normal file
|
|
@ -0,0 +1,230 @@
|
|||
use std::time::Instant;
|
||||
|
||||
use dashmap::DashMap;
|
||||
|
||||
/// Thread-safe sliding window counter for tracking High_Latency_Events.
|
||||
///
|
||||
/// Maintains per-identifier timestamps of latency events within a configurable
|
||||
/// sliding window. When the count of recent events meets or exceeds `min_triggers`,
|
||||
/// the caller should create a `Latency_Block_State` entry and then call `reset()`.
|
||||
pub struct LatencyTriggerCounter {
|
||||
/// model/provider identifier -> list of event timestamps within the window
|
||||
counters: DashMap<String, Vec<Instant>>,
|
||||
}
|
||||
|
||||
impl LatencyTriggerCounter {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
counters: DashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a High_Latency_Event. Returns true if `min_triggers` threshold
|
||||
/// is now met (caller should create a Latency_Block_State).
|
||||
///
|
||||
/// Lazily discards events older than `trigger_window_seconds` before checking
|
||||
/// the count.
|
||||
pub fn record_event(
|
||||
&self,
|
||||
identifier: &str,
|
||||
min_triggers: u32,
|
||||
trigger_window_seconds: u64,
|
||||
) -> bool {
|
||||
let now = Instant::now();
|
||||
let window = std::time::Duration::from_secs(trigger_window_seconds);
|
||||
|
||||
let mut entry = self.counters.entry(identifier.to_string()).or_default();
|
||||
// Add current event
|
||||
entry.push(now);
|
||||
// Discard events older than the window
|
||||
entry.retain(|ts| now.duration_since(*ts) <= window);
|
||||
// Check threshold
|
||||
entry.len() >= min_triggers as usize
|
||||
}
|
||||
|
||||
/// Reset the counter for an identifier (called after a block is created
|
||||
/// to prevent re-triggering on the same events).
|
||||
pub fn reset(&self, identifier: &str) {
|
||||
if let Some(mut entry) = self.counters.get_mut(identifier) {
|
||||
entry.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LatencyTriggerCounter {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn test_record_event_returns_true_when_threshold_met() {
|
||||
let counter = LatencyTriggerCounter::new();
|
||||
assert!(!counter.record_event("model-a", 3, 60));
|
||||
assert!(!counter.record_event("model-a", 3, 60));
|
||||
assert!(counter.record_event("model-a", 3, 60));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_event_single_trigger_always_fires() {
|
||||
let counter = LatencyTriggerCounter::new();
|
||||
assert!(counter.record_event("model-a", 1, 60));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_events_expire_outside_window() {
|
||||
let counter = LatencyTriggerCounter::new();
|
||||
// Record 2 events
|
||||
counter.record_event("model-a", 3, 1);
|
||||
counter.record_event("model-a", 3, 1);
|
||||
// Wait for them to expire
|
||||
sleep(Duration::from_millis(1100));
|
||||
// Third event should not meet threshold since previous two expired
|
||||
assert!(!counter.record_event("model-a", 3, 1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_reset_clears_counter() {
|
||||
let counter = LatencyTriggerCounter::new();
|
||||
counter.record_event("model-a", 3, 60);
|
||||
counter.record_event("model-a", 3, 60);
|
||||
counter.reset("model-a");
|
||||
// After reset, need 3 fresh events again
|
||||
assert!(!counter.record_event("model-a", 3, 60));
|
||||
assert!(!counter.record_event("model-a", 3, 60));
|
||||
assert!(counter.record_event("model-a", 3, 60));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_reset_nonexistent_identifier_is_noop() {
|
||||
let counter = LatencyTriggerCounter::new();
|
||||
// Should not panic
|
||||
counter.reset("nonexistent");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_separate_identifiers_are_independent() {
|
||||
let counter = LatencyTriggerCounter::new();
|
||||
counter.record_event("model-a", 2, 60);
|
||||
counter.record_event("model-b", 2, 60);
|
||||
// model-a has 1 event, model-b has 1 event — neither at threshold of 2
|
||||
assert!(!counter.record_event("model-b", 3, 60));
|
||||
// model-a reaches threshold
|
||||
assert!(counter.record_event("model-a", 2, 60));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_threshold_exceeded_still_returns_true() {
|
||||
let counter = LatencyTriggerCounter::new();
|
||||
assert!(counter.record_event("model-a", 1, 60));
|
||||
// Already past threshold, still returns true
|
||||
assert!(counter.record_event("model-a", 1, 60));
|
||||
assert!(counter.record_event("model-a", 1, 60));
|
||||
}
|
||||
|
||||
// --- Property-based tests ---
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
// Feature: retry-on-ratelimit, Property 18: Latency Trigger Counter Sliding Window
|
||||
// **Validates: Requirements 2a.6, 2a.7, 2a.8, 2a.21, 14.1, 14.2, 14.3, 14.12**
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(100))]
|
||||
|
||||
/// Property 18 – Case 1: Recording N events in quick succession (all within window)
|
||||
/// returns true iff N >= min_triggers.
|
||||
#[test]
|
||||
fn prop_sliding_window_threshold(
|
||||
min_triggers in 1u32..=10,
|
||||
trigger_window_seconds in 1u64..=60,
|
||||
num_events in 1u32..=20,
|
||||
) {
|
||||
let counter = LatencyTriggerCounter::new();
|
||||
let identifier = "test-model";
|
||||
|
||||
let mut last_result = false;
|
||||
for i in 1..=num_events {
|
||||
last_result = counter.record_event(identifier, min_triggers, trigger_window_seconds);
|
||||
// Before reaching threshold, should be false
|
||||
if i < min_triggers {
|
||||
prop_assert!(!last_result, "Expected false at event {} with min_triggers {}", i, min_triggers);
|
||||
} else {
|
||||
// At or past threshold, should be true
|
||||
prop_assert!(last_result, "Expected true at event {} with min_triggers {}", i, min_triggers);
|
||||
}
|
||||
}
|
||||
|
||||
// Final result should match whether we recorded enough events
|
||||
prop_assert_eq!(last_result, num_events >= min_triggers);
|
||||
}
|
||||
|
||||
/// Property 18 – Case 2: After reset, counter starts fresh and previous events
|
||||
/// do not count toward the threshold.
|
||||
#[test]
|
||||
fn prop_reset_clears_counter(
|
||||
min_triggers in 2u32..=10,
|
||||
trigger_window_seconds in 1u64..=60,
|
||||
events_before_reset in 1u32..=10,
|
||||
) {
|
||||
let counter = LatencyTriggerCounter::new();
|
||||
let identifier = "test-model";
|
||||
|
||||
// Record some events before reset
|
||||
for _ in 0..events_before_reset {
|
||||
counter.record_event(identifier, min_triggers, trigger_window_seconds);
|
||||
}
|
||||
|
||||
// Reset the counter
|
||||
counter.reset(identifier);
|
||||
|
||||
// After reset, a single event should not meet threshold (min_triggers >= 2)
|
||||
let result = counter.record_event(identifier, min_triggers, trigger_window_seconds);
|
||||
prop_assert!(!result, "After reset, first event should not meet threshold of {}", min_triggers);
|
||||
|
||||
// Need min_triggers - 1 more events to reach threshold again
|
||||
let mut final_result = result;
|
||||
for _ in 1..min_triggers {
|
||||
final_result = counter.record_event(identifier, min_triggers, trigger_window_seconds);
|
||||
}
|
||||
prop_assert!(final_result, "After reset + {} events, should meet threshold", min_triggers);
|
||||
}
|
||||
|
||||
/// Property 18 – Case 3: Different identifiers are independent — events for one
|
||||
/// identifier do not affect the count for another.
|
||||
#[test]
|
||||
fn prop_identifiers_independent(
|
||||
min_triggers in 1u32..=10,
|
||||
trigger_window_seconds in 1u64..=60,
|
||||
events_a in 1u32..=20,
|
||||
events_b in 1u32..=20,
|
||||
) {
|
||||
let counter = LatencyTriggerCounter::new();
|
||||
let id_a = "model-a";
|
||||
let id_b = "model-b";
|
||||
|
||||
// Record events for identifier A
|
||||
let mut result_a = false;
|
||||
for _ in 0..events_a {
|
||||
result_a = counter.record_event(id_a, min_triggers, trigger_window_seconds);
|
||||
}
|
||||
|
||||
// Record events for identifier B
|
||||
let mut result_b = false;
|
||||
for _ in 0..events_b {
|
||||
result_b = counter.record_event(id_b, min_triggers, trigger_window_seconds);
|
||||
}
|
||||
|
||||
// Each identifier's result depends only on its own event count
|
||||
prop_assert_eq!(result_a, events_a >= min_triggers,
|
||||
"id_a: events={}, min_triggers={}", events_a, min_triggers);
|
||||
prop_assert_eq!(result_b, events_b >= min_triggers,
|
||||
"id_b: events={}, min_triggers={}", events_b, min_triggers);
|
||||
}
|
||||
}
|
||||
} // mod tests
|
||||
510
crates/common/src/retry/retry_after_state.rs
Normal file
510
crates/common/src/retry/retry_after_state.rs
Normal file
|
|
@ -0,0 +1,510 @@
|
|||
use std::time::{Duration, Instant};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use log::info;
|
||||
|
||||
use crate::configuration::{extract_provider, BlockScope};
|
||||
|
||||
/// Thread-safe global state manager for Retry-After header blocking.
|
||||
///
|
||||
/// This manager handles ONLY global state (`apply_to: "global"`).
|
||||
/// Request-scoped state (`apply_to: "request"`) is stored in
|
||||
/// `RequestContext.request_retry_after_state` and managed by the orchestrator.
|
||||
///
|
||||
/// Entries use max-expiration semantics: if a new Retry-After value is recorded
|
||||
/// for an identifier that already has an entry, the expiration is updated only
|
||||
/// if the new expiration is later than the existing one.
|
||||
pub struct RetryAfterStateManager {
|
||||
/// Global state: identifier (model ID or provider prefix) -> expiration timestamp
|
||||
global_state: DashMap<String, Instant>,
|
||||
}
|
||||
|
||||
impl RetryAfterStateManager {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
global_state: DashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a Retry-After header, creating or updating the block entry.
|
||||
///
|
||||
/// The `retry_after_seconds` value is capped at `max_retry_after_seconds`.
|
||||
/// Uses max-expiration semantics: if an entry already exists, the expiration
|
||||
/// is updated only if the new expiration is later.
|
||||
pub fn record(&self, identifier: &str, retry_after_seconds: u64, max_retry_after_seconds: u64) {
|
||||
let capped = retry_after_seconds.min(max_retry_after_seconds);
|
||||
let new_expiration = Instant::now() + Duration::from_secs(capped);
|
||||
|
||||
self.global_state
|
||||
.entry(identifier.to_string())
|
||||
.and_modify(|existing| {
|
||||
if new_expiration > *existing {
|
||||
*existing = new_expiration;
|
||||
}
|
||||
})
|
||||
.or_insert(new_expiration);
|
||||
}
|
||||
|
||||
/// Check if an identifier is currently blocked.
|
||||
///
|
||||
/// Lazily cleans up expired entries.
|
||||
pub fn is_blocked(&self, identifier: &str) -> bool {
|
||||
if let Some(entry) = self.global_state.get(identifier) {
|
||||
if Instant::now() < *entry {
|
||||
return true;
|
||||
}
|
||||
// Entry expired — drop the read guard before removing
|
||||
drop(entry);
|
||||
self.global_state.remove(identifier);
|
||||
info!("Retry_After_State expired: identifier={}", identifier);
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Get remaining block duration for an identifier, if blocked.
|
||||
///
|
||||
/// Returns `None` if the identifier is not blocked or the entry has expired.
|
||||
/// Lazily cleans up expired entries.
|
||||
pub fn remaining_block_duration(&self, identifier: &str) -> Option<Duration> {
|
||||
if let Some(entry) = self.global_state.get(identifier) {
|
||||
let now = Instant::now();
|
||||
if now < *entry {
|
||||
return Some(*entry - now);
|
||||
}
|
||||
// Entry expired — drop the read guard before removing
|
||||
drop(entry);
|
||||
self.global_state.remove(identifier);
|
||||
info!("Retry_After_State expired: identifier={}", identifier);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Check if a model is blocked, considering scope (model or provider).
|
||||
///
|
||||
/// - `BlockScope::Model`: checks if the exact `model_id` is blocked.
|
||||
/// - `BlockScope::Provider`: extracts the provider prefix from `model_id`
|
||||
/// and checks if that prefix is blocked.
|
||||
pub fn is_model_blocked(&self, model_id: &str, scope: BlockScope) -> bool {
|
||||
match scope {
|
||||
BlockScope::Model => self.is_blocked(model_id),
|
||||
BlockScope::Provider => {
|
||||
let provider = extract_provider(model_id);
|
||||
self.is_blocked(provider)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RetryAfterStateManager {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn test_new_manager_has_no_blocks() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
assert!(!mgr.is_blocked("openai/gpt-4o"));
|
||||
assert!(mgr.remaining_block_duration("openai/gpt-4o").is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_and_is_blocked() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
mgr.record("openai/gpt-4o", 60, 300);
|
||||
assert!(mgr.is_blocked("openai/gpt-4o"));
|
||||
assert!(!mgr.is_blocked("anthropic/claude"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_caps_at_max() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
// Retry-After of 600 seconds, but max is 300
|
||||
mgr.record("openai/gpt-4o", 600, 300);
|
||||
let remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
// Should be capped at ~300 seconds (allow some tolerance)
|
||||
assert!(remaining <= Duration::from_secs(301));
|
||||
assert!(remaining > Duration::from_secs(298));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remaining_block_duration() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
mgr.record("openai/gpt-4o", 10, 300);
|
||||
let remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
assert!(remaining <= Duration::from_secs(11));
|
||||
assert!(remaining > Duration::from_secs(8));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_expired_entry_cleaned_up_on_is_blocked() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
// Record with 0 seconds — effectively expires immediately
|
||||
mgr.record("openai/gpt-4o", 0, 300);
|
||||
// Sleep briefly to ensure expiration
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
assert!(!mgr.is_blocked("openai/gpt-4o"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_expired_entry_cleaned_up_on_remaining() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
mgr.record("openai/gpt-4o", 0, 300);
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
assert!(mgr.remaining_block_duration("openai/gpt-4o").is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_max_expiration_semantics_longer_wins() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
mgr.record("openai/gpt-4o", 10, 300);
|
||||
let first_remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
|
||||
// Record a longer duration — should update
|
||||
mgr.record("openai/gpt-4o", 60, 300);
|
||||
let second_remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
assert!(second_remaining > first_remaining);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_max_expiration_semantics_shorter_does_not_overwrite() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
mgr.record("openai/gpt-4o", 60, 300);
|
||||
let first_remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
|
||||
// Record a shorter duration — should NOT overwrite
|
||||
mgr.record("openai/gpt-4o", 5, 300);
|
||||
let second_remaining = mgr.remaining_block_duration("openai/gpt-4o").unwrap();
|
||||
// The remaining should still be close to the original 60s
|
||||
assert!(second_remaining > Duration::from_secs(50));
|
||||
// Allow small timing variance
|
||||
let diff = if first_remaining > second_remaining {
|
||||
first_remaining - second_remaining
|
||||
} else {
|
||||
second_remaining - first_remaining
|
||||
};
|
||||
assert!(diff < Duration::from_secs(2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_model_blocked_model_scope() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
mgr.record("openai/gpt-4o", 60, 300);
|
||||
|
||||
assert!(mgr.is_model_blocked("openai/gpt-4o", BlockScope::Model));
|
||||
assert!(!mgr.is_model_blocked("openai/gpt-4o-mini", BlockScope::Model));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_model_blocked_provider_scope() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
// Block at provider level by recording with provider prefix
|
||||
mgr.record("openai", 60, 300);
|
||||
|
||||
// Both openai models should be blocked
|
||||
assert!(mgr.is_model_blocked("openai/gpt-4o", BlockScope::Provider));
|
||||
assert!(mgr.is_model_blocked("openai/gpt-4o-mini", BlockScope::Provider));
|
||||
// Anthropic should not be blocked
|
||||
assert!(!mgr.is_model_blocked("anthropic/claude", BlockScope::Provider));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_model_scope_does_not_block_other_models() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
mgr.record("openai/gpt-4o", 60, 300);
|
||||
|
||||
// Model scope: only exact match is blocked
|
||||
assert!(mgr.is_model_blocked("openai/gpt-4o", BlockScope::Model));
|
||||
assert!(!mgr.is_model_blocked("openai/gpt-4o-mini", BlockScope::Model));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multiple_identifiers_independent() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
mgr.record("openai/gpt-4o", 60, 300);
|
||||
mgr.record("anthropic/claude", 30, 300);
|
||||
|
||||
assert!(mgr.is_blocked("openai/gpt-4o"));
|
||||
assert!(mgr.is_blocked("anthropic/claude"));
|
||||
assert!(!mgr.is_blocked("azure/gpt-4o"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_with_zero_seconds() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
mgr.record("openai/gpt-4o", 0, 300);
|
||||
// With 0 seconds, the entry expires at Instant::now() + 0,
|
||||
// which is effectively immediately
|
||||
thread::sleep(Duration::from_millis(5));
|
||||
assert!(!mgr.is_blocked("openai/gpt-4o"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_max_retry_after_seconds_zero_caps_to_zero() {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
// Even with retry_after_seconds=60, max=0 caps to 0
|
||||
mgr.record("openai/gpt-4o", 60, 0);
|
||||
thread::sleep(Duration::from_millis(5));
|
||||
assert!(!mgr.is_blocked("openai/gpt-4o"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_trait() {
|
||||
let mgr = RetryAfterStateManager::default();
|
||||
assert!(!mgr.is_blocked("anything"));
|
||||
}
|
||||
|
||||
// --- Proptest strategies ---
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
fn arb_provider_prefix() -> impl Strategy<Value = String> {
|
||||
prop_oneof![
|
||||
Just("openai".to_string()),
|
||||
Just("anthropic".to_string()),
|
||||
Just("azure".to_string()),
|
||||
Just("google".to_string()),
|
||||
Just("cohere".to_string()),
|
||||
]
|
||||
}
|
||||
|
||||
fn arb_model_suffix() -> impl Strategy<Value = String> {
|
||||
prop_oneof![
|
||||
Just("gpt-4o".to_string()),
|
||||
Just("gpt-4o-mini".to_string()),
|
||||
Just("claude-3".to_string()),
|
||||
Just("gemini-pro".to_string()),
|
||||
]
|
||||
}
|
||||
|
||||
fn arb_model_id() -> impl Strategy<Value = String> {
|
||||
(arb_provider_prefix(), arb_model_suffix())
|
||||
.prop_map(|(prefix, suffix)| format!("{}/{}", prefix, suffix))
|
||||
}
|
||||
|
||||
fn arb_scope() -> impl Strategy<Value = BlockScope> {
|
||||
prop_oneof![Just(BlockScope::Model), Just(BlockScope::Provider),]
|
||||
}
|
||||
|
||||
// Feature: retry-on-ratelimit, Property 15: Retry_After_State Scope Behavior
|
||||
// **Validates: Requirements 11.5, 11.6, 11.7, 11.8, 12.9, 12.10, 13.10, 13.11**
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(100))]
|
||||
|
||||
/// Property 15 – Case 1: Model scope blocks only the exact model_id.
|
||||
#[test]
|
||||
fn prop_model_scope_blocks_exact_model_only(
|
||||
model_id in arb_model_id(),
|
||||
other_model_id in arb_model_id(),
|
||||
retry_after in 1u64..300,
|
||||
) {
|
||||
prop_assume!(model_id != other_model_id);
|
||||
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
// Record with the exact model_id (model scope records the full model ID)
|
||||
mgr.record(&model_id, retry_after, 300);
|
||||
|
||||
// The exact model should be blocked
|
||||
prop_assert!(
|
||||
mgr.is_model_blocked(&model_id, BlockScope::Model),
|
||||
"Model {} should be blocked with Model scope after recording",
|
||||
model_id
|
||||
);
|
||||
|
||||
// A different model should NOT be blocked (even if same provider)
|
||||
prop_assert!(
|
||||
!mgr.is_model_blocked(&other_model_id, BlockScope::Model),
|
||||
"Model {} should NOT be blocked when {} was recorded with Model scope",
|
||||
other_model_id, model_id
|
||||
);
|
||||
}
|
||||
|
||||
/// Property 15 – Case 2: Provider scope blocks all models from the same provider.
|
||||
#[test]
|
||||
fn prop_provider_scope_blocks_all_same_provider_models(
|
||||
provider in arb_provider_prefix(),
|
||||
suffix1 in arb_model_suffix(),
|
||||
suffix2 in arb_model_suffix(),
|
||||
other_provider in arb_provider_prefix(),
|
||||
other_suffix in arb_model_suffix(),
|
||||
retry_after in 1u64..300,
|
||||
) {
|
||||
let model1 = format!("{}/{}", provider, suffix1);
|
||||
let model2 = format!("{}/{}", provider, suffix2);
|
||||
let other_model = format!("{}/{}", other_provider, other_suffix);
|
||||
prop_assume!(provider != other_provider);
|
||||
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
// Record at provider level (provider scope records the provider prefix)
|
||||
mgr.record(&provider, retry_after, 300);
|
||||
|
||||
// Both models from the same provider should be blocked
|
||||
prop_assert!(
|
||||
mgr.is_model_blocked(&model1, BlockScope::Provider),
|
||||
"Model {} should be blocked with Provider scope after recording provider {}",
|
||||
model1, provider
|
||||
);
|
||||
prop_assert!(
|
||||
mgr.is_model_blocked(&model2, BlockScope::Provider),
|
||||
"Model {} should be blocked with Provider scope after recording provider {}",
|
||||
model2, provider
|
||||
);
|
||||
|
||||
// Model from a different provider should NOT be blocked
|
||||
prop_assert!(
|
||||
!mgr.is_model_blocked(&other_model, BlockScope::Provider),
|
||||
"Model {} should NOT be blocked when provider {} was recorded",
|
||||
other_model, provider
|
||||
);
|
||||
}
|
||||
|
||||
/// Property 15 – Case 3: Global state is visible across different "requests"
|
||||
/// (same manager instance is shared).
|
||||
#[test]
|
||||
fn prop_global_state_shared_across_requests(
|
||||
model_id in arb_model_id(),
|
||||
scope in arb_scope(),
|
||||
retry_after in 1u64..300,
|
||||
) {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
|
||||
// Determine the identifier to record based on scope
|
||||
let identifier = match scope {
|
||||
BlockScope::Model => model_id.clone(),
|
||||
BlockScope::Provider => extract_provider(&model_id).to_string(),
|
||||
};
|
||||
mgr.record(&identifier, retry_after, 300);
|
||||
|
||||
// Simulate "different requests" by checking from the same manager instance.
|
||||
// Global state means any check against the same manager sees the block.
|
||||
// Check 1 (simulating request A)
|
||||
let blocked_a = mgr.is_model_blocked(&model_id, scope);
|
||||
// Check 2 (simulating request B)
|
||||
let blocked_b = mgr.is_model_blocked(&model_id, scope);
|
||||
|
||||
prop_assert!(
|
||||
blocked_a && blocked_b,
|
||||
"Global state should be visible to all requests: request_a={}, request_b={}",
|
||||
blocked_a, blocked_b
|
||||
);
|
||||
}
|
||||
|
||||
/// Property 15 – Case 4: Request-scoped state (HashMap) is isolated per request.
|
||||
/// Two separate HashMaps don't share state.
|
||||
#[test]
|
||||
fn prop_request_scoped_state_isolated(
|
||||
model_id in arb_model_id(),
|
||||
retry_after in 1u64..300,
|
||||
) {
|
||||
use std::collections::HashMap;
|
||||
use std::time::Instant;
|
||||
|
||||
// Simulate request-scoped state using separate HashMaps
|
||||
// (as RequestContext.request_retry_after_state would be)
|
||||
let mut request_a_state: HashMap<String, Instant> = HashMap::new();
|
||||
let mut request_b_state: HashMap<String, Instant> = HashMap::new();
|
||||
|
||||
// Request A records a Retry-After entry
|
||||
let expiration = Instant::now() + Duration::from_secs(retry_after);
|
||||
request_a_state.insert(model_id.clone(), expiration);
|
||||
|
||||
// Request A should see the block
|
||||
let a_blocked = request_a_state
|
||||
.get(&model_id)
|
||||
.map_or(false, |exp| Instant::now() < *exp);
|
||||
|
||||
// Request B should NOT see the block (separate HashMap)
|
||||
let b_blocked = request_b_state
|
||||
.get(&model_id)
|
||||
.map_or(false, |exp| Instant::now() < *exp);
|
||||
|
||||
prop_assert!(
|
||||
a_blocked,
|
||||
"Request A should see its own block for {}",
|
||||
model_id
|
||||
);
|
||||
prop_assert!(
|
||||
!b_blocked,
|
||||
"Request B should NOT see Request A's block for {}",
|
||||
model_id
|
||||
);
|
||||
|
||||
// Recording in request B should not affect request A
|
||||
let expiration_b = Instant::now() + Duration::from_secs(retry_after);
|
||||
request_b_state.insert(model_id.clone(), expiration_b);
|
||||
|
||||
// Both should now be blocked independently
|
||||
let a_still_blocked = request_a_state
|
||||
.get(&model_id)
|
||||
.map_or(false, |exp| Instant::now() < *exp);
|
||||
let b_now_blocked = request_b_state
|
||||
.get(&model_id)
|
||||
.map_or(false, |exp| Instant::now() < *exp);
|
||||
|
||||
prop_assert!(a_still_blocked, "Request A should still be blocked");
|
||||
prop_assert!(b_now_blocked, "Request B should now be blocked independently");
|
||||
}
|
||||
}
|
||||
|
||||
// Feature: retry-on-ratelimit, Property 16: Retry_After_State Max Expiration Update
|
||||
// **Validates: Requirements 12.11**
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(100))]
|
||||
|
||||
/// Property 16: Recording multiple Retry-After values for the same identifier
|
||||
/// should result in the expiration reflecting the maximum value, not the most recent.
|
||||
#[test]
|
||||
fn prop_max_expiration_update(
|
||||
identifier in arb_model_id(),
|
||||
// Generate 2..=10 Retry-After values, each between 1 and 600 seconds
|
||||
retry_after_values in prop::collection::vec(1u64..=600, 2..=10),
|
||||
max_cap in 300u64..=600,
|
||||
) {
|
||||
let mgr = RetryAfterStateManager::new();
|
||||
|
||||
// Record all values for the same identifier
|
||||
for &val in &retry_after_values {
|
||||
mgr.record(&identifier, val, max_cap);
|
||||
}
|
||||
|
||||
// The effective maximum is the max of all capped values
|
||||
let effective_max = retry_after_values
|
||||
.iter()
|
||||
.map(|&v| v.min(max_cap))
|
||||
.max()
|
||||
.unwrap();
|
||||
|
||||
// The remaining block duration should be close to the effective maximum
|
||||
let remaining = mgr.remaining_block_duration(&identifier);
|
||||
prop_assert!(
|
||||
remaining.is_some(),
|
||||
"Identifier {} should still be blocked after recording {} values (effective_max={}s)",
|
||||
identifier, retry_after_values.len(), effective_max
|
||||
);
|
||||
|
||||
let remaining_secs = remaining.unwrap().as_secs();
|
||||
|
||||
// The remaining duration should be within a reasonable tolerance of the
|
||||
// effective maximum (allow up to 2 seconds for test execution time).
|
||||
// It must be at least (effective_max - 2) to prove the max won.
|
||||
prop_assert!(
|
||||
remaining_secs >= effective_max.saturating_sub(2),
|
||||
"Remaining {}s should reflect the max ({}s), not a smaller value. Values: {:?}",
|
||||
remaining_secs, effective_max, retry_after_values
|
||||
);
|
||||
|
||||
// It should not exceed the effective max (plus small tolerance for timing)
|
||||
prop_assert!(
|
||||
remaining_secs <= effective_max + 1,
|
||||
"Remaining {}s should not exceed effective max {}s + tolerance. Values: {:?}",
|
||||
remaining_secs, effective_max, retry_after_values
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue