"""Translation between the OpenAI **Responses API** and **Chat Completions**. The router speaks Chat Completions to every backend (Ollama, llama-server, external OpenAI). To expose ``/v1/responses`` transparently on top of that, this module converts in both directions: * request: Responses ``input`` / ``instructions`` / ``tools`` → chat ``messages`` / ``tools`` * response: chat ``choices[0].message`` → Responses ``output`` items * stream: chat completion deltas → Responses typed SSE events Pure functions / a stream-translator class — no I/O, mirroring the style of ``requests/messages.py``. The native passthrough path (external OpenAI) does not use this module; it forwards the SDK's Responses objects directly. """ import secrets import time import orjson from requests.messages import _accumulate_openai_tc_delta # --------------------------------------------------------------------------- # Request direction: Responses → Chat Completions # --------------------------------------------------------------------------- def _responses_content_to_chat(content): """Convert a Responses message ``content`` into Chat Completions content. Collapses a single text part to a plain string (what most backends expect); keeps a multimodal list otherwise. """ if content is None or isinstance(content, str): return content if not isinstance(content, list): return str(content) parts = [] for p in content: if not isinstance(p, dict): parts.append({"type": "text", "text": str(p)}) continue ptype = p.get("type") if ptype in ("input_text", "output_text", "text"): parts.append({"type": "text", "text": p.get("text", "")}) elif ptype in ("input_image", "image_url"): url = p.get("image_url") if isinstance(url, dict): url = url.get("url") if url: parts.append({"type": "image_url", "image_url": {"url": url}}) # input_file / refusal / reasoning parts have no chat equivalent → skip if len(parts) == 1 and parts[0].get("type") == "text": return parts[0]["text"] return parts def _input_item_to_message(item): """Convert a single Responses ``input`` item to a chat message (or None).""" if isinstance(item, str): return {"role": "user", "content": item} if not isinstance(item, dict): return None itype = item.get("type") if itype == "function_call": return { "role": "assistant", "content": None, "tool_calls": [{ "id": item.get("call_id") or item.get("id"), "type": "function", "function": { "name": item.get("name"), "arguments": item.get("arguments", ""), }, }], } if itype == "function_call_output": output = item.get("output", "") if not isinstance(output, str): output = orjson.dumps(output).decode("utf-8") return { "role": "tool", "tool_call_id": item.get("call_id") or item.get("id"), "content": output, } if itype in ("reasoning",): # No Chat Completions equivalent — drop. return None # "message" item or a bare {role, content} chat-style item role = item.get("role") if role is None: return None return {"role": role, "content": _responses_content_to_chat(item.get("content"))} def responses_input_to_messages(input_data, instructions=None): """Build a Chat Completions ``messages`` list from Responses ``input``. ``instructions`` becomes a leading system message; a string ``input`` becomes a single user message; a list ``input`` is mapped item-by-item. """ messages = [] if instructions: messages.append({"role": "system", "content": instructions}) if input_data is None: return messages if isinstance(input_data, str): messages.append({"role": "user", "content": input_data}) return messages if isinstance(input_data, list): for item in input_data: msg = _input_item_to_message(item) if msg is not None: messages.append(msg) return messages def _chat_content_to_responses_parts(content, assistant=False): """Convert chat message content → Responses content parts.""" text_type = "output_text" if assistant else "input_text" if content is None: return [] if isinstance(content, str): return [{"type": text_type, "text": content}] parts = [] for p in content if isinstance(content, list) else []: if not isinstance(p, dict): parts.append({"type": text_type, "text": str(p)}) elif p.get("type") == "text": parts.append({"type": text_type, "text": p.get("text", "")}) elif p.get("type") == "image_url": url = (p.get("image_url") or {}).get("url") if url: parts.append({"type": "input_image", "image_url": url}) return parts def messages_to_responses_input(messages): """Convert chat messages → ``(instructions, Responses input items)``. Used for the native passthrough path: history that the router has resolved in chat-message space is re-expressed as Responses ``input``. Leading/standalone system messages are merged into ``instructions``. """ instructions_parts = [] items = [] for m in messages: role = m.get("role") if role == "system": c = m.get("content") instructions_parts.append(c if isinstance(c, str) else orjson.dumps(c).decode("utf-8")) continue if role == "tool": out = m.get("content") if not isinstance(out, str): out = orjson.dumps(out).decode("utf-8") items.append({"type": "function_call_output", "call_id": m.get("tool_call_id"), "output": out}) continue if role == "assistant" and m.get("tool_calls"): for tc in m["tool_calls"]: fn = tc.get("function", {}) items.append({"type": "function_call", "call_id": tc.get("id"), "name": fn.get("name"), "arguments": fn.get("arguments", "")}) if m.get("content"): items.append({"role": "assistant", "content": _chat_content_to_responses_parts(m["content"], assistant=True)}) continue items.append({"role": role, "content": _chat_content_to_responses_parts(m.get("content"), assistant=(role == "assistant"))}) instructions = "\n\n".join(p for p in instructions_parts if p) or None return instructions, items def responses_object_to_sse(resp): """Render a *finished* Responses object as a valid SSE event stream. Used to serve cache/store hits to streaming clients without a backend call. """ seq = [-1] def ev(etype, payload): seq[0] += 1 body = {"type": etype, "sequence_number": seq[0], **payload} return f"event: {etype}\ndata: {orjson.dumps(body).decode('utf-8')}\n\n".encode("utf-8") parts_out = [] in_progress = {**resp, "status": "in_progress", "output": [], "output_text": ""} parts_out.append(ev("response.created", {"response": in_progress})) parts_out.append(ev("response.in_progress", {"response": in_progress})) for oi, item in enumerate(resp.get("output", [])): parts_out.append(ev("response.output_item.added", {"output_index": oi, "item": {**item, "status": "in_progress"}})) if item.get("type") == "message": for ci, part in enumerate(item.get("content", [])): if part.get("type") == "output_text": iid = item.get("id") parts_out.append(ev("response.content_part.added", { "item_id": iid, "output_index": oi, "content_index": ci, "part": {"type": "output_text", "text": "", "annotations": []}})) parts_out.append(ev("response.output_text.delta", { "item_id": iid, "output_index": oi, "content_index": ci, "delta": part.get("text", "")})) parts_out.append(ev("response.output_text.done", { "item_id": iid, "output_index": oi, "content_index": ci, "text": part.get("text", "")})) parts_out.append(ev("response.content_part.done", { "item_id": iid, "output_index": oi, "content_index": ci, "part": part})) parts_out.append(ev("response.output_item.done", {"output_index": oi, "item": item})) parts_out.append(ev("response.completed", {"response": resp})) return b"".join(parts_out) def tools_responses_to_chat(tools): """Map Responses tool definitions (flattened) → Chat Completions (nested).""" if not tools: return None out = [] for t in tools: if isinstance(t, dict) and t.get("type") == "function" and "function" not in t: fn = {k: t[k] for k in ("name", "description", "parameters", "strict") if k in t} out.append({"type": "function", "function": fn}) else: out.append(t) return out # --------------------------------------------------------------------------- # Response direction: Chat Completions → Responses # --------------------------------------------------------------------------- def _new_id(prefix): return f"{prefix}_{secrets.token_hex(16)}" def chat_message_to_output_items(message): """Convert an assistant chat message (dict) into Responses output items.""" items = [] content = message.get("content") if content: items.append({ "type": "message", "id": _new_id("msg"), "status": "completed", "role": "assistant", "content": [{"type": "output_text", "text": content, "annotations": []}], }) for tc in message.get("tool_calls") or []: fn = tc.get("function", {}) items.append({ "type": "function_call", "id": _new_id("fc"), "call_id": tc.get("id"), "name": fn.get("name"), "arguments": fn.get("arguments", ""), "status": "completed", }) return items def usage_chat_to_responses(usage): """Map chat usage ``{prompt_tokens, completion_tokens}`` → Responses usage.""" if not usage: return None prompt = usage.get("prompt_tokens") or 0 completion = usage.get("completion_tokens") or 0 return { "input_tokens": prompt, "output_tokens": completion, "total_tokens": usage.get("total_tokens") or (prompt + completion), } def output_items_to_text(output_items): """Concatenate the ``output_text`` parts of all message items.""" chunks = [] for item in output_items or []: if item.get("type") != "message": continue for part in item.get("content") or []: if part.get("type") == "output_text": chunks.append(part.get("text", "")) return "".join(chunks) def build_response_object( *, response_id, model, output_items=None, usage=None, status="completed", created_at=None, previous_response_id=None, instructions=None, error=None, metadata=None, ): """Assemble a full ``object:"response"`` body for a non-streaming reply.""" output_items = output_items or [] return { "id": response_id, "object": "response", "created_at": created_at or int(time.time()), "status": status, "model": model, "output": output_items, "output_text": output_items_to_text(output_items), "instructions": instructions, "previous_response_id": previous_response_id, "usage": usage_chat_to_responses(usage) if usage and "input_tokens" not in usage else usage, "error": error, "metadata": metadata or {}, } # --------------------------------------------------------------------------- # Streaming direction: Chat Completions deltas → Responses typed SSE events # --------------------------------------------------------------------------- class ChatToResponsesStream: """Translate a Chat Completions streaming generator into Responses events. Usage:: translator = ChatToResponsesStream(response_id, model, created_at) async for sse_bytes in translator.events(chat_async_gen): yield sse_bytes # translator.output_items / translator.usage now populated for storage Emits the ordered event family ``response.created`` → ``response.in_progress`` → (``response.output_item.added`` → ``response.content_part.added`` → ``response.output_text.delta``* → ``response.output_text.done`` → ``response.content_part.done`` → ``response.output_item.done``) and/or function-call item events → ``response.completed`` (carrying usage). """ def __init__(self, response_id, model, created_at=None, previous_response_id=None, instructions=None, metadata=None): self.response_id = response_id self.model = model self.created_at = created_at or int(time.time()) self.previous_response_id = previous_response_id self.instructions = instructions self.metadata = metadata or {} self.seq = -1 self.output_items = [] self.usage = None def _snapshot(self, status, output=None): return build_response_object( response_id=self.response_id, model=self.model, output_items=output if output is not None else [], usage=self.usage, status=status, created_at=self.created_at, previous_response_id=self.previous_response_id, instructions=self.instructions, metadata=self.metadata, ) def _event(self, etype, payload): self.seq += 1 body = {"type": etype, "sequence_number": self.seq, **payload} return f"event: {etype}\ndata: {orjson.dumps(body).decode('utf-8')}\n\n".encode("utf-8") async def events(self, async_gen): yield self._event("response.created", {"response": self._snapshot("in_progress")}) yield self._event("response.in_progress", {"response": self._snapshot("in_progress")}) next_oi = 0 # text message state msg_item_id = None msg_oi = None text_parts = [] # function-call state, keyed by chat tool_call index tc_state = {} # idx -> {oi, item_id, call_id, name, args} async for chunk in async_gen: usage = getattr(chunk, "usage", None) if usage is not None: self.usage = { "prompt_tokens": usage.prompt_tokens or 0, "completion_tokens": usage.completion_tokens or 0, } choices = getattr(chunk, "choices", None) if not choices: continue delta = choices[0].delta content_piece = getattr(delta, "content", None) if content_piece: if msg_item_id is None: msg_item_id = _new_id("msg") msg_oi = next_oi next_oi += 1 item = { "id": msg_item_id, "type": "message", "status": "in_progress", "role": "assistant", "content": [], } yield self._event("response.output_item.added", {"output_index": msg_oi, "item": item}) yield self._event("response.content_part.added", { "item_id": msg_item_id, "output_index": msg_oi, "content_index": 0, "part": {"type": "output_text", "text": "", "annotations": []}, }) text_parts.append(content_piece) yield self._event("response.output_text.delta", { "item_id": msg_item_id, "output_index": msg_oi, "content_index": 0, "delta": content_piece, }) for tc in getattr(delta, "tool_calls", None) or []: idx = tc.index fn = getattr(tc, "function", None) if idx not in tc_state: item_id = _new_id("fc") state = { "oi": next_oi, "item_id": item_id, "call_id": getattr(tc, "id", None) or _new_id("call"), "name": (fn.name if fn else None), "args": "", } next_oi += 1 tc_state[idx] = state yield self._event("response.output_item.added", { "output_index": state["oi"], "item": { "id": item_id, "type": "function_call", "status": "in_progress", "call_id": state["call_id"], "name": state["name"], "arguments": "", }, }) else: state = tc_state[idx] if getattr(tc, "id", None): state["call_id"] = tc.id if fn and fn.name: state["name"] = fn.name if fn and fn.arguments: state["args"] += fn.arguments yield self._event("response.function_call_arguments.delta", { "item_id": state["item_id"], "output_index": state["oi"], "delta": fn.arguments, }) # finalize message item if msg_item_id is not None: full_text = "".join(text_parts) yield self._event("response.output_text.done", { "item_id": msg_item_id, "output_index": msg_oi, "content_index": 0, "text": full_text, }) done_part = {"type": "output_text", "text": full_text, "annotations": []} yield self._event("response.content_part.done", { "item_id": msg_item_id, "output_index": msg_oi, "content_index": 0, "part": done_part, }) msg_item = { "id": msg_item_id, "type": "message", "status": "completed", "role": "assistant", "content": [done_part], } yield self._event("response.output_item.done", {"output_index": msg_oi, "item": msg_item}) # finalize function-call items (in output-index order) tc_items = {} for idx, state in tc_state.items(): yield self._event("response.function_call_arguments.done", { "item_id": state["item_id"], "output_index": state["oi"], "arguments": state["args"], }) fc_item = { "id": state["item_id"], "type": "function_call", "status": "completed", "call_id": state["call_id"], "name": state["name"], "arguments": state["args"], } tc_items[state["oi"]] = fc_item yield self._event("response.output_item.done", {"output_index": state["oi"], "item": fc_item}) # assemble final output items ordered by output index ordered = [] if msg_item_id is not None: ordered.append((msg_oi, msg_item)) ordered.extend(tc_items.items()) self.output_items = [item for _, item in sorted(ordered, key=lambda kv: kv[0])] yield self._event("response.completed", {"response": self._snapshot("completed", self.output_items)})