dograh/api/services/workflow/pipecat_engine_context_composer.py

133 lines
5.2 KiB
Python
Raw Permalink Normal View History

"""System prompt and function schema composition for PipecatEngine nodes.
Extracts prompt and function composition logic from PipecatEngine into
reusable functions. Defines recording response mode markers and instructions.
"""
from typing import TYPE_CHECKING, Callable, Optional
if TYPE_CHECKING:
from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager
from api.services.workflow.workflow_graph import Node, WorkflowGraph
from api.services.workflow.pipecat_engine_custom_tools import get_function_schema
from api.services.workflow.tools.knowledge_base import get_knowledge_base_tool
# ---------------------------------------------------------------------------
# Recording response mode markers
# ---------------------------------------------------------------------------
RECORDING_MARKER = "" # Play pre-recorded audio
TTS_MARKER = "" # Generate dynamic TTS text
# ---------------------------------------------------------------------------
# Recording response mode system prompt instructions
# ---------------------------------------------------------------------------
RECORDING_RESPONSE_MODE_INSTRUCTIONS = """\
RESPONSE MODE INSTRUCTIONS - MANDATORY FORMAT:
Every response you generate MUST begin with excatcly one response mode indicator.
You have two modes for responding:
1. DYNAMIC SPEECH (): Generate text that will be converted to speech by TTS.
Format: followed by a space and your full spoken response. Nothing else.
Example: Hello! How can I help you today?
2. PRE-RECORDED AUDIO (): Play a pre-recorded audio message.
Format: followed by a space followed by recording_id followed by provided transcript. Nothing else.
Example: rec_greeting_01 [ Provided Transcript ]
RULES:
- Your response MUST start with either or as the very first character.
- For (dynamic speech): Follow with a space and your response to be generated using TTS engine. Dont mix with
- For (pre-recorded audio): Follow with a space and recording_id of the audio clip with its transcript. Dont mix with
- Use when a pre-recorded message matches the situation well.
- Use when you need to generate a dynamic, contextual response.
- *NEVER* mix modes in a single response, since we rely on the markers to decide whether to play using TTS or Pre-recorded audio."""
def compose_system_prompt_for_node(
*,
node: "Node",
workflow: "WorkflowGraph",
format_prompt: Callable[[str], str],
has_recordings: bool,
) -> str:
"""Compose the full system prompt text for a workflow node.
Combines the global prompt, node-specific prompt, and (when recordings
are enabled anywhere in the workflow) the recording response mode
instructions into a single string.
Args:
node: The workflow node to compose the prompt for.
workflow: The full workflow graph (needed for global node prompt).
format_prompt: Callable to render template variables in prompts.
has_recordings: Whether any node in the workflow uses recordings.
Returns:
The composed system prompt text.
"""
global_prompt = ""
if workflow.global_node_id and node.add_global_prompt:
global_node = workflow.nodes[workflow.global_node_id]
global_prompt = format_prompt(global_node.prompt)
formatted_node_prompt = format_prompt(node.prompt)
parts = [p for p in (global_prompt, formatted_node_prompt) if p]
if has_recordings and "RECORDING_ID:" in formatted_node_prompt:
parts.append(RECORDING_RESPONSE_MODE_INSTRUCTIONS)
return "\n\n".join(parts)
async def compose_functions_for_node(
*,
node: "Node",
custom_tool_manager: Optional["CustomToolManager"],
) -> list[dict]:
"""Compose the function/tool schemas for a workflow node.
Gathers knowledge-base tools, custom tools (including built-in
categories like calculator), and transition function schemas
into a single list.
Args:
node: The workflow node to compose functions for.
custom_tool_manager: Manager for custom and built-in tools (may be None).
Returns:
A list of function schemas to register with the LLM.
"""
functions: list[dict] = []
# Knowledge base retrieval tool
if node.document_uuids:
kb_tool_def = get_knowledge_base_tool(node.document_uuids)
kb_schema = get_function_schema(
kb_tool_def["function"]["name"],
kb_tool_def["function"]["description"],
properties=kb_tool_def["function"]["parameters"].get("properties", {}),
required=kb_tool_def["function"]["parameters"].get("required", []),
)
functions.append(kb_schema)
# Custom tools
if node.tool_uuids and custom_tool_manager:
custom_tool_schemas = await custom_tool_manager.get_tool_schemas(
node.tool_uuids,
mcp_tool_filters=getattr(node, "mcp_tool_filters", None),
)
functions.extend(custom_tool_schemas)
# Transition function schemas
for outgoing_edge in node.out_edges:
function_schema = get_function_schema(
outgoing_edge.get_function_name(), outgoing_edge.condition
)
functions.append(function_schema)
return functions