From e74f5d1ba69be271128acc3b9d9ba20d0798a91e Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Tue, 19 May 2026 14:09:52 +0200 Subject: [PATCH] refac: request handling VI --- requests/__init__.py | 0 requests/chat.py | 218 +++++++++++++++++++ requests/messages.py | 187 +++++++++++++++++ requests/rechunk.py | 151 ++++++++++++++ router.py | 483 ++----------------------------------------- 5 files changed, 568 insertions(+), 471 deletions(-) create mode 100644 requests/__init__.py create mode 100644 requests/chat.py create mode 100644 requests/messages.py create mode 100644 requests/rechunk.py diff --git a/requests/__init__.py b/requests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/requests/chat.py b/requests/chat.py new file mode 100644 index 0000000..5bf1572 --- /dev/null +++ b/requests/chat.py @@ -0,0 +1,218 @@ +"""High-level chat request orchestrator. + +``_make_chat_request`` is the shared core that: + * picks an endpoint via ``choose_endpoint`` (which atomically reserves a slot), + * dispatches to either the native Ollama client or an OpenAI-compatible + client based on endpoint type, + * applies reactive context trimming when the backend rejects with + ``exceed_context_size_error``, + * counts tokens for billing/SSE, + * always releases the reservation via ``decrement_usage`` in ``finally``. + +``_make_moe_requests`` builds on it to implement the +"3 responses + 3 critiques + 1 final" mixture-of-experts dance. +""" +import asyncio +import re +import time + +import ollama + +import enhance +from config import get_config +from state import default_headers, token_queue +from context_window import _trim_messages_for_context, _calibrated_trim_target +from backends.normalize import is_openai_compatible +from backends.sessions import _make_openai_client +from routing import choose_endpoint, decrement_usage +from requests.messages import ( + get_last_user_content, + transform_tool_calls_to_openai, + transform_images_to_data_urls, + _strip_assistant_prefill, + _strip_images_from_messages, + _accumulate_openai_tc_delta, + _build_ollama_tool_calls, +) +from requests.rechunk import rechunk + + +async def _make_chat_request(model: str, messages: list, tools=None, stream: bool = False, think: bool = False, format=None, options=None, keep_alive: str = None) -> ollama.ChatResponse: + """ + Helper function to make a chat request to a specific endpoint. + Handles endpoint selection, client creation, usage tracking, and request execution. + """ + config = get_config() + endpoint, tracking_model = await choose_endpoint(model) # selects and atomically reserves + use_openai = is_openai_compatible(endpoint) + if use_openai: + if ":latest" in model: + model = model.split(":latest")[0] + if messages: + if any("images" in m for m in messages): + messages = await asyncio.to_thread(transform_images_to_data_urls, messages) + messages = transform_tool_calls_to_openai(messages) + messages = _strip_assistant_prefill(messages) + params = { + "messages": messages, + "model": model, + } + optional_params = { + "tools": tools, + "stream": stream, + "stream_options": {"include_usage": True} if stream else None, + "max_tokens": options.get("num_predict") if options and "num_predict" in options else None, + "frequency_penalty": options.get("frequency_penalty") if options and "frequency_penalty" in options else None, + "presence_penalty": options.get("presence_penalty") if options and "presence_penalty" in options else None, + "seed": options.get("seed") if options and "seed" in options else None, + "stop": options.get("stop") if options and "stop" in options else None, + "top_p": options.get("top_p") if options and "top_p" in options else None, + "temperature": options.get("temperature") if options and "temperature" in options else None, + "response_format": {"type": "json_schema", "json_schema": format} if format is not None else None + } + params.update({k: v for k, v in optional_params.items() if v is not None}) + oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) + else: + client = ollama.AsyncClient(host=endpoint) + + try: + if use_openai: + start_ts = time.perf_counter() + try: + response = await oclient.chat.completions.create(**params) + except Exception as e: + _e_str = str(e) + print(f"[_make_chat_request] caught {type(e).__name__}: {_e_str[:200]}") + if "exceed_context_size_error" in _e_str or "exceeds the available context size" in _e_str: + err_body = getattr(e, "body", {}) or {} + err_detail = err_body.get("error", {}) if isinstance(err_body, dict) else {} + n_ctx_limit = err_detail.get("n_ctx", 0) + actual_tokens = err_detail.get("n_prompt_tokens", 0) + if not n_ctx_limit: + _m = re.search(r"'n_ctx':\s*(\d+)", _e_str) + if _m: + n_ctx_limit = int(_m.group(1)) + _m = re.search(r"'n_prompt_tokens':\s*(\d+)", _e_str) + if _m: + actual_tokens = int(_m.group(1)) + if not n_ctx_limit: + raise + msgs_to_trim = params.get("messages", []) + cal_target = _calibrated_trim_target(msgs_to_trim, n_ctx_limit, actual_tokens) + trimmed = _trim_messages_for_context(msgs_to_trim, n_ctx_limit, target_tokens=cal_target) + print(f"[_make_chat_request] Context exceeded ({actual_tokens}/{n_ctx_limit} tokens, tiktoken_target={cal_target}), dropped {len(msgs_to_trim) - len(trimmed)} oldest message(s) and retrying") + try: + response = await oclient.chat.completions.create(**{**params, "messages": trimmed}) + except Exception as e2: + if "exceed_context_size_error" in str(e2) or "exceeds the available context size" in str(e2): + print(f"[_make_chat_request] Context still exceeded after trimming, also stripping tools") + params_no_tools = {k: v for k, v in params.items() if k not in ("tools", "tool_choice")} + response = await oclient.chat.completions.create(**{**params_no_tools, "messages": trimmed}) + else: + raise + elif "image input is not supported" in _e_str: + print(f"[_make_chat_request] Model {model} doesn't support images, retrying with text-only messages") + params = {**params, "messages": _strip_images_from_messages(params.get("messages", []))} + response = await oclient.chat.completions.create(**params) + else: + raise + if stream: + # For streaming, we need to collect all chunks + chunks = [] + tc_acc = {} # accumulate tool-call deltas + async for chunk in response: + chunks.append(chunk) + _accumulate_openai_tc_delta(chunk, tc_acc) + prompt_tok = 0 + comp_tok = 0 + if chunk.usage is not None: + prompt_tok = chunk.usage.prompt_tokens or 0 + comp_tok = chunk.usage.completion_tokens or 0 + else: + llama_usage = rechunk.extract_usage_from_llama_timings(chunk) + if llama_usage: + prompt_tok, comp_tok = llama_usage + if prompt_tok != 0 or comp_tok != 0: + await token_queue.put((endpoint, tracking_model, prompt_tok, comp_tok)) + # Convert to Ollama format + if chunks: + response = rechunk.openai_chat_completion2ollama(chunks[-1], stream, start_ts) + # Inject fully-accumulated tool calls into the final response + if tc_acc and response.message: + response.message.tool_calls = _build_ollama_tool_calls(tc_acc) + else: + prompt_tok = 0 + comp_tok = 0 + if response.usage is not None: + prompt_tok = response.usage.prompt_tokens or 0 + comp_tok = response.usage.completion_tokens or 0 + else: + llama_usage = rechunk.extract_usage_from_llama_timings(response) + if llama_usage: + prompt_tok, comp_tok = llama_usage + if prompt_tok != 0 or comp_tok != 0: + await token_queue.put((endpoint, tracking_model, prompt_tok, comp_tok)) + response = rechunk.openai_chat_completion2ollama(response, stream, start_ts) + else: + response = await client.chat(model=model, messages=messages, tools=tools, stream=stream, think=think, format=format, options=options, keep_alive=keep_alive) + if stream: + # For streaming, collect all chunks + chunks = [] + async for chunk in response: + chunks.append(chunk) + prompt_tok = chunk.prompt_eval_count or 0 + comp_tok = chunk.eval_count or 0 + if prompt_tok != 0 or comp_tok != 0: + await token_queue.put((endpoint, tracking_model, prompt_tok, comp_tok)) + if chunks: + response = chunks[-1] + else: + prompt_tok = response.prompt_eval_count or 0 + comp_tok = response.eval_count or 0 + if prompt_tok != 0 or comp_tok != 0: + await token_queue.put((endpoint, tracking_model, prompt_tok, comp_tok)) + + return response + finally: + await decrement_usage(endpoint, tracking_model) + + +async def _make_moe_requests(model: str, messages: list, tools=None, think: bool = False, format=None, options=None, keep_alive: str = None) -> ollama.ChatResponse: + """ + Helper function to make MOE (Multiple Opinions Ensemble) requests. + Generates 3 responses, 3 critiques, and returns the final selected response. + """ + query = get_last_user_content(messages) + if not query: + raise ValueError("No user query found in messages") + + if options is None: + options = {} + options["temperature"] = 1 + + moe_reqs = [] + + # Generate 3 responses — choose_endpoint is called inside _make_chat_request and + # atomically reserves a slot, so all 3 tasks see each other's load immediately. + response1_task = asyncio.create_task(_make_chat_request(model, messages, tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) + response2_task = asyncio.create_task(_make_chat_request(model, messages, tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) + response3_task = asyncio.create_task(_make_chat_request(model, messages, tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) + + responses = await asyncio.gather(response1_task, response2_task, response3_task) + + for n, r in enumerate(responses): + moe_req = enhance.moe(query, n, r.message.content) + moe_reqs.append(moe_req) + + # Generate 3 critiques + critique1_task = asyncio.create_task(_make_chat_request(model, [{"role": "user", "content": moe_reqs[0]}], tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) + critique2_task = asyncio.create_task(_make_chat_request(model, [{"role": "user", "content": moe_reqs[1]}], tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) + critique3_task = asyncio.create_task(_make_chat_request(model, [{"role": "user", "content": moe_reqs[2]}], tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) + + critiques = await asyncio.gather(critique1_task, critique2_task, critique3_task) + + # Select final response + m = enhance.moe_select_candidate(query, critiques) + + # Generate final response + return await _make_chat_request(model, [{"role": "user", "content": m}], tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive) diff --git a/requests/messages.py b/requests/messages.py new file mode 100644 index 0000000..3cd824a --- /dev/null +++ b/requests/messages.py @@ -0,0 +1,187 @@ +"""Message-shape transforms used across the chat/completions paths. + +Covers the directions between Ollama's native message format and the +OpenAI Chat Completions format: + * tool-call normalization (Ollama → OpenAI), + * images encoded as base64 lists → OpenAI multimodal ``image_url`` parts, + * trailing-assistant prefill strip (rejected by Claude/OpenAI), + * streaming tool_calls accumulation across deltas, + * logprobs translation (OpenAI choice → Ollama ``Logprob``). +""" +import secrets + +import ollama +import orjson +from ollama._types import TokenLogprob, Logprob + +from images import is_base64, resize_image_if_needed + + +def get_last_user_content(messages): + """ + Given a list of dicts (e.g., messages from an API), + return the 'content' of the last dict whose 'role' is 'user'. + If no such dict exists, return None. + """ + # Reverse iterate so we stop at the first match + for msg in reversed(messages): + if msg.get("role") == "user": + return msg.get("content") + return None + + +def _strip_assistant_prefill(messages: list) -> list: + """Remove a trailing assistant message used as prefill. + OpenAI-compatible endpoints (including Claude) do not support prefill and + will reject requests where the last message has role 'assistant'.""" + if messages and messages[-1].get("role") == "assistant": + return messages[:-1] + return messages + + +def transform_tool_calls_to_openai(message_list): + """ + Ensure tool_calls in assistant messages conform to the OpenAI format: + - Each tool call must have "type": "function" + - Each tool call must have an "id" + - arguments must be a JSON string, not a dict + Also ensure tool-role messages have a tool_call_id. + """ + # Track generated IDs so tool-role messages can reference them + last_tool_call_ids = {} + for msg in message_list: + role = msg.get("role") + if role == "assistant" and "tool_calls" in msg: + for tc in msg["tool_calls"]: + if "type" not in tc: + tc["type"] = "function" + if "id" not in tc: + tc["id"] = f"call_{secrets.token_hex(16)}" + func = tc.get("function", {}) + if isinstance(func.get("arguments"), dict): + func["arguments"] = orjson.dumps(func["arguments"]).decode("utf-8") + # Remember the id for the following tool-role message + name = func.get("name") + if name: + last_tool_call_ids[name] = tc["id"] + elif role == "tool": + if "tool_call_id" not in msg: + # Try to match by name from a preceding assistant tool_call + name = msg.get("name") or msg.get("tool_name") + if name and name in last_tool_call_ids: + msg["tool_call_id"] = last_tool_call_ids.pop(name) + return message_list + + +def transform_images_to_data_urls(message_list): + for message in message_list: + if "images" in message: + images = message.pop("images") + if not isinstance(images, list): + continue + new_content = [] + for image in images: #TODO: quality downsize if images are too big to fit into model context window size + if not is_base64(image): + raise ValueError(f"Image string is not a valid base64 encoded string.") + resized_image = resize_image_if_needed(image) + if resized_image: + data_url = f"data:image/png;base64,{resized_image}" + #new_content.append({ + # "type": "text", + # "text": "" + #}) + new_content.append({ + "type": "image_url", + "image_url": { + "url": data_url + } + }) + message["content"] = new_content + + return message_list + + +def _strip_images_from_messages(messages: list) -> list: + """Remove image_url parts from message content, keeping only text.""" + result = [] + for msg in messages: + content = msg.get("content") + if isinstance(content, list): + text_only = [p for p in content if p.get("type") != "image_url"] + if len(text_only) == 1 and text_only[0].get("type") == "text": + content = text_only[0]["text"] + else: + content = text_only + result.append({**msg, "content": content}) + else: + result.append(msg) + return result + + +def _accumulate_openai_tc_delta(chunk, accumulator: dict) -> None: + """Accumulate tool_call deltas from a single OpenAI streaming chunk. + + ``accumulator`` is a dict mapping tool-call *index* to + ``{"id": str, "name": str, "arguments": str}`` where ``arguments`` + is the concatenation of all JSON fragments seen so far. + """ + if not chunk.choices: + return + delta = chunk.choices[0].delta + tc_deltas = getattr(delta, "tool_calls", None) + if not tc_deltas: + return + for tc in tc_deltas: + idx = tc.index + if idx not in accumulator: + accumulator[idx] = { + "id": getattr(tc, "id", None) or f"call_{secrets.token_hex(16)}", + "name": tc.function.name if tc.function else None, + "arguments": "", + } + else: + if getattr(tc, "id", None): + accumulator[idx]["id"] = tc.id + if tc.function and tc.function.name: + accumulator[idx]["name"] = tc.function.name + if tc.function and tc.function.arguments: + accumulator[idx]["arguments"] += tc.function.arguments + + +def _build_ollama_tool_calls(accumulator: dict) -> list | None: + """Convert accumulated tool-call data into Ollama-format tool_calls list.""" + if not accumulator: + return None + result = [] + for idx in sorted(accumulator.keys()): + tc = accumulator[idx] + try: + args = orjson.loads(tc["arguments"]) if tc["arguments"] else {} + except (orjson.JSONDecodeError, TypeError): + args = {} + result.append(ollama.Message.ToolCall( + function=ollama.Message.ToolCall.Function(name=tc["name"], arguments=args) + )) + return result + + +def _convert_openai_logprobs(choice) -> list | None: + """Convert OpenAI logprobs from a choice into Ollama Logprob objects.""" + lp = getattr(choice, "logprobs", None) + if lp is None: + return None + content = getattr(lp, "content", None) + if not content: + return None + result = [] + for entry in content: + top = [ + TokenLogprob(token=alt.token, logprob=alt.logprob) + for alt in (entry.top_logprobs or []) + ] + result.append(Logprob( + token=entry.token, + logprob=entry.logprob, + top_logprobs=top or None, + )) + return result diff --git a/requests/rechunk.py b/requests/rechunk.py new file mode 100644 index 0000000..c7ae474 --- /dev/null +++ b/requests/rechunk.py @@ -0,0 +1,151 @@ +"""OpenAI → Ollama response shape converters. + +Methods on the ``rechunk`` class are called as bare functions +(``rechunk.openai_chat_completion2ollama(...)``) — there is no instance +state. The class is just a namespace. + +``extract_usage_from_llama_timings`` reads the ``timings`` field that +llama-server returns in place of OpenAI's ``usage`` so the router can still +count tokens for those backends. +""" +import time + +import ollama +import orjson + +from images import iso8601_ns +from requests.messages import _convert_openai_logprobs + + +class rechunk: + def openai_chat_completion2ollama(chunk: dict, stream: bool, start_ts: float) -> ollama.ChatResponse: + now = time.perf_counter() + if chunk.choices == [] and chunk.usage is not None: + return ollama.ChatResponse( + model=chunk.model, + created_at=iso8601_ns(), + done=True, + done_reason='stop', + total_duration=int((now - start_ts) * 1_000_000_000), + load_duration=100000, + prompt_eval_count=int(chunk.usage.prompt_tokens), + prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)), + eval_count=int(chunk.usage.completion_tokens), + eval_duration=int((now - start_ts) * 1_000_000_000), + message=ollama.Message(role="assistant", content=""), + ) + with_thinking = chunk.choices[0] if chunk.choices[0] else None + if stream == True: + thinking = (getattr(with_thinking.delta, "reasoning_content", None) or getattr(with_thinking.delta, "reasoning", None)) if with_thinking else None + role = chunk.choices[0].delta.role or "assistant" + content = chunk.choices[0].delta.content or '' + else: + thinking = (getattr(with_thinking.message, "reasoning_content", None) or getattr(with_thinking.message, "reasoning", None)) if with_thinking else None + role = chunk.choices[0].message.role or "assistant" + content = chunk.choices[0].message.content or '' + # Convert OpenAI tool_calls to Ollama format + # In streaming mode, tool_calls arrive as partial deltas across multiple chunks + # (name only in first delta, arguments as incremental JSON fragments). + # Callers must accumulate deltas and inject the final result; skip here. + ollama_tool_calls = None + if not stream: + raw_tool_calls = getattr(with_thinking.message, "tool_calls", None) if with_thinking else None + if raw_tool_calls: + ollama_tool_calls = [] + for tc in raw_tool_calls: + try: + args = orjson.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else (tc.function.arguments or {}) + except (orjson.JSONDecodeError, TypeError): + args = {} + ollama_tool_calls.append(ollama.Message.ToolCall( + function=ollama.Message.ToolCall.Function(name=tc.function.name, arguments=args) + )) + # Convert OpenAI logprobs to Ollama format + ollama_logprobs = _convert_openai_logprobs(with_thinking) if with_thinking else None + assistant_msg = ollama.Message( + role=role, + content=content, + thinking=thinking, + images=None, + tool_name=None, + tool_calls=ollama_tool_calls) + rechunk = ollama.ChatResponse( + model=chunk.model, + created_at=iso8601_ns(), + done=True if chunk.usage is not None else False, + done_reason=chunk.choices[0].finish_reason, #if chunk.choices[0].finish_reason is not None else None, + total_duration=int((now - start_ts) * 1_000_000_000) if chunk.usage is not None else 0, + load_duration=100000, + prompt_eval_count=int(chunk.usage.prompt_tokens) if chunk.usage is not None else 0, + prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)) if chunk.usage is not None and chunk.usage.completion_tokens != 0 else 0, + eval_count=int(chunk.usage.completion_tokens) if chunk.usage is not None else 0, + eval_duration=int((now - start_ts) * 1_000_000_000) if chunk.usage is not None else 0, + message=assistant_msg, + logprobs=ollama_logprobs) + return rechunk + + def openai_completion2ollama(chunk: dict, stream: bool, start_ts: float) -> ollama.GenerateResponse: + now = time.perf_counter() + with_thinking = chunk.choices[0] if chunk.choices[0] else None + thinking = getattr(with_thinking, "reasoning", None) if with_thinking else None + rechunk = ollama.GenerateResponse( + model=chunk.model, + created_at=iso8601_ns(), + done=True if chunk.usage is not None else False, + done_reason=chunk.choices[0].finish_reason, + total_duration=int((now - start_ts) * 1_000_000_000) if chunk.usage is not None else 0, + load_duration=10000, + prompt_eval_count=int(chunk.usage.prompt_tokens) if chunk.usage is not None else 0, + prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)) if chunk.usage is not None and chunk.usage.completion_tokens != 0 else 0, + eval_count=int(chunk.usage.completion_tokens) if chunk.usage is not None else 0, + eval_duration=int((now - start_ts) * 1_000_000_000) if chunk.usage is not None else 0, + response=chunk.choices[0].text or '', + thinking=thinking) + return rechunk + + def openai_embeddings2ollama(chunk: dict) -> ollama.EmbeddingsResponse: + rechunk = ollama.EmbeddingsResponse(embedding=chunk.data[0].embedding) + return rechunk + + def openai_embed2ollama(chunk: dict, model: str) -> ollama.EmbedResponse: + rechunk = ollama.EmbedResponse( + model=model, + created_at=iso8601_ns(), + done=None, + done_reason=None, + total_duration=None, + load_duration=None, + prompt_eval_count=None, + prompt_eval_duration=None, + eval_count=None, + eval_duration=None, + embeddings=[chunk.data[0].embedding]) + return rechunk + + def extract_usage_from_llama_timings(obj) -> tuple[int, int] | None: + """Extract (prompt_tokens, completion_tokens) from llama-server's timings object. + + llama-server returns a ``timings`` dict instead of the standard OpenAI + ``usage`` field:: + + "timings": { + "cache_n": 236, // prompt tokens reused from cache + "prompt_n": 1, // prompt tokens processed + "predicted_n": 35 // predicted (completion) tokens + } + + prompt_tokens = prompt_n + cache_n + completion_tokens = predicted_n + + Returns ``(prompt_tokens, completion_tokens)`` or ``None`` when no + timings are found. + """ + timings = getattr(obj, "timings", None) + if timings is None: + return None + if isinstance(timings, dict): + prompt_n = timings.get("prompt_n", 0) or 0 + cache_n = timings.get("cache_n", 0) or 0 + predicted_n = timings.get("predicted_n", 0) or 0 + return (prompt_n + cache_n, predicted_n) + return None diff --git a/router.py b/router.py index 7b24bcc..bf25be9 100644 --- a/router.py +++ b/router.py @@ -241,481 +241,22 @@ from backends.probe import fetch from routing import increment_usage, decrement_usage -async def _make_chat_request(model: str, messages: list, tools=None, stream: bool = False, think: bool = False, format=None, options=None, keep_alive: str = None) -> ollama.ChatResponse: - """ - Helper function to make a chat request to a specific endpoint. - Handles endpoint selection, client creation, usage tracking, and request execution. - """ - endpoint, tracking_model = await choose_endpoint(model) # selects and atomically reserves - use_openai = is_openai_compatible(endpoint) - if use_openai: - if ":latest" in model: - model = model.split(":latest")[0] - if messages: - if any("images" in m for m in messages): - messages = await asyncio.to_thread(transform_images_to_data_urls, messages) - messages = transform_tool_calls_to_openai(messages) - messages = _strip_assistant_prefill(messages) - params = { - "messages": messages, - "model": model, - } - optional_params = { - "tools": tools, - "stream": stream, - "stream_options": {"include_usage": True} if stream else None, - "max_tokens": options.get("num_predict") if options and "num_predict" in options else None, - "frequency_penalty": options.get("frequency_penalty") if options and "frequency_penalty" in options else None, - "presence_penalty": options.get("presence_penalty") if options and "presence_penalty" in options else None, - "seed": options.get("seed") if options and "seed" in options else None, - "stop": options.get("stop") if options and "stop" in options else None, - "top_p": options.get("top_p") if options and "top_p" in options else None, - "temperature": options.get("temperature") if options and "temperature" in options else None, - "response_format": {"type": "json_schema", "json_schema": format} if format is not None else None - } - params.update({k: v for k, v in optional_params.items() if v is not None}) - oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) - else: - client = ollama.AsyncClient(host=endpoint) - - try: - if use_openai: - start_ts = time.perf_counter() - try: - response = await oclient.chat.completions.create(**params) - except Exception as e: - _e_str = str(e) - print(f"[_make_chat_request] caught {type(e).__name__}: {_e_str[:200]}") - if "exceed_context_size_error" in _e_str or "exceeds the available context size" in _e_str: - err_body = getattr(e, "body", {}) or {} - err_detail = err_body.get("error", {}) if isinstance(err_body, dict) else {} - n_ctx_limit = err_detail.get("n_ctx", 0) - actual_tokens = err_detail.get("n_prompt_tokens", 0) - if not n_ctx_limit: - _m = re.search(r"'n_ctx':\s*(\d+)", _e_str) - if _m: - n_ctx_limit = int(_m.group(1)) - _m = re.search(r"'n_prompt_tokens':\s*(\d+)", _e_str) - if _m: - actual_tokens = int(_m.group(1)) - if not n_ctx_limit: - raise - msgs_to_trim = params.get("messages", []) - cal_target = _calibrated_trim_target(msgs_to_trim, n_ctx_limit, actual_tokens) - trimmed = _trim_messages_for_context(msgs_to_trim, n_ctx_limit, target_tokens=cal_target) - print(f"[_make_chat_request] Context exceeded ({actual_tokens}/{n_ctx_limit} tokens, tiktoken_target={cal_target}), dropped {len(msgs_to_trim) - len(trimmed)} oldest message(s) and retrying") - try: - response = await oclient.chat.completions.create(**{**params, "messages": trimmed}) - except Exception as e2: - if "exceed_context_size_error" in str(e2) or "exceeds the available context size" in str(e2): - print(f"[_make_chat_request] Context still exceeded after trimming, also stripping tools") - params_no_tools = {k: v for k, v in params.items() if k not in ("tools", "tool_choice")} - response = await oclient.chat.completions.create(**{**params_no_tools, "messages": trimmed}) - else: - raise - elif "image input is not supported" in _e_str: - print(f"[_make_chat_request] Model {model} doesn't support images, retrying with text-only messages") - params = {**params, "messages": _strip_images_from_messages(params.get("messages", []))} - response = await oclient.chat.completions.create(**params) - else: - raise - if stream: - # For streaming, we need to collect all chunks - chunks = [] - tc_acc = {} # accumulate tool-call deltas - async for chunk in response: - chunks.append(chunk) - _accumulate_openai_tc_delta(chunk, tc_acc) - prompt_tok = 0 - comp_tok = 0 - if chunk.usage is not None: - prompt_tok = chunk.usage.prompt_tokens or 0 - comp_tok = chunk.usage.completion_tokens or 0 - else: - llama_usage = rechunk.extract_usage_from_llama_timings(chunk) - if llama_usage: - prompt_tok, comp_tok = llama_usage - if prompt_tok != 0 or comp_tok != 0: - await token_queue.put((endpoint, tracking_model, prompt_tok, comp_tok)) - # Convert to Ollama format - if chunks: - response = rechunk.openai_chat_completion2ollama(chunks[-1], stream, start_ts) - # Inject fully-accumulated tool calls into the final response - if tc_acc and response.message: - response.message.tool_calls = _build_ollama_tool_calls(tc_acc) - else: - prompt_tok = 0 - comp_tok = 0 - if response.usage is not None: - prompt_tok = response.usage.prompt_tokens or 0 - comp_tok = response.usage.completion_tokens or 0 - else: - llama_usage = rechunk.extract_usage_from_llama_timings(response) - if llama_usage: - prompt_tok, comp_tok = llama_usage - if prompt_tok != 0 or comp_tok != 0: - await token_queue.put((endpoint, tracking_model, prompt_tok, comp_tok)) - response = rechunk.openai_chat_completion2ollama(response, stream, start_ts) - else: - response = await client.chat(model=model, messages=messages, tools=tools, stream=stream, think=think, format=format, options=options, keep_alive=keep_alive) - if stream: - # For streaming, collect all chunks - chunks = [] - async for chunk in response: - chunks.append(chunk) - prompt_tok = chunk.prompt_eval_count or 0 - comp_tok = chunk.eval_count or 0 - if prompt_tok != 0 or comp_tok != 0: - await token_queue.put((endpoint, tracking_model, prompt_tok, comp_tok)) - if chunks: - response = chunks[-1] - else: - prompt_tok = response.prompt_eval_count or 0 - comp_tok = response.eval_count or 0 - if prompt_tok != 0 or comp_tok != 0: - await token_queue.put((endpoint, tracking_model, prompt_tok, comp_tok)) - - return response - finally: - await decrement_usage(endpoint, tracking_model) - -def get_last_user_content(messages): - """ - Given a list of dicts (e.g., messages from an API), - return the 'content' of the last dict whose 'role' is 'user'. - If no such dict exists, return None. - """ - # Reverse iterate so we stop at the first match - for msg in reversed(messages): - if msg.get("role") == "user": - return msg.get("content") - return None - -async def _make_moe_requests(model: str, messages: list, tools=None, think: bool = False, format=None, options=None, keep_alive: str = None) -> ollama.ChatResponse: - """ - Helper function to make MOE (Multiple Opinions Ensemble) requests. - Generates 3 responses, 3 critiques, and returns the final selected response. - """ - query = get_last_user_content(messages) - if not query: - raise ValueError("No user query found in messages") - - if options is None: - options = {} - options["temperature"] = 1 - - moe_reqs = [] - - # Generate 3 responses — choose_endpoint is called inside _make_chat_request and - # atomically reserves a slot, so all 3 tasks see each other's load immediately. - response1_task = asyncio.create_task(_make_chat_request(model, messages, tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) - response2_task = asyncio.create_task(_make_chat_request(model, messages, tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) - response3_task = asyncio.create_task(_make_chat_request(model, messages, tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) - - responses = await asyncio.gather(response1_task, response2_task, response3_task) - - for n, r in enumerate(responses): - moe_req = enhance.moe(query, n, r.message.content) - moe_reqs.append(moe_req) - - # Generate 3 critiques - critique1_task = asyncio.create_task(_make_chat_request(model, [{"role": "user", "content": moe_reqs[0]}], tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) - critique2_task = asyncio.create_task(_make_chat_request(model, [{"role": "user", "content": moe_reqs[1]}], tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) - critique3_task = asyncio.create_task(_make_chat_request(model, [{"role": "user", "content": moe_reqs[2]}], tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive)) - - critiques = await asyncio.gather(critique1_task, critique2_task, critique3_task) - - # Select final response - m = enhance.moe_select_candidate(query, critiques) - - # Generate final response - return await _make_chat_request(model, [{"role": "user", "content": m}], tools, stream=False, think=think, format=format, options=options, keep_alive=keep_alive) +from requests.chat import _make_chat_request, _make_moe_requests from images import iso8601_ns, is_base64, resize_image_if_needed -def _strip_assistant_prefill(messages: list) -> list: - """Remove a trailing assistant message used as prefill. - OpenAI-compatible endpoints (including Claude) do not support prefill and - will reject requests where the last message has role 'assistant'.""" - if messages and messages[-1].get("role") == "assistant": - return messages[:-1] - return messages +from requests.messages import ( + _strip_assistant_prefill, + transform_tool_calls_to_openai, + transform_images_to_data_urls, + _strip_images_from_messages, + _accumulate_openai_tc_delta, + _build_ollama_tool_calls, + _convert_openai_logprobs, + get_last_user_content, +) +from requests.rechunk import rechunk -def transform_tool_calls_to_openai(message_list): - """ - Ensure tool_calls in assistant messages conform to the OpenAI format: - - Each tool call must have "type": "function" - - Each tool call must have an "id" - - arguments must be a JSON string, not a dict - Also ensure tool-role messages have a tool_call_id. - """ - # Track generated IDs so tool-role messages can reference them - last_tool_call_ids = {} - for msg in message_list: - role = msg.get("role") - if role == "assistant" and "tool_calls" in msg: - for tc in msg["tool_calls"]: - if "type" not in tc: - tc["type"] = "function" - if "id" not in tc: - tc["id"] = f"call_{secrets.token_hex(16)}" - func = tc.get("function", {}) - if isinstance(func.get("arguments"), dict): - func["arguments"] = orjson.dumps(func["arguments"]).decode("utf-8") - # Remember the id for the following tool-role message - name = func.get("name") - if name: - last_tool_call_ids[name] = tc["id"] - elif role == "tool": - if "tool_call_id" not in msg: - # Try to match by name from a preceding assistant tool_call - name = msg.get("name") or msg.get("tool_name") - if name and name in last_tool_call_ids: - msg["tool_call_id"] = last_tool_call_ids.pop(name) - return message_list - -def transform_images_to_data_urls(message_list): - for message in message_list: - if "images" in message: - images = message.pop("images") - if not isinstance(images, list): - continue - new_content = [] - for image in images: #TODO: quality downsize if images are too big to fit into model context window size - if not is_base64(image): - raise ValueError(f"Image string is not a valid base64 encoded string.") - resized_image = resize_image_if_needed(image) - if resized_image: - data_url = f"data:image/png;base64,{resized_image}" - #new_content.append({ - # "type": "text", - # "text": "" - #}) - new_content.append({ - "type": "image_url", - "image_url": { - "url": data_url - } - }) - message["content"] = new_content - - return message_list - -def _strip_images_from_messages(messages: list) -> list: - """Remove image_url parts from message content, keeping only text.""" - result = [] - for msg in messages: - content = msg.get("content") - if isinstance(content, list): - text_only = [p for p in content if p.get("type") != "image_url"] - if len(text_only) == 1 and text_only[0].get("type") == "text": - content = text_only[0]["text"] - else: - content = text_only - result.append({**msg, "content": content}) - else: - result.append(msg) - return result - -def _accumulate_openai_tc_delta(chunk, accumulator: dict) -> None: - """Accumulate tool_call deltas from a single OpenAI streaming chunk. - - ``accumulator`` is a dict mapping tool-call *index* to - ``{"id": str, "name": str, "arguments": str}`` where ``arguments`` - is the concatenation of all JSON fragments seen so far. - """ - if not chunk.choices: - return - delta = chunk.choices[0].delta - tc_deltas = getattr(delta, "tool_calls", None) - if not tc_deltas: - return - for tc in tc_deltas: - idx = tc.index - if idx not in accumulator: - accumulator[idx] = { - "id": getattr(tc, "id", None) or f"call_{secrets.token_hex(16)}", - "name": tc.function.name if tc.function else None, - "arguments": "", - } - else: - if getattr(tc, "id", None): - accumulator[idx]["id"] = tc.id - if tc.function and tc.function.name: - accumulator[idx]["name"] = tc.function.name - if tc.function and tc.function.arguments: - accumulator[idx]["arguments"] += tc.function.arguments - -def _build_ollama_tool_calls(accumulator: dict) -> list | None: - """Convert accumulated tool-call data into Ollama-format tool_calls list.""" - if not accumulator: - return None - result = [] - for idx in sorted(accumulator.keys()): - tc = accumulator[idx] - try: - args = orjson.loads(tc["arguments"]) if tc["arguments"] else {} - except (orjson.JSONDecodeError, TypeError): - args = {} - result.append(ollama.Message.ToolCall( - function=ollama.Message.ToolCall.Function(name=tc["name"], arguments=args) - )) - return result - -def _convert_openai_logprobs(choice) -> list | None: - """Convert OpenAI logprobs from a choice into Ollama Logprob objects.""" - lp = getattr(choice, "logprobs", None) - if lp is None: - return None - content = getattr(lp, "content", None) - if not content: - return None - result = [] - for entry in content: - top = [ - TokenLogprob(token=alt.token, logprob=alt.logprob) - for alt in (entry.top_logprobs or []) - ] - result.append(Logprob( - token=entry.token, - logprob=entry.logprob, - top_logprobs=top or None, - )) - return result - -class rechunk: - def openai_chat_completion2ollama(chunk: dict, stream: bool, start_ts: float) -> ollama.ChatResponse: - now = time.perf_counter() - if chunk.choices == [] and chunk.usage is not None: - return ollama.ChatResponse( - model=chunk.model, - created_at=iso8601_ns(), - done=True, - done_reason='stop', - total_duration=int((now - start_ts) * 1_000_000_000), - load_duration=100000, - prompt_eval_count=int(chunk.usage.prompt_tokens), - prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)), - eval_count=int(chunk.usage.completion_tokens), - eval_duration=int((now - start_ts) * 1_000_000_000), - message=ollama.Message(role="assistant", content=""), - ) - with_thinking = chunk.choices[0] if chunk.choices[0] else None - if stream == True: - thinking = (getattr(with_thinking.delta, "reasoning_content", None) or getattr(with_thinking.delta, "reasoning", None)) if with_thinking else None - role = chunk.choices[0].delta.role or "assistant" - content = chunk.choices[0].delta.content or '' - else: - thinking = (getattr(with_thinking.message, "reasoning_content", None) or getattr(with_thinking.message, "reasoning", None)) if with_thinking else None - role = chunk.choices[0].message.role or "assistant" - content = chunk.choices[0].message.content or '' - # Convert OpenAI tool_calls to Ollama format - # In streaming mode, tool_calls arrive as partial deltas across multiple chunks - # (name only in first delta, arguments as incremental JSON fragments). - # Callers must accumulate deltas and inject the final result; skip here. - ollama_tool_calls = None - if not stream: - raw_tool_calls = getattr(with_thinking.message, "tool_calls", None) if with_thinking else None - if raw_tool_calls: - ollama_tool_calls = [] - for tc in raw_tool_calls: - try: - args = orjson.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else (tc.function.arguments or {}) - except (orjson.JSONDecodeError, TypeError): - args = {} - ollama_tool_calls.append(ollama.Message.ToolCall( - function=ollama.Message.ToolCall.Function(name=tc.function.name, arguments=args) - )) - # Convert OpenAI logprobs to Ollama format - ollama_logprobs = _convert_openai_logprobs(with_thinking) if with_thinking else None - assistant_msg = ollama.Message( - role=role, - content=content, - thinking=thinking, - images=None, - tool_name=None, - tool_calls=ollama_tool_calls) - rechunk = ollama.ChatResponse( - model=chunk.model, - created_at=iso8601_ns(), - done=True if chunk.usage is not None else False, - done_reason=chunk.choices[0].finish_reason, #if chunk.choices[0].finish_reason is not None else None, - total_duration=int((now - start_ts) * 1_000_000_000) if chunk.usage is not None else 0, - load_duration=100000, - prompt_eval_count=int(chunk.usage.prompt_tokens) if chunk.usage is not None else 0, - prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)) if chunk.usage is not None and chunk.usage.completion_tokens != 0 else 0, - eval_count=int(chunk.usage.completion_tokens) if chunk.usage is not None else 0, - eval_duration=int((now - start_ts) * 1_000_000_000) if chunk.usage is not None else 0, - message=assistant_msg, - logprobs=ollama_logprobs) - return rechunk - - def openai_completion2ollama(chunk: dict, stream: bool, start_ts: float) -> ollama.GenerateResponse: - now = time.perf_counter() - with_thinking = chunk.choices[0] if chunk.choices[0] else None - thinking = getattr(with_thinking, "reasoning", None) if with_thinking else None - rechunk = ollama.GenerateResponse( - model=chunk.model, - created_at=iso8601_ns(), - done=True if chunk.usage is not None else False, - done_reason=chunk.choices[0].finish_reason, - total_duration=int((now - start_ts) * 1_000_000_000) if chunk.usage is not None else 0, - load_duration=10000, - prompt_eval_count=int(chunk.usage.prompt_tokens) if chunk.usage is not None else 0, - prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)) if chunk.usage is not None and chunk.usage.completion_tokens != 0 else 0, - eval_count=int(chunk.usage.completion_tokens) if chunk.usage is not None else 0, - eval_duration=int((now - start_ts) * 1_000_000_000) if chunk.usage is not None else 0, - response=chunk.choices[0].text or '', - thinking=thinking) - return rechunk - - def openai_embeddings2ollama(chunk: dict) -> ollama.EmbeddingsResponse: - rechunk = ollama.EmbeddingsResponse(embedding=chunk.data[0].embedding) - return rechunk - - def openai_embed2ollama(chunk: dict, model: str) -> ollama.EmbedResponse: - rechunk = ollama.EmbedResponse( - model=model, - created_at=iso8601_ns(), - done=None, - done_reason=None, - total_duration=None, - load_duration=None, - prompt_eval_count=None, - prompt_eval_duration=None, - eval_count=None, - eval_duration=None, - embeddings=[chunk.data[0].embedding]) - return rechunk - - def extract_usage_from_llama_timings(obj) -> tuple[int, int] | None: - """Extract (prompt_tokens, completion_tokens) from llama-server's timings object. - - llama-server returns a ``timings`` dict instead of the standard OpenAI - ``usage`` field:: - - "timings": { - "cache_n": 236, // prompt tokens reused from cache - "prompt_n": 1, // prompt tokens processed - "predicted_n": 35 // predicted (completion) tokens - } - - prompt_tokens = prompt_n + cache_n - completion_tokens = predicted_n - - Returns ``(prompt_tokens, completion_tokens)`` or ``None`` when no - timings are found. - """ - timings = getattr(obj, "timings", None) - if timings is None: - return None - if isinstance(timings, dict): - prompt_n = timings.get("prompt_n", 0) or 0 - cache_n = timings.get("cache_n", 0) or 0 - predicted_n = timings.get("predicted_n", 0) or 0 - return (prompt_n + cache_n, predicted_n) - return None - from sse import ( _capture_snapshot, _distribute_snapshot,