mirror of
https://github.com/katanemo/plano.git
synced 2026-07-02 15:51:02 +02:00
actually call archfc for orchestration
This commit is contained in:
parent
a611e1fc88
commit
1231706120
11 changed files with 97 additions and 97 deletions
|
|
@ -178,7 +178,6 @@ mod tests {
|
|||
Arc::new(OrchestratorService::new(
|
||||
"http://localhost:8080".to_string(),
|
||||
"test-model".to_string(),
|
||||
"test-provider".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ mod integration_tests {
|
|||
Arc::new(OrchestratorService::new(
|
||||
"http://localhost:8080".to_string(),
|
||||
"test-model".to_string(),
|
||||
"test-provider".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -103,7 +103,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
let orchestrator_service: Arc<OrchestratorService> = Arc::new(OrchestratorService::new(
|
||||
llm_provider_url.clone() + CHAT_COMPLETIONS_PATH,
|
||||
PLANO_ORCHESTRATOR_MODEL_NAME.to_string(),
|
||||
routing_llm_provider,
|
||||
));
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ pub struct RouterService {
|
|||
router_url: String,
|
||||
client: reqwest::Client,
|
||||
router_model: Arc<dyn RouterModel>,
|
||||
#[allow(dead_code)]
|
||||
routing_provider_name: String,
|
||||
llm_usage_defined: bool,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc};
|
|||
|
||||
use common::{
|
||||
configuration::{AgentUsagePreference, OrchestrationPreference},
|
||||
consts::ARCH_PROVIDER_HINT_HEADER,
|
||||
consts::{ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME},
|
||||
};
|
||||
use hermesllm::apis::openai::{ChatCompletionsResponse, Message};
|
||||
use hyper::header;
|
||||
|
|
@ -17,7 +17,6 @@ pub struct OrchestratorService {
|
|||
orchestrator_url: String,
|
||||
client: reqwest::Client,
|
||||
orchestrator_model: Arc<dyn OrchestratorModel>,
|
||||
orchestration_provider_name: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
|
|
@ -38,7 +37,6 @@ impl OrchestratorService {
|
|||
pub fn new(
|
||||
orchestrator_url: String,
|
||||
orchestration_model_name: String,
|
||||
orchestration_provider_name: String,
|
||||
) -> Self {
|
||||
// Empty agent orchestrations - will be provided via usage_preferences in requests
|
||||
let agent_orchestrations: HashMap<String, Vec<OrchestrationPreference>> = HashMap::new();
|
||||
|
|
@ -53,7 +51,6 @@ impl OrchestratorService {
|
|||
orchestrator_url,
|
||||
client: reqwest::Client::new(),
|
||||
orchestrator_model,
|
||||
orchestration_provider_name,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -95,7 +92,7 @@ impl OrchestratorService {
|
|||
|
||||
orchestration_request_headers.insert(
|
||||
header::HeaderName::from_static(ARCH_PROVIDER_HINT_HEADER),
|
||||
header::HeaderValue::from_str(&self.orchestration_provider_name).unwrap(),
|
||||
header::HeaderValue::from_str(PLANO_ORCHESTRATOR_MODEL_NAME).unwrap(),
|
||||
);
|
||||
|
||||
if let Some(trace_parent) = trace_parent {
|
||||
|
|
@ -107,7 +104,7 @@ impl OrchestratorService {
|
|||
|
||||
orchestration_request_headers.insert(
|
||||
header::HeaderName::from_static("model"),
|
||||
header::HeaderValue::from_static("Plano-Orchestrator"),
|
||||
header::HeaderValue::from_static(PLANO_ORCHESTRATOR_MODEL_NAME),
|
||||
);
|
||||
|
||||
let start_time = std::time::Instant::now();
|
||||
|
|
|
|||
|
|
@ -34,3 +34,4 @@ pub const LLM_ROUTE_HEADER: &str = "x-arch-llm-route";
|
|||
pub const ENVOY_RETRY_HEADER: &str = "x-envoy-max-retries";
|
||||
pub const BRIGHT_STAFF_SERVICE_NAME : &str = "brightstaff";
|
||||
pub const PLANO_ORCHESTRATOR_MODEL_NAME: &str = "Plano-Orchestrator";
|
||||
pub const ARCH_FC_CLUSTER: &str = "arch";
|
||||
|
|
@ -47,7 +47,7 @@ pub struct StreamContext {
|
|||
ttft_time: Option<u128>,
|
||||
traceparent: Option<String>,
|
||||
request_body_sent_time: Option<u128>,
|
||||
overrides: Rc<Option<Overrides>>,
|
||||
_overrides: Rc<Option<Overrides>>,
|
||||
user_message: Option<String>,
|
||||
upstream_status_code: Option<StatusCode>,
|
||||
binary_frame_decoder: Option<BedrockBinaryFrameDecoder<bytes::BytesMut>>,
|
||||
|
|
@ -65,7 +65,7 @@ impl StreamContext {
|
|||
) -> Self {
|
||||
StreamContext {
|
||||
metrics,
|
||||
overrides,
|
||||
_overrides: overrides,
|
||||
ratelimit_selector: None,
|
||||
streaming_response: false,
|
||||
response_tokens: 0,
|
||||
|
|
@ -133,6 +133,7 @@ impl StreamContext {
|
|||
.get_http_request_header(ARCH_PROVIDER_HINT_HEADER)
|
||||
.map(|llm_name| llm_name.into());
|
||||
|
||||
info!("llm_providers: {:?}", self.llm_providers);
|
||||
self.llm_provider = Some(routing::get_llm_provider(
|
||||
&self.llm_providers,
|
||||
provider_hint,
|
||||
|
|
@ -744,55 +745,37 @@ impl HttpContext for StreamContext {
|
|||
.map(|val| val == "true")
|
||||
.unwrap_or(false);
|
||||
|
||||
let use_agent_orchestrator = match self.overrides.as_ref() {
|
||||
Some(overrides) => overrides.use_agent_orchestrator.unwrap_or_default(),
|
||||
None => false,
|
||||
};
|
||||
|
||||
let routing_header_value = self.get_http_request_header(ARCH_ROUTING_HEADER);
|
||||
|
||||
if routing_header_value.is_some() && !routing_header_value.as_ref().unwrap().is_empty() {
|
||||
let routing_header_value = routing_header_value.as_ref().unwrap();
|
||||
info!("routing header already set: {}", routing_header_value);
|
||||
self.llm_provider = Some(Rc::new(LlmProvider {
|
||||
name: routing_header_value.to_string(),
|
||||
provider_interface: LlmProviderType::OpenAI,
|
||||
..Default::default() //TODO: THiS IS BROKEN. WHY ARE WE ASSUMING OPENAI FOR UPSTREAM?
|
||||
}));
|
||||
} else {
|
||||
//TODO: Fix this brittle code path. We need to return values and have compile time
|
||||
self.select_llm_provider();
|
||||
// let routing_header_value = self.get_http_request_header(ARCH_ROUTING_HEADER);
|
||||
|
||||
self.select_llm_provider();
|
||||
// Check if this is a supported API endpoint
|
||||
if SupportedAPIsFromClient::from_endpoint(&request_path).is_none() {
|
||||
self.send_http_response(404, vec![], Some(b"Unsupported endpoint"));
|
||||
return Action::Continue;
|
||||
}
|
||||
if SupportedAPIsFromClient::from_endpoint(&request_path).is_none() {
|
||||
self.send_http_response(404, vec![], Some(b"Unsupported endpoint"));
|
||||
return Action::Continue;
|
||||
}
|
||||
|
||||
// Get the SupportedApi for routing decisions
|
||||
let supported_api: Option<SupportedAPIsFromClient> =
|
||||
SupportedAPIsFromClient::from_endpoint(&request_path);
|
||||
self.client_api = supported_api;
|
||||
// Get the SupportedApi for routing decisions
|
||||
let supported_api: Option<SupportedAPIsFromClient> =
|
||||
SupportedAPIsFromClient::from_endpoint(&request_path);
|
||||
self.client_api = supported_api;
|
||||
|
||||
// Debug: log provider, client API, resolved API, and request path
|
||||
if let (Some(api), Some(provider)) =
|
||||
(self.client_api.as_ref(), self.llm_provider.as_ref())
|
||||
{
|
||||
let provider_id = provider.to_provider_id();
|
||||
self.resolved_api =
|
||||
Some(provider_id.compatible_api_for_client(api, self.streaming_response));
|
||||
// Debug: log provider, client API, resolved API, and request path
|
||||
if let (Some(api), Some(provider)) =
|
||||
(self.client_api.as_ref(), self.llm_provider.as_ref())
|
||||
{
|
||||
let provider_id = provider.to_provider_id();
|
||||
self.resolved_api =
|
||||
Some(provider_id.compatible_api_for_client(api, self.streaming_response));
|
||||
|
||||
debug!(
|
||||
"[PLANO_REQ_ID:{}] ROUTING_INFO: provider='{}' client_api={:?} resolved_api={:?} request_path='{}'",
|
||||
self.request_identifier(),
|
||||
provider.to_provider_id(),
|
||||
api,
|
||||
self.resolved_api,
|
||||
request_path
|
||||
);
|
||||
} else {
|
||||
self.resolved_api = None;
|
||||
}
|
||||
debug!(
|
||||
"[PLANO_REQ_ID:{}] ROUTING_INFO: provider='{}' client_api={:?} resolved_api={:?} request_path='{}'",
|
||||
self.request_identifier(),
|
||||
provider.to_provider_id(),
|
||||
api,
|
||||
self.resolved_api,
|
||||
request_path
|
||||
);
|
||||
|
||||
//We need to update the upstream path if there is a variation for a provider like Gemini/Groq, etc.
|
||||
self.update_upstream_path(&request_path);
|
||||
|
|
@ -816,7 +799,6 @@ impl HttpContext for StreamContext {
|
|||
if let Err(error) = self.modify_auth_headers() {
|
||||
// ensure that the provider has an endpoint if the access key is missing else return a bad request
|
||||
if self.llm_provider.as_ref().unwrap().endpoint.is_none()
|
||||
&& !use_agent_orchestrator
|
||||
&& self.llm_provider.as_ref().unwrap().provider_interface
|
||||
!= LlmProviderType::Arch
|
||||
{
|
||||
|
|
@ -918,11 +900,6 @@ impl HttpContext for StreamContext {
|
|||
None => None,
|
||||
};
|
||||
|
||||
let use_agent_orchestrator = match self.overrides.as_ref() {
|
||||
Some(overrides) => overrides.use_agent_orchestrator.unwrap_or_default(),
|
||||
None => false,
|
||||
};
|
||||
|
||||
// Store the original model for logging
|
||||
let model_requested = deserialized_client_request.model().to_string();
|
||||
|
||||
|
|
@ -930,29 +907,25 @@ impl HttpContext for StreamContext {
|
|||
let resolved_model = match model_name {
|
||||
Some(model_name) => model_name.clone(),
|
||||
None => {
|
||||
if use_agent_orchestrator {
|
||||
"agent_orchestrator".to_string()
|
||||
} else {
|
||||
warn!(
|
||||
"[PLANO_REQ_ID:{}] MODEL_RESOLUTION_ERROR: no model specified | req_model='{}' provider='{}' config_model={:?}",
|
||||
self.request_identifier(),
|
||||
model_requested,
|
||||
self.llm_provider().name,
|
||||
self.llm_provider().model
|
||||
);
|
||||
self.send_server_error(
|
||||
ServerError::BadRequest {
|
||||
why: format!(
|
||||
"No model specified in request and couldn't determine model name from arch_config. Model name in req: {}, arch_config, provider: {}, model: {:?}",
|
||||
model_requested,
|
||||
self.llm_provider().name,
|
||||
self.llm_provider().model
|
||||
),
|
||||
},
|
||||
Some(StatusCode::BAD_REQUEST),
|
||||
);
|
||||
return Action::Continue;
|
||||
}
|
||||
warn!(
|
||||
"[PLANO_REQ_ID:{}] MODEL_RESOLUTION_ERROR: no model specified | req_model='{}' provider='{}' config_model={:?}",
|
||||
self.request_identifier(),
|
||||
model_requested,
|
||||
self.llm_provider().name,
|
||||
self.llm_provider().model
|
||||
);
|
||||
self.send_server_error(
|
||||
ServerError::BadRequest {
|
||||
why: format!(
|
||||
"No model specified in request and couldn't determine model name from arch_config. Model name in req: {}, arch_config, provider: {}, model: {:?}",
|
||||
model_requested,
|
||||
self.llm_provider().name,
|
||||
self.llm_provider().model
|
||||
),
|
||||
},
|
||||
Some(StatusCode::BAD_REQUEST),
|
||||
);
|
||||
return Action::Continue;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue