removing model_server python module, moving function calling to brightstaff (#615)

* adding function_calling functionality in Rust

* fixed rendered YAML file

* removed model_server from envoy.template; traffic is now forwarded to brightstaff

* fixed bugs in function_calling.rs that were breaking tests. All good now

* updating e2e test to clean up disk usage

* no longer using Arch* models as the default model when one is not specified

* if the user sets the arch-function base_url, we should honor it

* fixing demos: we needed to pin huggingface_hub to a particular version, otherwise the chatbot UI wouldn't build

* adding a constant for Arch-Function model name

* fixing some edge cases with calls made to Arch-Function

* fixed JSON parsing issues in function_calling.rs

* fixed bug where the raw response from Arch-Function was re-encoded

* removed debug from supervisord.conf

* commenting out disk cleanup

* adding back disk space cleanup

---------

Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-288.local>
Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-342.local>
Salman Paracha committed 2025-11-22 12:55:00 -08:00 (via GitHub)
parent 126b029345, commit 88c2bd1851
40 changed files with 2517 additions and 1356 deletions

@@ -1,9 +1,11 @@
pub mod agent_chat_completions;
pub mod agent_selector;
pub mod chat_completions;
pub mod router;
pub mod models;
pub mod function_calling;
pub mod pipeline_processor;
pub mod response_handler;
pub mod utils;
#[cfg(test)]
mod integration_tests;


@@ -6,18 +6,15 @@ use hermesllm::clients::endpoints::SupportedUpstreamAPIs;
use hermesllm::clients::SupportedAPIs;
use hermesllm::{ProviderRequest, ProviderRequestType};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::Frame;
use http_body_util::{BodyExt, Full};
use hyper::header::{self};
use hyper::{Request, Response, StatusCode};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::{debug, info, warn};
use crate::router::llm_router::RouterService;
use crate::handlers::utils::{create_streaming_response, PassthroughProcessor};
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
@@ -25,7 +22,7 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
.boxed()
}
pub async fn chat(
pub async fn router_chat(
request: Request<hyper::body::Incoming>,
router_service: Arc<RouterService>,
full_qualified_llm_provider_url: String,
@@ -237,34 +234,12 @@ pub async fn chat(
headers.insert(header_name, header_value.clone());
}
// channel to create async stream
let (tx, rx) = mpsc::channel::<Bytes>(16);
// Use the streaming utility with a passthrough processor (no modification of chunks)
let byte_stream = llm_response.bytes_stream();
let processor = PassthroughProcessor;
let streaming_response = create_streaming_response(byte_stream, processor, 16);
// Spawn a task to send data as it becomes available
tokio::spawn(async move {
let mut byte_stream = llm_response.bytes_stream();
while let Some(item) = byte_stream.next().await {
let item = match item {
Ok(item) => item,
Err(err) => {
warn!("Error receiving chunk: {:?}", err);
break;
}
};
if tx.send(item).await.is_err() {
warn!("Receiver dropped");
break;
}
}
});
let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk)));
let stream_body = BoxBody::new(StreamBody::new(stream));
match response.body(stream_body) {
match response.body(streaming_response.body) {
Ok(response) => Ok(response),
Err(err) => {
let err_msg = format!("Failed to create response: {}", err);


@@ -0,0 +1,93 @@
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use http_body_util::StreamBody;
use hyper::body::Frame;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::warn;
/// Trait for processing streaming chunks
/// Implementors can inject custom logic during streaming (e.g., hallucination detection, logging)
pub trait StreamProcessor: Send + 'static {
/// Process an incoming chunk of bytes
fn process_chunk(&mut self, chunk: Bytes) -> Result<Option<Bytes>, String>;
/// Called when streaming completes successfully
fn on_complete(&mut self) {}
/// Called when streaming encounters an error
fn on_error(&mut self, _error: &str) {}
}
/// A no-op processor that just forwards chunks as-is
pub struct PassthroughProcessor;
impl StreamProcessor for PassthroughProcessor {
fn process_chunk(&mut self, chunk: Bytes) -> Result<Option<Bytes>, String> {
Ok(Some(chunk))
}
}
/// Result of creating a streaming response
pub struct StreamingResponse {
pub body: BoxBody<Bytes, hyper::Error>,
pub processor_handle: tokio::task::JoinHandle<()>,
}
pub fn create_streaming_response<S, P>(
mut byte_stream: S,
mut processor: P,
buffer_size: usize,
) -> StreamingResponse
where
S: StreamExt<Item = Result<Bytes, reqwest::Error>> + Send + Unpin + 'static,
P: StreamProcessor,
{
let (tx, rx) = mpsc::channel::<Bytes>(buffer_size);
// Spawn a task to process and forward chunks
let processor_handle = tokio::spawn(async move {
while let Some(item) = byte_stream.next().await {
let chunk = match item {
Ok(chunk) => chunk,
Err(err) => {
let err_msg = format!("Error receiving chunk: {:?}", err);
warn!("{}", err_msg);
processor.on_error(&err_msg);
break;
}
};
// Process the chunk
match processor.process_chunk(chunk) {
Ok(Some(processed_chunk)) => {
if tx.send(processed_chunk).await.is_err() {
warn!("Receiver dropped");
break;
}
}
Ok(None) => {
// Skip this chunk
continue;
}
Err(err) => {
warn!("Processor error: {}", err);
processor.on_error(&err);
break;
}
}
}
processor.on_complete();
});
// Convert channel receiver to HTTP stream
let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk)));
let stream_body = BoxBody::new(StreamBody::new(stream));
StreamingResponse {
body: stream_body,
processor_handle,
}
}
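
For context on extending this: an implementor only needs to provide process_chunk, while on_complete and on_error are optional hooks. A minimal sketch of a custom processor, assuming the same imports as this file (the LoggingProcessor name and byte-counting behavior are illustrative, not part of this change):

/// Illustrative only: counts bytes while forwarding every chunk unchanged.
pub struct LoggingProcessor {
    bytes_seen: usize,
}
impl StreamProcessor for LoggingProcessor {
    fn process_chunk(&mut self, chunk: Bytes) -> Result<Option<Bytes>, String> {
        self.bytes_seen += chunk.len();
        // Returning Ok(None) here would drop the chunk instead of forwarding it
        Ok(Some(chunk))
    }
    fn on_complete(&mut self) {
        tracing::info!("stream complete after {} bytes", self.bytes_seen);
    }
}

It would slot into the same call site as the passthrough above: create_streaming_response(byte_stream, LoggingProcessor { bytes_seen: 0 }, 16).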


@@ -1,6 +1,7 @@
use brightstaff::handlers::agent_chat_completions::agent_chat;
use brightstaff::handlers::chat_completions::chat;
use brightstaff::handlers::router::router_chat;
use brightstaff::handlers::models::list_models;
use brightstaff::handlers::function_calling::{function_calling_chat_handler};
use brightstaff::router::llm_router::RouterService;
use brightstaff::utils::tracing::init_tracer;
use bytes::Bytes;
@@ -125,7 +126,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
(&Method::POST, CHAT_COMPLETIONS_PATH | MESSAGES_PATH) => {
let fully_qualified_url =
format!("{}{}", llm_provider_url, req.uri().path());
chat(req, router_service, fully_qualified_url, model_aliases)
router_chat(req, router_service, fully_qualified_url, model_aliases)
.with_context(parent_cx)
.await
}
@@ -142,6 +143,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.with_context(parent_cx)
.await
}
(&Method::POST, "/function_calling") => {
let fully_qualified_url =
format!("{}{}", llm_provider_url, "/v1/chat/completions");
function_calling_chat_handler(req, fully_qualified_url)
.with_context(parent_cx)
.await
}
(&Method::GET, "/v1/models" | "/agents/v1/models") => {
Ok(list_models(llm_providers).await)
}
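
To show what the new route accepts: the handler takes an OpenAI-style chat payload at /function_calling and forwards it upstream to /v1/chat/completions. A hypothetical client call, written as a sketch (the listen address, port, and payload shape are assumptions for illustration, not taken from this diff; assumes reqwest with the json feature plus serde_json, inside an async fn returning a Result):

// Hypothetical client; address and payload shape are illustrative assumptions.
let resp = reqwest::Client::new()
    .post("http://127.0.0.1:10000/function_calling")
    .json(&serde_json::json!({
        "model": "Arch-Function",
        "messages": [{"role": "user", "content": "What is the weather in Seattle?"}]
    }))
    .send()
    .await?;
println!("status: {}", resp.status());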