feat: set calculator as custom tool on demand

This commit is contained in:
Abhishek Kumar 2026-04-02 14:07:03 +05:30
parent 89fce77438
commit f368fe5134
13 changed files with 265 additions and 157 deletions

View file

@ -0,0 +1,61 @@
"""add calculator in ToolCategory
Revision ID: c71db647d354
Revises: b3a1c7e94f12
Create Date: 2026-04-02 13:53:46.184244
"""
from typing import Sequence, Union
from alembic import op
from alembic_postgresql_enum import TableReference
# revision identifiers, used by Alembic.
revision: str = "c71db647d354"
down_revision: Union[str, None] = "b3a1c7e94f12"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_api_keys_key_hash"), table_name="api_keys")
op.create_index("ix_api_keys_key_hash", "api_keys", ["key_hash"], unique=False)
op.sync_enum_values(
enum_schema="public",
enum_name="tool_category",
new_values=[
"http_api",
"end_call",
"transfer_call",
"calculator",
"native",
"integration",
],
affected_columns=[
TableReference(
table_schema="public", table_name="tools", column_name="category"
)
],
enum_values_to_rename=[],
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema="public",
enum_name="tool_category",
new_values=["http_api", "end_call", "transfer_call", "native", "integration"],
affected_columns=[
TableReference(
table_schema="public", table_name="tools", column_name="category"
)
],
enum_values_to_rename=[],
)
op.drop_index("ix_api_keys_key_hash", table_name="api_keys")
op.create_index(op.f("ix_api_keys_key_hash"), "api_keys", ["key_hash"], unique=True)
# ### end Alembic commands ###

View file

@ -128,6 +128,7 @@ class ToolCategory(Enum):
HTTP_API = "http_api" # Custom HTTP API calls (implemented)
END_CALL = "end_call" # End call tool
TRANSFER_CALL = "transfer_call" # Transfer call to phone number (Twilio only)
CALCULATOR = "calculator" # Built-in calculator tool
NATIVE = "native" # Built-in integrations (future: dtmf_input)
INTEGRATION = "integration" # Third-party integrations (future: Google Calendar, Salesforce, etc.)

View file

@ -134,9 +134,21 @@ class TransferCallToolDefinition(BaseModel):
config: TransferCallConfig = Field(description="Transfer Call configuration")
class CalculatorToolDefinition(BaseModel):
"""Tool definition for Calculator tools (no configuration needed)."""
schema_version: int = Field(default=1, description="Schema version")
type: Literal["calculator"] = Field(description="Tool type")
# Union type for tool definitions - Pydantic will discriminate based on 'type' field
ToolDefinition = Annotated[
Union[HttpApiToolDefinition, EndCallToolDefinition, TransferCallToolDefinition],
Union[
HttpApiToolDefinition,
EndCallToolDefinition,
TransferCallToolDefinition,
CalculatorToolDefinition,
],
Field(discriminator="type"),
]

View file

@ -318,9 +318,7 @@ OPENAI_REALTIME_VOICES = [
GOOGLE_REALTIME_MODELS = ["gemini-3.1-flash-live-preview"]
GOOGLE_REALTIME_VOICES = ["Puck", "Charon", "Kore", "Fenrir", "Aoede"]
GOOGLE_REALTIME_LANGUAGES = [
"en"
]
GOOGLE_REALTIME_LANGUAGES = ["en"]
@register_service(ServiceType.REALTIME)

View file

@ -40,20 +40,14 @@ from api.services.workflow.pipecat_engine_context_composer import (
)
from api.services.workflow.pipecat_engine_custom_tools import (
CustomToolManager,
get_function_schema,
)
from api.services.workflow.pipecat_engine_variable_extractor import (
VariableExtractionManager,
)
from api.services.workflow.tools.calculator import get_calculator_tools, safe_calculator
from api.services.workflow.tools.knowledge_base import (
retrieve_from_knowledge_base,
)
from api.services.workflow.tools.timezone import (
convert_time,
get_current_time,
get_time_tools,
)
from api.services.workflow.tools.timezone import get_current_time
from api.utils.template_renderer import render_template
@ -93,9 +87,6 @@ class PipecatEngine:
# access to _context
self._variable_extraction_manager = None
# Lazy loaded built-in function schemas
self._builtin_function_schemas: Optional[list[dict]] = None
# Track current LLM reference text for TTS aggregation correction
self._current_llm_generation_reference_text: str = ""
@ -144,36 +135,6 @@ class PipecatEngine:
return None
return tracing_ctx.get_turn_context() or tracing_ctx.get_conversation_context()
@property
def builtin_function_schemas(self) -> list[dict]:
"""Get built-in function schemas (calculator and timezone tools)."""
if self._builtin_function_schemas is None:
self._builtin_function_schemas = []
# Transform calculator tools to get_function_schema format
for tool in get_calculator_tools():
func = tool["function"]
schema = get_function_schema(
func["name"],
func["description"],
properties=func["parameters"]["properties"],
required=func["parameters"]["required"],
)
self._builtin_function_schemas.append(schema)
# Transform timezone tools to get_function_schema format
for tool in get_time_tools():
func = tool["function"]
schema = get_function_schema(
func["name"],
func["description"],
properties=func["parameters"]["properties"],
required=func["parameters"]["required"],
)
self._builtin_function_schemas.append(schema)
return self._builtin_function_schemas
async def initialize(self):
# TODO: May be set_node in a separate task so that we return from initialize immediately
if self._initialized:
@ -197,9 +158,6 @@ class PipecatEngine:
except Exception as e:
logger.error(f"Failed to fetch current EST time: {e}")
# Register built-in functions with the LLM
await self._register_builtin_functions()
await self.set_node(self.workflow.start_node_id)
logger.debug(f"{self.__class__.__name__} initialized")
@ -316,57 +274,6 @@ class PipecatEngine:
cancel_on_interruption=False,
)
async def _register_builtin_functions(self):
"""Register built-in functions (calculator and timezone) with the LLM."""
logger.debug("Registering built-in functions with LLM")
# Register calculator function
async def calculate_func(function_call_params: FunctionCallParams) -> None:
logger.info(f"LLM Function Call EXECUTED: safe_calculator")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
expr = function_call_params.arguments.get("expression", "")
result = safe_calculator(expr)
await function_call_params.result_callback(
{"expression": expr, "result": result}
)
except Exception as e:
await function_call_params.result_callback({"error": str(e)})
# Register timezone functions
async def get_current_time_func(
function_call_params: FunctionCallParams,
) -> None:
logger.info(f"LLM Function Call EXECUTED: get_current_time")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
timezone = function_call_params.arguments.get("timezone", "UTC")
result = get_current_time(timezone)
await function_call_params.result_callback(result)
except Exception as e:
await function_call_params.result_callback({"error": str(e)})
async def convert_time_func(function_call_params: FunctionCallParams) -> None:
logger.info(f"LLM Function Call EXECUTED: convert_time")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
result = convert_time(
function_call_params.arguments.get("source_timezone"),
function_call_params.arguments.get("time"),
function_call_params.arguments.get("target_timezone"),
)
await function_call_params.result_callback(result)
except Exception as e:
await function_call_params.result_callback({"error": str(e)})
# Register all built-in functions
self.llm.register_function("safe_calculator", calculate_func)
self.llm.register_function("get_current_time", get_current_time_func)
self.llm.register_function("convert_time", convert_time_func)
async def _register_knowledge_base_function(
self, document_uuids: list[str]
) -> None:
@ -553,7 +460,6 @@ class PipecatEngine:
)
functions = await compose_functions_for_node(
node=node,
builtin_function_schemas=self.builtin_function_schemas,
custom_tool_manager=self._custom_tool_manager,
)
await self._update_llm_context(system_prompt, functions)

View file

@ -86,27 +86,23 @@ def compose_system_prompt_for_node(
async def compose_functions_for_node(
*,
node: "Node",
builtin_function_schemas: list[dict],
custom_tool_manager: Optional["CustomToolManager"],
) -> list[dict]:
"""Compose the function/tool schemas for a workflow node.
Gathers built-in tools, knowledge-base tools, custom tools,
and transition function schemas into a single list.
Gathers knowledge-base tools, custom tools (including built-in
categories like calculator), and transition function schemas
into a single list.
Args:
node: The workflow node to compose functions for.
builtin_function_schemas: Pre-computed schemas for built-in tools.
custom_tool_manager: Manager for user-defined custom tools (may be None).
custom_tool_manager: Manager for custom and built-in tools (may be None).
Returns:
A list of function schemas to register with the LLM.
"""
functions: list[dict] = []
# Built-in tools (calculator, timezone)
functions.extend(builtin_function_schemas)
# Knowledge base retrieval tool
if node.document_uuids:
kb_tool_def = get_knowledge_base_tool(node.document_uuids)

View file

@ -23,6 +23,7 @@ from api.services.telephony.transfer_event_protocol import TransferContext
from api.services.workflow.disposition_mapper import (
get_organization_id_from_workflow_run,
)
from api.services.workflow.tools.calculator import get_calculator_tools, safe_calculator
from api.services.workflow.tools.custom_tool import (
execute_http_tool,
tool_to_function_schema,
@ -105,6 +106,20 @@ class CustomToolManager:
schemas: list[FunctionSchema] = []
for tool in tools:
if tool.category == ToolCategory.CALCULATOR.value:
# Built-in calculator: return pre-defined schemas
for tool_def in get_calculator_tools():
func = tool_def["function"]
schemas.append(
get_function_schema(
func["name"],
func["description"],
properties=func["parameters"]["properties"],
required=func["parameters"]["required"],
)
)
continue
raw_schema = tool_to_function_schema(tool)
function_name = raw_schema["function"]["name"]
@ -146,6 +161,14 @@ class CustomToolManager:
tools = await db_client.get_tools_by_uuids(tool_uuids, organization_id)
for tool in tools:
if tool.category == ToolCategory.CALCULATOR.value:
self._register_calculator_handler()
logger.debug(
f"Registered calculator tool handler "
f"(tool_uuid: {tool.tool_uuid})"
)
continue
schema = tool_to_function_schema(tool)
function_name = schema["function"]["name"]
@ -193,6 +216,23 @@ class CustomToolManager:
return handler, timeout_secs, cancel_on_interruption
def _register_calculator_handler(self) -> None:
"""Register the built-in calculator function with the LLM."""
async def calculate_func(function_call_params: FunctionCallParams) -> None:
logger.info("LLM Function Call EXECUTED: safe_calculator")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
expr = function_call_params.arguments.get("expression", "")
result = safe_calculator(expr)
await function_call_params.result_callback(
{"expression": expr, "result": result}
)
except Exception as e:
await function_call_params.result_callback({"error": str(e)})
self._engine.llm.register_function("safe_calculator", calculate_func)
def _create_http_tool_handler(self, tool: Any, function_name: str):
"""Create a handler function for an HTTP API tool.