add DigitalOcean pricing, startup validation, and demo update

- MetricsSource::DigitalOceanPricing variant: fetch public DO Gen-AI pricing, normalize as lowercase(creator)/model_id, cost = input + output per million
- cost_metrics endpoint format updated to { "model": { "input_per_million": X, "output_per_million": Y } }
- Startup errors: prefer:cheapest requires cost source, prefer:fastest requires prometheus
- Startup warning: models with no pricing/latency data ranked last
- One-per-type enforcement: digitalocean_pricing; error if cost_metrics + digitalocean_pricing both configured
- cost_snapshot() / latency_snapshot() on ModelMetricsService for startup checks
- Demo config updated to v0.4.0 top-level routing_preferences with cheapest + fastest policies
- docker-compose.yaml + prometheus.yaml + metrics_server.py for demo latency metrics
- Schema and docs updated
This commit is contained in:
Adil Hafeez 2026-03-27 16:54:37 -07:00
parent 76b1f37052
commit bd7afd911e
10 changed files with 427 additions and 80 deletions

View file

@ -220,6 +220,10 @@ async fn init_app_state(
.iter()
.filter(|s| matches!(s, MetricsSource::PrometheusMetrics { .. }))
.count();
let do_count = sources
.iter()
.filter(|s| matches!(s, MetricsSource::DigitalOceanPricing { .. }))
.count();
if cost_count > 1 {
return Err("model_metrics_sources: only one cost_metrics source is allowed".into());
}
@ -228,12 +232,87 @@ async fn init_app_state(
"model_metrics_sources: only one prometheus_metrics source is allowed".into(),
);
}
if do_count > 1 {
return Err(
"model_metrics_sources: only one digitalocean_pricing source is allowed".into(),
);
}
if cost_count > 0 && do_count > 0 {
return Err(
"model_metrics_sources: cost_metrics and digitalocean_pricing cannot both be configured — use one or the other".into(),
);
}
let svc = ModelMetricsService::new(sources, reqwest::Client::new()).await;
Some(Arc::new(svc))
} else {
None
};
// Validate that selection_policy.prefer is compatible with the configured metric sources.
if let Some(ref prefs) = config.routing_preferences {
use common::configuration::{MetricsSource, SelectionPreference};
let has_cost_source = config
.model_metrics_sources
.as_deref()
.unwrap_or_default()
.iter()
.any(|s| {
matches!(
s,
MetricsSource::CostMetrics { .. } | MetricsSource::DigitalOceanPricing { .. }
)
});
let has_prometheus = config
.model_metrics_sources
.as_deref()
.unwrap_or_default()
.iter()
.any(|s| matches!(s, MetricsSource::PrometheusMetrics { .. }));
for pref in prefs {
if pref.selection_policy.prefer == SelectionPreference::Cheapest && !has_cost_source {
return Err(format!(
"routing_preferences route '{}' uses prefer: cheapest but no cost data source is configured — \
add cost_metrics or digitalocean_pricing to model_metrics_sources",
pref.name
)
.into());
}
if pref.selection_policy.prefer == SelectionPreference::Fastest && !has_prometheus {
return Err(format!(
"routing_preferences route '{}' uses prefer: fastest but no prometheus_metrics source is configured — \
add prometheus_metrics to model_metrics_sources",
pref.name
)
.into());
}
}
}
// Warn about models in routing_preferences that have no matching pricing/latency data.
if let (Some(ref prefs), Some(ref svc)) = (&config.routing_preferences, &metrics_service) {
let cost_data = svc.cost_snapshot().await;
let latency_data = svc.latency_snapshot().await;
for pref in prefs {
use common::configuration::SelectionPreference;
for model in &pref.models {
let missing = match pref.selection_policy.prefer {
SelectionPreference::Cheapest => !cost_data.contains_key(model.as_str()),
SelectionPreference::Fastest => !latency_data.contains_key(model.as_str()),
_ => false,
};
if missing {
warn!(
model = %model,
route = %pref.name,
"model has no metric data — will be ranked last"
);
}
}
}
}
let router_service = Arc::new(RouterService::new(
config.routing_preferences.clone(),
metrics_service,

View file

@ -6,6 +6,8 @@ use common::configuration::{MetricsSource, SelectionPolicy, SelectionPreference}
use tokio::sync::RwLock;
use tracing::{info, warn};
const DO_PRICING_URL: &str = "https://api.digitalocean.com/v2/gen-ai/models";
pub struct ModelMetricsService {
cost: Arc<RwLock<HashMap<String, f64>>>,
latency: Arc<RwLock<HashMap<String, f64>>>,
@ -70,6 +72,25 @@ impl ModelMetricsService {
});
}
}
MetricsSource::DigitalOceanPricing { refresh_interval } => {
let data = fetch_do_pricing(&client).await;
info!(models = data.len(), "fetched digitalocean pricing");
*cost_data.write().await = data;
if let Some(interval_secs) = refresh_interval {
let cost_clone = Arc::clone(&cost_data);
let client_clone = client.clone();
let interval = Duration::from_secs(*interval_secs);
tokio::spawn(async move {
loop {
tokio::time::sleep(interval).await;
let data = fetch_do_pricing(&client_clone).await;
info!(models = data.len(), "refreshed digitalocean pricing");
*cost_clone.write().await = data;
}
});
}
}
}
}
@ -95,6 +116,16 @@ impl ModelMetricsService {
SelectionPreference::None => models.to_vec(),
}
}
/// Returns a snapshot of the current cost data. Used at startup to warn about unmatched models.
pub async fn cost_snapshot(&self) -> HashMap<String, f64> {
self.cost.read().await.clone()
}
/// Returns a snapshot of the current latency data. Used at startup to warn about unmatched models.
pub async fn latency_snapshot(&self) -> HashMap<String, f64> {
self.latency.read().await.clone()
}
}
fn rank_by_ascending_metric(models: &[String], data: &HashMap<String, f64>) -> Vec<String> {
@ -134,6 +165,12 @@ fn shuffle(models: &[String]) -> Vec<String> {
result
}
#[derive(serde::Deserialize)]
struct CostEntry {
input_per_million: f64,
output_per_million: f64,
}
async fn fetch_cost_metrics(
url: &str,
auth: Option<&common::configuration::MetricsAuth>,
@ -148,8 +185,11 @@ async fn fetch_cost_metrics(
}
}
match req.send().await {
Ok(resp) => match resp.json::<HashMap<String, f64>>().await {
Ok(data) => data,
Ok(resp) => match resp.json::<HashMap<String, CostEntry>>().await {
Ok(data) => data
.into_iter()
.map(|(k, v)| (k, v.input_per_million + v.output_per_million))
.collect(),
Err(err) => {
warn!(error = %err, url = %url, "failed to parse cost metrics response");
HashMap::new()
@ -162,6 +202,49 @@ async fn fetch_cost_metrics(
}
}
#[derive(serde::Deserialize)]
struct DoModelList {
data: Vec<DoModel>,
}
#[derive(serde::Deserialize)]
struct DoModel {
model_id: String,
creator: String,
pricing: DoPricing,
}
#[derive(serde::Deserialize)]
struct DoPricing {
input_price_per_million: f64,
output_price_per_million: f64,
}
async fn fetch_do_pricing(client: &reqwest::Client) -> HashMap<String, f64> {
match client.get(DO_PRICING_URL).send().await {
Ok(resp) => match resp.json::<DoModelList>().await {
Ok(list) => list
.data
.into_iter()
.map(|m| {
let key = format!("{}/{}", m.creator.to_lowercase(), m.model_id);
let cost =
m.pricing.input_price_per_million + m.pricing.output_price_per_million;
(key, cost)
})
.collect(),
Err(err) => {
warn!(error = %err, url = DO_PRICING_URL, "failed to parse digitalocean pricing response");
HashMap::new()
}
},
Err(err) => {
warn!(error = %err, url = DO_PRICING_URL, "failed to fetch digitalocean pricing");
HashMap::new()
}
}
}
#[derive(serde::Deserialize)]
struct PrometheusResponse {
data: PrometheusData,

View file

@ -147,6 +147,9 @@ pub enum MetricsSource {
query: String,
refresh_interval: Option<u64>,
},
DigitalOceanPricing {
refresh_interval: Option<u64>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]