mirror of
https://github.com/katanemo/plano.git
synced 2026-06-23 15:38:07 +02:00
Handle intent matching better in arch gateway (#391)
This commit is contained in:
parent
10cad4d0b7
commit
e77fc47225
10 changed files with 653 additions and 309 deletions
|
|
@ -138,7 +138,7 @@ impl From<String> for ParameterType {
|
|||
_ => {
|
||||
log::warn!("Unknown parameter type: {}, assuming type str", s);
|
||||
ParameterType::String
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -205,13 +205,6 @@ pub struct ToolCallState {
|
|||
pub enum ArchState {
|
||||
ToolCall(Vec<ToolCallState>),
|
||||
}
|
||||
#[derive(Deserialize, Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum ModelServerResponse {
|
||||
ChatCompletionsResponse(ChatCompletionsResponse),
|
||||
ModelServerErrorResponse(ModelServerErrorResponse),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ModelServerErrorResponse {
|
||||
pub result: String,
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ use crate::metrics::Metrics;
|
|||
use crate::tools::compute_request_path_body;
|
||||
use common::api::open_ai::{
|
||||
to_server_events, ArchState, ChatCompletionStreamResponse, ChatCompletionsRequest,
|
||||
ChatCompletionsResponse, Message, ModelServerResponse, ToolCall,
|
||||
ChatCompletionsResponse, Message, ToolCall,
|
||||
};
|
||||
use common::configuration::{Overrides, PromptTarget, Tracing};
|
||||
use common::consts::{
|
||||
|
|
@ -128,7 +128,7 @@ impl StreamContext {
|
|||
debug!("model server response received");
|
||||
trace!("response body: {}", body_str);
|
||||
|
||||
let model_server_response: ModelServerResponse = match serde_json::from_str(&body_str) {
|
||||
let model_server_response: ChatCompletionsResponse = match serde_json::from_str(&body_str) {
|
||||
Ok(arch_fc_response) => arch_fc_response,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
|
|
@ -139,77 +139,121 @@ impl StreamContext {
|
|||
}
|
||||
};
|
||||
|
||||
let arch_fc_response = match model_server_response {
|
||||
ModelServerResponse::ChatCompletionsResponse(response) => response,
|
||||
ModelServerResponse::ModelServerErrorResponse(response) => {
|
||||
debug!("archgw <= modelserver error response: {}", response.result);
|
||||
if response.result == "No intent matched" {
|
||||
if let Some(default_prompt_target) = self
|
||||
.prompt_targets
|
||||
.values()
|
||||
.find(|pt| pt.default.unwrap_or(false))
|
||||
{
|
||||
debug!("default prompt target found, forwarding request to default prompt target");
|
||||
let endpoint = default_prompt_target.endpoint.clone().unwrap();
|
||||
let upstream_path: String = endpoint.path.unwrap_or(String::from("/"));
|
||||
// intent was matched if we see function_latency in metadata
|
||||
let intent_matched = model_server_response
|
||||
.metadata
|
||||
.as_ref()
|
||||
.and_then(|metadata| metadata.get("function_latency"))
|
||||
.is_some();
|
||||
|
||||
let upstream_endpoint = endpoint.name;
|
||||
let mut params = HashMap::new();
|
||||
params.insert(
|
||||
MESSAGES_KEY.to_string(),
|
||||
callout_context.request_body.messages.clone(),
|
||||
);
|
||||
let arch_messages_json = serde_json::to_string(¶ms).unwrap();
|
||||
let timeout_str = DEFAULT_TARGET_REQUEST_TIMEOUT_MS.to_string();
|
||||
if !intent_matched {
|
||||
debug!("intent not matched");
|
||||
// check if we have a default prompt target
|
||||
if let Some(default_prompt_target) = self
|
||||
.prompt_targets
|
||||
.values()
|
||||
.find(|pt| pt.default.unwrap_or(false))
|
||||
{
|
||||
debug!("default prompt target found, forwarding request to default prompt target");
|
||||
let endpoint = default_prompt_target.endpoint.clone().unwrap();
|
||||
let upstream_path: String = endpoint.path.unwrap_or(String::from("/"));
|
||||
|
||||
let mut headers = vec![
|
||||
(":method", "POST"),
|
||||
(ARCH_UPSTREAM_HOST_HEADER, &upstream_endpoint),
|
||||
(":path", &upstream_path),
|
||||
(":authority", &upstream_endpoint),
|
||||
("content-type", "application/json"),
|
||||
("x-envoy-max-retries", "3"),
|
||||
("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()),
|
||||
];
|
||||
let upstream_endpoint = endpoint.name;
|
||||
let mut params = HashMap::new();
|
||||
params.insert(
|
||||
MESSAGES_KEY.to_string(),
|
||||
callout_context.request_body.messages.clone(),
|
||||
);
|
||||
let arch_messages_json = serde_json::to_string(¶ms).unwrap();
|
||||
let timeout_str = DEFAULT_TARGET_REQUEST_TIMEOUT_MS.to_string();
|
||||
|
||||
if self.request_id.is_some() {
|
||||
headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap()));
|
||||
}
|
||||
let mut headers = vec![
|
||||
(":method", "POST"),
|
||||
(ARCH_UPSTREAM_HOST_HEADER, &upstream_endpoint),
|
||||
(":path", &upstream_path),
|
||||
(":authority", &upstream_endpoint),
|
||||
("content-type", "application/json"),
|
||||
("x-envoy-max-retries", "3"),
|
||||
("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()),
|
||||
];
|
||||
|
||||
// if self.trace_arch_internal() && self.traceparent.is_some() {
|
||||
// headers.push((TRACE_PARENT_HEADER, self.traceparent.as_ref().unwrap()));
|
||||
// }
|
||||
if self.request_id.is_some() {
|
||||
headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap()));
|
||||
}
|
||||
|
||||
let call_args = CallArgs::new(
|
||||
ARCH_INTERNAL_CLUSTER_NAME,
|
||||
&upstream_path,
|
||||
headers,
|
||||
Some(arch_messages_json.as_bytes()),
|
||||
vec![],
|
||||
Duration::from_secs(5),
|
||||
);
|
||||
callout_context.response_handler_type = ResponseHandlerType::DefaultTarget;
|
||||
callout_context.prompt_target_name =
|
||||
Some(default_prompt_target.name.clone());
|
||||
let call_args = CallArgs::new(
|
||||
ARCH_INTERNAL_CLUSTER_NAME,
|
||||
&upstream_path,
|
||||
headers,
|
||||
Some(arch_messages_json.as_bytes()),
|
||||
vec![],
|
||||
Duration::from_secs(5),
|
||||
);
|
||||
callout_context.response_handler_type = ResponseHandlerType::DefaultTarget;
|
||||
callout_context.prompt_target_name = Some(default_prompt_target.name.clone());
|
||||
|
||||
if let Err(e) = self.http_call(call_args, callout_context) {
|
||||
warn!("error dispatching default prompt target request: {}", e);
|
||||
return self.send_server_error(
|
||||
ServerError::HttpDispatch(e),
|
||||
Some(StatusCode::BAD_REQUEST),
|
||||
);
|
||||
}
|
||||
return;
|
||||
if let Err(e) = self.http_call(call_args, callout_context) {
|
||||
warn!("error dispatching default prompt target request: {}", e);
|
||||
return self.send_server_error(
|
||||
ServerError::HttpDispatch(e),
|
||||
Some(StatusCode::BAD_REQUEST),
|
||||
);
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
debug!("no default prompt target found, forwarding request to upstream llm");
|
||||
let mut messages = Vec::new();
|
||||
// add system prompt
|
||||
match self.system_prompt.as_ref() {
|
||||
None => {}
|
||||
Some(system_prompt) => {
|
||||
let system_prompt_message = Message {
|
||||
role: SYSTEM_ROLE.to_string(),
|
||||
content: Some(system_prompt.clone()),
|
||||
model: None,
|
||||
tool_calls: None,
|
||||
tool_call_id: None,
|
||||
};
|
||||
messages.push(system_prompt_message);
|
||||
}
|
||||
}
|
||||
return self.send_server_error(
|
||||
ServerError::LogicError(response.result),
|
||||
Some(StatusCode::BAD_REQUEST),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
arch_fc_response.choices[0]
|
||||
messages.append(
|
||||
&mut self
|
||||
.filter_out_arch_messages(callout_context.request_body.messages.as_ref()),
|
||||
);
|
||||
|
||||
let chat_completion_request = ChatCompletionsRequest {
|
||||
model: self
|
||||
.chat_completions_request
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.model
|
||||
.clone(),
|
||||
messages,
|
||||
tools: None,
|
||||
stream: callout_context.request_body.stream,
|
||||
stream_options: callout_context.request_body.stream_options,
|
||||
metadata: None,
|
||||
};
|
||||
|
||||
let chat_completion_request_json =
|
||||
serde_json::to_string(&chat_completion_request).unwrap();
|
||||
debug!(
|
||||
"archgw => upstream llm request: {}",
|
||||
chat_completion_request_json
|
||||
);
|
||||
self.set_http_request_body(
|
||||
0,
|
||||
self.request_body_size,
|
||||
chat_completion_request_json.as_bytes(),
|
||||
);
|
||||
self.resume_http_request();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
model_server_response.choices[0]
|
||||
.message
|
||||
.tool_calls
|
||||
.clone_into(&mut self.tool_calls);
|
||||
|
|
@ -238,7 +282,7 @@ impl StreamContext {
|
|||
),
|
||||
ChatCompletionStreamResponse::new(
|
||||
Some(
|
||||
arch_fc_response.choices[0]
|
||||
model_server_response.choices[0]
|
||||
.message
|
||||
.content
|
||||
.as_ref()
|
||||
|
|
|
|||
|
|
@ -363,7 +363,11 @@ fn prompt_gateway_request_to_llm_gateway() {
|
|||
},
|
||||
}],
|
||||
model: String::from("test"),
|
||||
metadata: None,
|
||||
metadata: {
|
||||
let mut map: HashMap<String, String> = HashMap::new();
|
||||
map.insert("function_latency".to_string(), "0.0".to_string());
|
||||
Some(map)
|
||||
},
|
||||
};
|
||||
|
||||
let expected_body = "{\"city\":\"seattle\"}";
|
||||
|
|
@ -378,17 +382,17 @@ fn prompt_gateway_request_to_llm_gateway() {
|
|||
.expect_log(Some(LogLevel::Trace), None)
|
||||
.expect_log(Some(LogLevel::Debug), None)
|
||||
.expect_log(Some(LogLevel::Trace), None)
|
||||
.expect_log(Some(LogLevel::Trace), None)
|
||||
.expect_log(Some(LogLevel::Debug), None)
|
||||
.expect_http_call(
|
||||
Some("arch_internal"),
|
||||
Some(vec![
|
||||
(":method", "POST"),
|
||||
("content-type", "application/json"),
|
||||
("x-arch-upstream", "api_server"),
|
||||
(":authority", "api_server"),
|
||||
("x-envoy-max-retries", "3"),
|
||||
(":path", "/weather"),
|
||||
("x-arch-upstream", "api_server"),
|
||||
("content-type", "application/json"),
|
||||
("x-envoy-upstream-rq-timeout-ms", "30000"),
|
||||
(":path", "/weather"),
|
||||
(":method", "POST"),
|
||||
(":authority", "api_server"),
|
||||
]),
|
||||
Some(expected_body),
|
||||
None,
|
||||
|
|
@ -452,3 +456,252 @@ fn prompt_gateway_request_to_llm_gateway() {
|
|||
.execute_and_expect(ReturnType::Action(Action::Continue))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn prompt_gateway_request_no_intent_match() {
|
||||
let args = tester::MockSettings {
|
||||
wasm_path: wasm_module(),
|
||||
quiet: false,
|
||||
allow_unexpected: false,
|
||||
};
|
||||
let mut module = tester::mock(args).unwrap();
|
||||
|
||||
module
|
||||
.call_start()
|
||||
.execute_and_expect(ReturnType::None)
|
||||
.unwrap();
|
||||
|
||||
// Setup Filter
|
||||
let mut config: Configuration = serde_yaml::from_str(default_config()).unwrap();
|
||||
config.ratelimits.as_mut().unwrap()[0].limit.tokens += 1000;
|
||||
let config_str = serde_json::to_string(&config).unwrap();
|
||||
|
||||
let filter_context = setup_filter(&mut module, &config_str);
|
||||
|
||||
// Setup HTTP Stream
|
||||
let http_context = 2;
|
||||
|
||||
normal_flow(&mut module, filter_context, http_context);
|
||||
|
||||
let arch_fc_resp = ChatCompletionsResponse {
|
||||
usage: Some(Usage {
|
||||
completion_tokens: 0,
|
||||
}),
|
||||
choices: vec![Choice {
|
||||
finish_reason: Some("test".to_string()),
|
||||
index: Some(0),
|
||||
message: Message {
|
||||
role: "system".to_string(),
|
||||
content: None,
|
||||
tool_calls: Some(vec![ToolCall {
|
||||
id: String::from("test"),
|
||||
tool_type: ToolType::Function,
|
||||
function: FunctionCallDetail {
|
||||
name: String::from("weather_forecast"),
|
||||
arguments: HashMap::from([(
|
||||
String::from("city"),
|
||||
Value::String(String::from("seattle")),
|
||||
)]),
|
||||
},
|
||||
}]),
|
||||
model: None,
|
||||
tool_call_id: None,
|
||||
},
|
||||
}],
|
||||
model: String::from("test"),
|
||||
metadata: None,
|
||||
};
|
||||
|
||||
let arch_fc_resp_str = serde_json::to_string(&arch_fc_resp).unwrap();
|
||||
module
|
||||
.call_proxy_on_http_call_response(http_context, 1, 0, arch_fc_resp_str.len() as i32, 0)
|
||||
.expect_metric_increment("active_http_calls", -1)
|
||||
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
|
||||
.returning(Some(&arch_fc_resp_str))
|
||||
.expect_log(Some(LogLevel::Warn), None)
|
||||
.expect_log(Some(LogLevel::Debug), None)
|
||||
.expect_log(Some(LogLevel::Trace), None)
|
||||
.expect_log(Some(LogLevel::Debug), Some("intent not matched"))
|
||||
.expect_log(
|
||||
Some(LogLevel::Debug),
|
||||
Some("no default prompt target found, forwarding request to upstream llm"),
|
||||
)
|
||||
.expect_log(Some(LogLevel::Debug), None)
|
||||
.expect_log(Some(LogLevel::Debug), None)
|
||||
.expect_set_buffer_bytes(Some(BufferType::HttpRequestBody), None)
|
||||
.execute_and_expect(ReturnType::None)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn arch_config_default_target() -> &'static str {
|
||||
r#"
|
||||
version: "0.1-beta"
|
||||
|
||||
listener:
|
||||
address: 0.0.0.0
|
||||
port: 10000
|
||||
message_format: huggingface
|
||||
connect_timeout: 0.005s
|
||||
|
||||
endpoints:
|
||||
api_server:
|
||||
endpoint: api_server:80
|
||||
connect_timeout: 0.005s
|
||||
|
||||
llm_providers:
|
||||
- name: open-ai-gpt-4
|
||||
provider_interface: openai
|
||||
access_key: secret_key
|
||||
model: gpt-4
|
||||
default: true
|
||||
|
||||
overrides:
|
||||
# confidence threshold for prompt target intent matching
|
||||
prompt_target_intent_matching_threshold: 0.0
|
||||
|
||||
system_prompt: |
|
||||
You are a helpful assistant.
|
||||
|
||||
prompt_guards:
|
||||
input_guards:
|
||||
jailbreak:
|
||||
on_exception:
|
||||
message: "Looks like you're curious about my abilities, but I can only provide assistance within my programmed parameters."
|
||||
|
||||
prompt_targets:
|
||||
- name: weather_forecast
|
||||
description: This function provides realtime weather forecast information for a given city.
|
||||
parameters:
|
||||
- name: city
|
||||
required: true
|
||||
description: The city for which the weather forecast is requested.
|
||||
- name: days
|
||||
description: The number of days for which the weather forecast is requested.
|
||||
- name: units
|
||||
description: The units in which the weather forecast is requested.
|
||||
endpoint:
|
||||
name: api_server
|
||||
path: /weather
|
||||
http_method: POST
|
||||
system_prompt: |
|
||||
You are a helpful weather forecaster. Use weater data that is provided to you. Please following following guidelines when responding to user queries:
|
||||
- Use farenheight for temperature
|
||||
- Use miles per hour for wind speed
|
||||
|
||||
- name: default_target
|
||||
default: true
|
||||
description: This is the default target for all unmatched prompts.
|
||||
endpoint:
|
||||
name: weather_forecast_service
|
||||
path: /default_target
|
||||
http_method: POST
|
||||
system_prompt: |
|
||||
You are a helpful assistant! Summarize the user's request and provide a helpful response.
|
||||
# if it is set to false arch will send response that it received from this prompt target to the user
|
||||
# if true arch will forward the response to the default LLM
|
||||
auto_llm_dispatch_on_response: false
|
||||
|
||||
ratelimits:
|
||||
- model: gpt-4
|
||||
selector:
|
||||
key: selector-key
|
||||
value: selector-value
|
||||
limit:
|
||||
tokens: 1
|
||||
unit: minute
|
||||
"#
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn prompt_gateway_request_no_intent_match_default_target() {
|
||||
let args = tester::MockSettings {
|
||||
wasm_path: wasm_module(),
|
||||
quiet: false,
|
||||
allow_unexpected: false,
|
||||
};
|
||||
let mut module = tester::mock(args).unwrap();
|
||||
|
||||
module
|
||||
.call_start()
|
||||
.execute_and_expect(ReturnType::None)
|
||||
.unwrap();
|
||||
|
||||
// Setup Filter
|
||||
let mut config: Configuration = serde_yaml::from_str(arch_config_default_target()).unwrap();
|
||||
config.ratelimits.as_mut().unwrap()[0].limit.tokens += 1000;
|
||||
let config_str = serde_json::to_string(&config).unwrap();
|
||||
|
||||
let filter_context = setup_filter(&mut module, &config_str);
|
||||
|
||||
// Setup HTTP Stream
|
||||
let http_context = 2;
|
||||
|
||||
normal_flow(&mut module, filter_context, http_context);
|
||||
|
||||
let arch_fc_resp = ChatCompletionsResponse {
|
||||
usage: Some(Usage {
|
||||
completion_tokens: 0,
|
||||
}),
|
||||
choices: vec![Choice {
|
||||
finish_reason: Some("test".to_string()),
|
||||
index: Some(0),
|
||||
message: Message {
|
||||
role: "system".to_string(),
|
||||
content: None,
|
||||
tool_calls: Some(vec![ToolCall {
|
||||
id: String::from("test"),
|
||||
tool_type: ToolType::Function,
|
||||
function: FunctionCallDetail {
|
||||
name: String::from("weather_forecast"),
|
||||
arguments: HashMap::from([(
|
||||
String::from("city"),
|
||||
Value::String(String::from("seattle")),
|
||||
)]),
|
||||
},
|
||||
}]),
|
||||
model: None,
|
||||
tool_call_id: None,
|
||||
},
|
||||
}],
|
||||
model: String::from("test"),
|
||||
metadata: None,
|
||||
};
|
||||
|
||||
let arch_fc_resp_str = serde_json::to_string(&arch_fc_resp).unwrap();
|
||||
module
|
||||
.call_proxy_on_http_call_response(http_context, 1, 0, arch_fc_resp_str.len() as i32, 0)
|
||||
.expect_metric_increment("active_http_calls", -1)
|
||||
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
|
||||
.returning(Some(&arch_fc_resp_str))
|
||||
.expect_log(Some(LogLevel::Warn), None)
|
||||
.expect_log(Some(LogLevel::Debug), None)
|
||||
.expect_log(Some(LogLevel::Trace), None)
|
||||
.expect_log(Some(LogLevel::Debug), Some("intent not matched"))
|
||||
.expect_log(
|
||||
Some(LogLevel::Debug),
|
||||
Some("default prompt target found, forwarding request to default prompt target"),
|
||||
)
|
||||
.expect_log(Some(LogLevel::Trace), None)
|
||||
.expect_log(Some(LogLevel::Debug), None)
|
||||
.expect_http_call(
|
||||
Some("arch_internal"),
|
||||
Some(vec![
|
||||
(":method", "POST"),
|
||||
("x-arch-upstream", "weather_forecast_service"),
|
||||
(":path", "/default_target"),
|
||||
(":authority", "weather_forecast_service"),
|
||||
("content-type", "application/json"),
|
||||
("x-envoy-max-retries", "3"),
|
||||
("x-envoy-upstream-rq-timeout-ms", "30000"),
|
||||
]),
|
||||
None,
|
||||
None,
|
||||
Some(5000),
|
||||
)
|
||||
.returning(Some(2))
|
||||
.expect_metric_increment("active_http_calls", 1)
|
||||
.execute_and_expect(ReturnType::None)
|
||||
.unwrap();
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue