diff --git a/apps/python-sdk/pyproject.toml b/apps/python-sdk/pyproject.toml index f03b944f..6d1cabdc 100644 --- a/apps/python-sdk/pyproject.toml +++ b/apps/python-sdk/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "rowboat" -version = "1.0.6" +version = "2.1.0" authors = [ { name = "Your Name", email = "your.email@example.com" }, ] diff --git a/apps/python-sdk/src/rowboat/client.py b/apps/python-sdk/src/rowboat/client.py index f0ad3b31..2997ed08 100644 --- a/apps/python-sdk/src/rowboat/client.py +++ b/apps/python-sdk/src/rowboat/client.py @@ -38,7 +38,8 @@ class Client: workflowId=workflow_id, testProfileId=test_profile_id ) - response = requests.post(self.base_url, headers=self.headers, data=request.model_dump_json()) + json_data = request.model_dump() + response = requests.post(self.base_url, headers=self.headers, json=json_data) if not response.status_code == 200: raise ValueError(f"Error: {response.status_code} - {response.text}") @@ -90,7 +91,7 @@ class Client: ) -> Tuple[List[ApiMessage], Optional[Dict[str, Any]]]: """Stateless chat method that handles a single conversation turn with multiple tool call rounds""" - current_messages = messages + current_messages = messages[:] current_state = state turns = 0 diff --git a/apps/rowboat/app/projects/[projectId]/config/voice.tsx b/apps/rowboat/app/projects/[projectId]/config/voice.tsx index b12df972..cd154a8c 100644 --- a/apps/rowboat/app/projects/[projectId]/config/voice.tsx +++ b/apps/rowboat/app/projects/[projectId]/config/voice.tsx @@ -1,7 +1,7 @@ 'use client'; import { useState, useEffect, useCallback } from 'react'; -import { Button } from "@nextui-org/react"; +import { Button } from "@heroui/react"; import { configureTwilioNumber, mockConfigureTwilioNumber, getTwilioConfigs, deleteTwilioConfig } from "../../../actions/voice_actions"; import { FormSection } from "../../../lib/components/form-section"; import { EditableField } from 
"../../../lib/components/editable-field-with-immediate-save"; diff --git a/apps/rowboat/package-lock.json b/apps/rowboat/package-lock.json index 8b8583f3..5a3ba385 100644 --- a/apps/rowboat/package-lock.json +++ b/apps/rowboat/package-lock.json @@ -52,6 +52,7 @@ "tailwind-merge": "^2.5.5", "tailwindcss-animate": "^1.0.7", "tiktoken": "^1.0.17", + "twilio": "^5.4.5", "typewriter-effect": "^2.21.0", "zod": "^3.23.8", "zod-to-json-schema": "^3.23.5" @@ -13991,6 +13992,17 @@ "acorn": "^6.0.0 || ^7.0.0 || ^8.0.0" } }, + "node_modules/agent-base": { + "version": "6.0.2", + "resolved": "https://registry.npmjs.org/agent-base/-/agent-base-6.0.2.tgz", + "integrity": "sha512-RZNwNclF7+MS/8bDg70amg32dyeZGZxiDuQmZxKLAlQjr3jGyLx+4Kkk58UO7D2QdgFIQCovuSuZESne6RG6XQ==", + "dependencies": { + "debug": "4" + }, + "engines": { + "node": ">= 6.0.0" + } + }, "node_modules/agentkeepalive": { "version": "4.5.0", "resolved": "https://registry.npmjs.org/agentkeepalive/-/agentkeepalive-4.5.0.tgz", @@ -14398,9 +14410,9 @@ } }, "node_modules/axios": { - "version": "1.7.5", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.5.tgz", - "integrity": "sha512-fZu86yCo+svH3uqJ/yTdQ0QHpQu5oL+/QE+QPSv6BZSkDAoky9vytxp7u5qk83OJFS3kEBcesWni9WTZAv3tSw==", + "version": "1.8.1", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.8.1.tgz", + "integrity": "sha512-NN+fvwH/kV01dYUQ3PTOZns4LWtWhOFCAhQ/pHb88WQ1hNe5V/dvFwc4VJcDL11LT9xSX0QtsR8sWUuyOuOq7g==", "dependencies": { "follow-redirects": "^1.15.6", "form-data": "^4.0.0", @@ -14604,6 +14616,11 @@ "node": ">=16.20.1" } }, + "node_modules/buffer-equal-constant-time": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz", + "integrity": "sha512-zRpUiDwd/xk6ADqPMATG8vc9VPrkck7T07OIx0gnjmJAnHnTVXNQG3vfvWNuiZIkwu9KrKdA1iJKfsfTVxE6NA==" + }, "node_modules/busboy": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/busboy/-/busboy-1.6.0.tgz", @@ -15197,6 
+15214,11 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/dayjs": { + "version": "1.11.13", + "resolved": "https://registry.npmjs.org/dayjs/-/dayjs-1.11.13.tgz", + "integrity": "sha512-oaMBel6gjolK862uaPQOVTA7q3TZhuSvuMQAAglQDOWYO9A91IrAOUJEyKVlqJlHE0vq5p5UXxzdPfMH/x6xNg==" + }, "node_modules/debug": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.0.tgz", @@ -15503,6 +15525,14 @@ "resolved": "https://registry.npmjs.org/ee-first/-/ee-first-1.1.1.tgz", "integrity": "sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==" }, + "node_modules/ecdsa-sig-formatter": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/ecdsa-sig-formatter/-/ecdsa-sig-formatter-1.0.11.tgz", + "integrity": "sha512-nagl3RYrbNv6kQkeJIpt6NJZy8twLB/2vtz6yN9Z4vRKHN4/QZJIEbqohALSgwKdnksuY3k5Addp5lg8sVoVcQ==", + "dependencies": { + "safe-buffer": "^5.0.1" + } + }, "node_modules/electron-to-chromium": { "version": "1.5.74", "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.5.74.tgz", @@ -17129,6 +17159,18 @@ "node": ">= 0.8" } }, + "node_modules/https-proxy-agent": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/https-proxy-agent/-/https-proxy-agent-5.0.1.tgz", + "integrity": "sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA==", + "dependencies": { + "agent-base": "6", + "debug": "4" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/humanize-ms": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/humanize-ms/-/humanize-ms-1.2.1.tgz", @@ -17911,6 +17953,27 @@ "url": "https://github.com/chalk/chalk?sponsor=1" } }, + "node_modules/jsonwebtoken": { + "version": "9.0.2", + "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.2.tgz", + "integrity": "sha512-PRp66vJ865SSqOlgqS8hujT5U4AOgMfhrwYIuIhfKaoSCZcirrmASQr8CX7cUg+RMih+hgznrjp99o+W4pJLHQ==", + "dependencies": 
{ + "jws": "^3.2.2", + "lodash.includes": "^4.3.0", + "lodash.isboolean": "^3.0.3", + "lodash.isinteger": "^4.0.4", + "lodash.isnumber": "^3.0.3", + "lodash.isplainobject": "^4.0.6", + "lodash.isstring": "^4.0.1", + "lodash.once": "^4.0.0", + "ms": "^2.1.1", + "semver": "^7.5.4" + }, + "engines": { + "node": ">=12", + "npm": ">=6" + } + }, "node_modules/jsx-ast-utils": { "version": "3.3.5", "resolved": "https://registry.npmjs.org/jsx-ast-utils/-/jsx-ast-utils-3.3.5.tgz", @@ -17926,6 +17989,25 @@ "node": ">=4.0" } }, + "node_modules/jwa": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/jwa/-/jwa-1.4.1.tgz", + "integrity": "sha512-qiLX/xhEEFKUAJ6FiBMbes3w9ATzyk5W7Hvzpa/SLYdxNtng+gcurvrI7TbACjIXlsJyr05/S1oUhZrc63evQA==", + "dependencies": { + "buffer-equal-constant-time": "1.0.1", + "ecdsa-sig-formatter": "1.0.11", + "safe-buffer": "^5.0.1" + } + }, + "node_modules/jws": { + "version": "3.2.2", + "resolved": "https://registry.npmjs.org/jws/-/jws-3.2.2.tgz", + "integrity": "sha512-YHlZCB6lMTllWDtSPHz/ZXTsi8S00usEV6v1tjq8tOUZzw7DpSDWVXjXDre6ed1w/pd495ODpHZYSdkRTsa0HA==", + "dependencies": { + "jwa": "^1.4.1", + "safe-buffer": "^5.0.1" + } + }, "node_modules/keyv": { "version": "4.5.4", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz", @@ -18064,6 +18146,16 @@ "resolved": "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz", "integrity": "sha512-H5ZhCF25riFd9uB5UCkVKo61m3S/xZk1x4wA6yp/L3RFP6Z/eHH1ymQcGLo7J3GMPfm0V/7m1tryHuGVxpqEBQ==" }, + "node_modules/lodash.includes": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz", + "integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==" + }, + "node_modules/lodash.isboolean": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz", + "integrity": 
"sha512-Bz5mupy2SVbPHURB98VAcw+aHh4vRV5IPNhILUCsOzRmsTmSQ17jIuqopAentWoehktxGd9e/hbIXq980/1QJg==" + }, "node_modules/lodash.isempty": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/lodash.isempty/-/lodash.isempty-4.4.0.tgz", @@ -18075,17 +18167,42 @@ "integrity": "sha512-pDo3lu8Jhfjqls6GkMgpahsF9kCyayhgykjyLMNFTKWrpVdAQtYyB4muAMWozBB4ig/dtWAmsMxLEI8wuz+DYQ==", "deprecated": "This package is deprecated. Use require('node:util').isDeepStrictEqual instead." }, + "node_modules/lodash.isinteger": { + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/lodash.isinteger/-/lodash.isinteger-4.0.4.tgz", + "integrity": "sha512-DBwtEWN2caHQ9/imiNeEA5ys1JoRtRfY3d7V9wkqtbycnAmTvRRmbHKDV4a0EYc678/dia0jrte4tjYwVBaZUA==" + }, + "node_modules/lodash.isnumber": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/lodash.isnumber/-/lodash.isnumber-3.0.3.tgz", + "integrity": "sha512-QYqzpfwO3/CWf3XP+Z+tkQsfaLL/EnUlXWVkIk5FUPc4sBdTehEqZONuyRt2P67PXAk+NXmTBcc97zw9t1FQrw==" + }, "node_modules/lodash.isobject": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/lodash.isobject/-/lodash.isobject-3.0.2.tgz", "integrity": "sha512-3/Qptq2vr7WeJbB4KHUSKlq8Pl7ASXi3UG6CMbBm8WRtXi8+GHm7mKaU3urfpSEzWe2wCIChs6/sdocUsTKJiA==" }, + "node_modules/lodash.isplainobject": { + "version": "4.0.6", + "resolved": "https://registry.npmjs.org/lodash.isplainobject/-/lodash.isplainobject-4.0.6.tgz", + "integrity": "sha512-oSXzaWypCMHkPC3NvBEaPHf0KsA5mvPrOPgQWDsbg8n7orZ290M0BmC/jgRZ4vcJ6DTAhjrsSYgdsW/F+MFOBA==" + }, + "node_modules/lodash.isstring": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/lodash.isstring/-/lodash.isstring-4.0.1.tgz", + "integrity": "sha512-0wJxfxH1wgO3GrbuP+dTTk7op+6L41QCXbGINEmD+ny/G/eCqGzxyCsh7159S+mgDDcoarnBw6PC1PS5+wUGgw==" + }, "node_modules/lodash.merge": { "version": "4.6.2", "resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz", "integrity": 
"sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==", "dev": true }, + "node_modules/lodash.once": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/lodash.once/-/lodash.once-4.1.1.tgz", + "integrity": "sha512-Sb487aTOCr9drQVL8pIxOzVhafOjZN9UU54hiN8PU3uAiSV7lx1yYNpbNmex2PK6dSJoNTSJUUswT651yww3Mg==" + }, "node_modules/longest-streak": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/longest-streak/-/longest-streak-3.1.0.tgz", @@ -20170,11 +20287,11 @@ } }, "node_modules/qs": { - "version": "6.13.0", - "resolved": "https://registry.npmjs.org/qs/-/qs-6.13.0.tgz", - "integrity": "sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg==", + "version": "6.14.0", + "resolved": "https://registry.npmjs.org/qs/-/qs-6.14.0.tgz", + "integrity": "sha512-YWWTjgABSKcvs/nWBi9PycY/JiPJqOD4JA6o9Sej2AtvSGarXxKC3OQSk4pAarbdQlKAh5D4FCQkJNkW+GAn3w==", "dependencies": { - "side-channel": "^1.0.6" + "side-channel": "^1.1.0" }, "engines": { "node": ">=0.6" @@ -20747,6 +20864,11 @@ "loose-envify": "^1.1.0" } }, + "node_modules/scmp": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/scmp/-/scmp-2.1.0.tgz", + "integrity": "sha512-o/mRQGk9Rcer/jEEw/yw4mwo3EU/NvYvp577/Btqrym9Qy5/MdWGBqipbALgd2lrdWTJ5/gqDusxfnQBxOxT2Q==" + }, "node_modules/scroll-into-view-if-needed": { "version": "3.0.10", "resolved": "https://registry.npmjs.org/scroll-into-view-if-needed/-/scroll-into-view-if-needed-3.0.10.tgz", @@ -21713,6 +21835,23 @@ "fsevents": "~2.3.3" } }, + "node_modules/twilio": { + "version": "5.4.5", + "resolved": "https://registry.npmjs.org/twilio/-/twilio-5.4.5.tgz", + "integrity": "sha512-PIteif0CBOrA42SWZiT8IwUuqTNakAFgvXYWsrjEPGaDSczu/GvBs3vUock4S+UguXj7cV4qBswWgXs5ySjGNg==", + "dependencies": { + "axios": "^1.7.8", + "dayjs": "^1.11.9", + "https-proxy-agent": "^5.0.0", + "jsonwebtoken": "^9.0.2", + "qs": "^6.9.4", + "scmp": "^2.1.0", + "xmlbuilder": "^13.0.2" + }, + 
"engines": { + "node": ">=14.0" + } + }, "node_modules/type-check": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz", @@ -22460,6 +22599,14 @@ } } }, + "node_modules/xmlbuilder": { + "version": "13.0.2", + "resolved": "https://registry.npmjs.org/xmlbuilder/-/xmlbuilder-13.0.2.tgz", + "integrity": "sha512-Eux0i2QdDYKbdbA6AM6xE4m6ZTZr4G4xF9kahI2ukSEMCzwce2eX9WlTI5J3s+NU7hpasFsr8hWIONae7LluAQ==", + "engines": { + "node": ">=6.0" + } + }, "node_modules/yallist": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/yallist/-/yallist-3.1.1.tgz", diff --git a/apps/twilio_handler/.dockerignore b/apps/twilio_handler/.dockerignore new file mode 100644 index 00000000..75f5f8c0 --- /dev/null +++ b/apps/twilio_handler/.dockerignore @@ -0,0 +1,2 @@ +__pycache__ +.venv/ \ No newline at end of file diff --git a/apps/twilio_handler/.env.example b/apps/twilio_handler/.env.example new file mode 100644 index 00000000..fdafbe37 --- /dev/null +++ b/apps/twilio_handler/.env.example @@ -0,0 +1,24 @@ +# Environment variables for the Voice API application + +# Twilio configuration +TWILIO_ACCOUNT_SID=your_account_sid_here +TWILIO_AUTH_TOKEN=your_auth_token_here +BASE_URL=https://your-public-url-here.ngrok.io + +# RowBoat API configuration +ROWBOAT_API_HOST=http://localhost:3000 +ROWBOAT_PROJECT_ID=your_project_id_here +ROWBOAT_API_KEY=your_api_key_here + +# Speech processing APIs +DEEPGRAM_API_KEY=your_deepgram_api_key_here +ELEVENLABS_API_KEY=your_elevenlabs_api_key_here + +# Server configuration +PORT=3009 +WHATSAPP_PORT=3010 + +# Redis configuration for persistent state +REDIS_URL=redis://localhost:6379/0 +REDIS_EXPIRY_SECONDS=86400 +SERVICE_NAME=rowboat-voice \ No newline at end of file diff --git a/apps/twilio_handler/.gitignore b/apps/twilio_handler/.gitignore new file mode 100644 index 00000000..9f7550b1 --- /dev/null +++ b/apps/twilio_handler/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +.venv diff --git 
a/apps/twilio_handler/Dockerfile b/apps/twilio_handler/Dockerfile new file mode 100644 index 00000000..7b5b3017 --- /dev/null +++ b/apps/twilio_handler/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Copy requirements first to leverage Docker cache +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Set environment variables +ENV FLASK_APP=app +ENV PYTHONUNBUFFERED=1 +ENV PYTHONPATH=/app + +# Command to run Flask development server +CMD ["flask", "run", "--host=0.0.0.0", "--port=4010"] \ No newline at end of file diff --git a/apps/twilio_handler/app.py b/apps/twilio_handler/app.py new file mode 100644 index 00000000..2b94605b --- /dev/null +++ b/apps/twilio_handler/app.py @@ -0,0 +1,631 @@ +from flask import Flask, request, jsonify, Response +from twilio.twiml.voice_response import VoiceResponse, Gather +import os +import logging +import uuid +from typing import Dict, Any, Optional +import json +from time import time +from rowboat.schema import SystemMessage, UserMessage, ApiMessage +import elevenlabs +# Load environment variables +from load_env import load_environment +load_environment() + +from twilio_api import process_conversation_turn + + +# Import MongoDB utility functions +from util import ( + get_call_state, + save_call_state, + delete_call_state, + get_mongodb_status, + get_twilio_config, + CallState +) + +Message = SystemMessage | UserMessage + +ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY") +elevenlabs_client = elevenlabs.ElevenLabs(api_key=ELEVENLABS_API_KEY) + +app = Flask(__name__) + +# Configure logging to stdout for Docker compatibility +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[logging.StreamHandler()] # Send logs to stdout +) +logger = logging.getLogger(__name__) + +# Local in-memory cache of call state (temporary cache only - not primary storage) +# MongoDB is the 
primary storage for state across multiple instances +active_calls = {} + +# TTS configuration +TTS_VOICE = "Markus - Mature and Chill" +TTS_MODEL = "eleven_flash_v2_5" + +@app.route('/inbound', methods=['POST']) +def handle_inbound_call(): + """Handle incoming calls to Twilio numbers configured for RowBoat""" + try: + # Log the entire request for debugging + logger.info(f"Received inbound call request: {request.values}") + + # Get the Twilio phone number that received the call + to_number = request.values.get('To') + call_sid = request.values.get('CallSid') + from_number = request.values.get('From') + + logger.info(f"Inbound call from {from_number} to {to_number}, CallSid: {call_sid}") + logger.info(f"Raw To number value: '{to_number}', Type: {type(to_number)}") + + # Get configuration ONLY from MongoDB + system_prompt = "You are a helpful assistant. Provide concise and clear answers." + workflow_id = None + project_id = None + + # Look up configuration in MongoDB + twilio_config = get_twilio_config(to_number) + if twilio_config: + workflow_id = twilio_config['workflow_id'] + project_id = twilio_config['project_id'] + system_prompt = twilio_config.get('system_prompt', system_prompt) + logger.info(f"Found MongoDB configuration for {to_number}: project_id={project_id}, workflow_id={workflow_id}") + else: + logger.warning(f"No active configuration found in MongoDB for phone number {to_number}") + + if not workflow_id: + # No workflow found - provide error message + logger.error(f"No workflow_id found for inbound call to {to_number}") + response = VoiceResponse() + response.say("I'm sorry, this phone number is not properly configured in our system. 
Please contact support.", voice='alice') + # Include additional information in TwiML for debugging + response.say(f"Received call to number {to_number}", voice='alice') + response.hangup() + return str(response) + + # Initialize call state with stateless API fields + call_state = CallState( + workflow_id=workflow_id, + project_id=project_id, + system_prompt=system_prompt, + conversation_history=[], + messages=[], # For stateless API + state=None, # For stateless API state + turn_count=0, + inbound=True, + to_number=to_number, + created_at=int(time()) # Add timestamp for expiration tracking + ) + + # Save to MongoDB (primary source of truth) + try: + save_call_state(call_sid, call_state) + logger.info(f"Saved initial call state to MongoDB for inbound call {call_sid}") + except Exception as e: + logger.error(f"Error saving inbound call state to MongoDB: {str(e)}") + raise RuntimeError(f"Failed to save call state to MongoDB: {str(e)}") + + # Only use memory storage as a temporary cache + # The service that handles the next request might be different + active_calls[call_sid] = call_state + + logger.info(f"Initialized call state for {call_sid}, proceeding to handle_call") + + # Create a direct response instead of redirecting + return handle_call(call_sid, workflow_id, project_id) + + except Exception as e: + # Log the full error with traceback + import traceback + logger.error(f"Error in handle_inbound_call: {str(e)}") + logger.error(traceback.format_exc()) + + # Return a basic TwiML response so Twilio doesn't get a 500 error + response = VoiceResponse() + response.say("I'm sorry, we encountered an error processing your call. 
Please try again later.", voice='alice') + response.hangup() + return str(response) + +@app.route('/twiml', methods=['POST']) +def handle_twiml_call(): + """TwiML endpoint for outbound call handling""" + call_sid = request.values.get('CallSid') + + # Get call state to retrieve workflow_id and project_id + call_state = get_call_state(call_sid) + if call_state: + workflow_id = call_state.get('workflow_id') + project_id = call_state.get('project_id') + return handle_call(call_sid, workflow_id, project_id) + else: + # No call state found - error response + response = VoiceResponse() + response.say("I'm sorry, your call session has expired. Please try again.", voice='alice') + response.hangup() + return str(response) + +def handle_call(call_sid, workflow_id, project_id=None): + """Common handler for both inbound and outbound calls""" + try: + logger.info(f"handle_call: processing call {call_sid} with workflow {workflow_id}, project_id {project_id}") + + # Get or initialize call state, first from MongoDB + call_state = None + + try: + # Query MongoDB for the call state + call_state = get_call_state(call_sid) + if call_state: + logger.info(f"Loaded and restored call state from MongoDB for {call_sid}") + except Exception as e: + logger.error(f"Error retrieving MongoDB state for {call_sid}: {str(e)}") + call_state = None + + # Try in-memory cache as fallback (temporary local cache) + if call_state is None and call_sid in active_calls: + call_state = active_calls.get(call_sid) + logger.info(f"Using in-memory cache for call state of {call_sid}") + + # Initialize new state if needed + if call_state is None and workflow_id: + call_state = CallState( + workflow_id=workflow_id, + project_id=project_id, + system_prompt="You are a helpful assistant. 
Provide concise and clear answers.", + conversation_history=[], + messages=[], # For stateless API + state=None, # For stateless API state + turn_count=0, + inbound=False, # Default for outbound calls + to_number="", # This will be set properly for inbound calls + created_at=int(time()), # Add timestamp for expiration tracking + last_transcription="" + ) + + # Save to MongoDB (primary source of truth) + try: + save_call_state(call_sid, call_state) + logger.info(f"Initialized and saved new call state to MongoDB for {call_sid}") + except Exception as e: + logger.error(f"Error saving new call state to MongoDB: {str(e)}") + raise RuntimeError(f"Failed to save call state to MongoDB: {str(e)}") + + # Only use memory as temporary cache for this request + active_calls[call_sid] = call_state + logger.info(f"Initialized new call state for {call_sid}") + + logger.info(f"Using call state: {call_state}") + + # Create TwiML response + response = VoiceResponse() + + # Check if this is a new call + if call_state.get('turn_count', 0) == 0: + # Initial greeting for new calls + greeting = "Hello! I'm your RowBoat assistant. How can I help you today?" 
+ logger.info(f"New call, preparing greeting: {greeting}") + + try: + # Use streaming audio endpoint instead of generating files + # Include a unique ID to prevent caching + unique_id = str(uuid.uuid4()) + # Use a relative URL - Twilio will use the same host as the webhook + audio_url = f"/stream-audio/{call_sid}/greeting/{unique_id}" + logger.info(f"Streaming greeting from relative URL: {audio_url}") + + # Play the greeting via streaming + response.play(audio_url) + except Exception as e: + logger.error(f"Error with audio streaming for greeting: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + # Fallback to Twilio TTS + response.say(greeting, voice='alice') + + # Update call state + call_state['turn_count'] = 1 + + # Save to MongoDB (primary source of truth) + try: + save_call_state(call_sid, call_state) + logger.info(f"Saved greeting state to MongoDB for {call_sid}") + except Exception as e: + logger.error(f"Error saving greeting state to MongoDB: {str(e)}") + raise RuntimeError(f"Failed to save greeting state to MongoDB: {str(e)}") + + # Update local memory cache + active_calls[call_sid] = call_state + + # Instead of using both Gather and Record which compete for input, + # just use Gather for speech recognition, and rely on its SpeechResult + # This is more reliable than trying to use Record and Deepgram + gather = Gather( + input='speech', + action=f'/process_speech?call_sid={call_sid}', + speech_timeout='auto', + language='en-US', + enhanced=True, # Enable enhanced speech recognition + speechModel='phone_call' # Optimize for phone calls + ) + response.append(gather) + + # If no input detected, redirect to twiml endpoint + # Call state will be retrieved from MongoDB + response.redirect('/twiml') + + logger.info(f"Returning response: {str(response)}") + return str(response) + + except Exception as e: + # Log the full error with traceback + import traceback + logger.error(f"Error in handle_call: {str(e)}") + 
logger.error(traceback.format_exc()) + + # Return a basic TwiML response + response = VoiceResponse() + response.say("I'm sorry, we encountered an error processing your call. Please try again later.", voice='alice') + response.hangup() + return str(response) + +@app.route('/process_speech', methods=['POST']) +def process_speech(): + """Process user speech input and generate AI response""" + try: + logger.info(f"Processing speech: {request.values}") + + call_sid = request.args.get('call_sid') + + # Log all request values for debugging + logger.info(f"FULL REQUEST VALUES: {dict(request.values)}") + logger.info(f"FULL REQUEST ARGS: {dict(request.args)}") + + # Get the speech result directly from Twilio + # We're now relying on Twilio's enhanced speech recognition instead of Deepgram + speech_result = request.values.get('SpeechResult') + confidence = request.values.get('Confidence') + + logger.info(f"Twilio SpeechResult: {speech_result}") + logger.info(f"Twilio Confidence: {confidence}") + + if not call_sid: + logger.warning(f"Missing call_sid: {call_sid}") + response = VoiceResponse() + response.say("I'm sorry, I couldn't process that request.", voice='alice') + response.hangup() + return str(response) + + if not speech_result: + logger.warning("No speech result after transcription attempts") + response = VoiceResponse() + response.say("I'm sorry, I didn't catch what you said. 
Could you please try again?", voice='alice') + + # Gather user input again + gather = Gather( + input='speech', + action=f'/process_speech?call_sid={call_sid}', + speech_timeout='auto', + language='en-US', + enhanced=True, + speechModel='phone_call' + ) + response.append(gather) + + # Redirect to twiml endpoint which will get call state from MongoDB + response.redirect('/twiml') + + return str(response) + + # Load call state from MongoDB (primary source of truth) + call_state = None + + try: + call_state = get_call_state(call_sid) + if call_state: + logger.info(f"Loaded call state from MongoDB for speech processing: {call_sid}") + except Exception as e: + logger.error(f"Error retrieving MongoDB state for speech processing: {str(e)}") + call_state = None + + # Try memory cache as fallback + if call_state is None and call_sid in active_calls: + call_state = active_calls[call_sid] + logger.info(f"Using in-memory state for speech processing: {call_sid}") + + # Check if we have valid state + if not call_state: + logger.warning(f"No call state found for speech processing: {call_sid}") + response = VoiceResponse() + response.say("I'm sorry, your call session has expired. 
Please call back.", voice='alice') + response.hangup() + return str(response) + + # Extract key information + workflow_id = call_state.get('workflow_id') + project_id = call_state.get('project_id') + system_prompt = call_state.get('system_prompt', "You are a helpful assistant.") + + # Check if we have a Deepgram transcription stored in the call state + if 'last_transcription' in call_state and call_state['last_transcription']: + deepgram_transcription = call_state['last_transcription'] + logger.info(f"Found stored Deepgram transcription: {deepgram_transcription}") + logger.info(f"Comparing with Twilio transcription: {speech_result}") + + # Use the Deepgram transcription instead of Twilio's + speech_result = deepgram_transcription + # Remove it so we don't use it again + del call_state['last_transcription'] + logger.info(f"Using Deepgram transcription instead") + + # Log final user input that will be used + logger.info(f"Final user input: {speech_result}") + + # Process with RowBoat agent + try: + # Clean up the speech result if needed + if speech_result: + # Remove any common filler words or fix typical transcription issues + import re + # Convert to lowercase for easier pattern matching + cleaned_input = speech_result.lower() + # Remove filler words that might be at the beginning + cleaned_input = re.sub(r'^(um|uh|like|so|okay|well)\s+', '', cleaned_input) + # Capitalize first letter for better appearance + if cleaned_input: + speech_result = cleaned_input[0].upper() + cleaned_input[1:] + + logger.info(f"Sending to RowBoat: '{speech_result}'") + + # Get previous messages and state from call state + previous_messages = call_state.get('messages', []) + previous_state = call_state.get('state') + + # Process with stateless API + ai_response, updated_messages, updated_state = process_conversation_turn( + user_input=speech_result, + workflow_id=workflow_id, + system_prompt=system_prompt, + previous_messages=previous_messages, + previous_state=previous_state, + 
project_id=project_id + ) + + # Update the messages and state in call state + call_state['messages'] = updated_messages + call_state['state'] = updated_state + + logger.info(f"RowBoat response: {ai_response}") + except Exception as e: + logger.error(f"Error processing with RowBoat: {str(e)}") + ai_response = "I'm sorry, I encountered an issue processing your request. Could you please try again?" + + # Conversation history is updated in the streaming response section below + + # Create TwiML response + response = VoiceResponse() + + # Use streaming audio for the response + logger.info("Setting up response streaming with ElevenLabs") + + try: + # Store the AI response in conversation history first + # (The stream-audio endpoint will read it from here) + + # Update conversation history (do this before streaming so the endpoint can access it) + call_state['conversation_history'].append({ + 'user': speech_result, + 'assistant': ai_response + }) + call_state['turn_count'] += 1 + + # Save to MongoDB (primary source of truth) + try: + save_call_state(call_sid, call_state) + logger.info(f"Saved response state to MongoDB for {call_sid}") + except Exception as e: + logger.error(f"Error saving response state to MongoDB: {str(e)}") + raise RuntimeError(f"Failed to save response state to MongoDB: {str(e)}") + + # Update local memory cache + active_calls[call_sid] = call_state + + # Generate a unique ID to prevent caching + unique_id = str(uuid.uuid4()) + # Use a relative URL - Twilio will use the same host as the webhook + audio_url = f"/stream-audio/{call_sid}/response/{unique_id}" + logger.info(f"Streaming response from relative URL: {audio_url}") + + # Play the response via streaming + response.play(audio_url) + except Exception as e: + logger.error(f"Error with audio streaming for response: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + # Fallback to Twilio TTS + response.say(ai_response, voice='alice') + + # Gather next user input with enhanced 
speech recognition + gather = Gather( + input='speech', + action=f'/process_speech?call_sid={call_sid}', + speech_timeout='auto', + language='en-US', + enhanced=True, # Enable enhanced speech recognition + speechModel='phone_call' # Optimize for phone calls + ) + response.append(gather) + + # If no input detected, redirect to twiml endpoint + # Call state will be retrieved from MongoDB + response.redirect('/twiml') + + logger.info(f"Returning TwiML response for speech processing") + return str(response) + + except Exception as e: + # Log the full error with traceback + import traceback + logger.error(f"Error in process_speech: {str(e)}") + logger.error(traceback.format_exc()) + + # Return a basic TwiML response + response = VoiceResponse() + response.say("I'm sorry, we encountered an error processing your speech. Please try again.", voice='alice') + response.gather( + input='speech', + action=f'/process_speech?call_sid={request.args.get("call_sid")}', + speech_timeout='auto' + ) + return str(response) + +@app.route('/stream-audio///', methods=['GET']) +def stream_audio(call_sid, text_type, unique_id): + """Stream audio directly from ElevenLabs to Twilio without saving to disk""" + try: + logger.info(f"Audio streaming requested for call {call_sid}, type {text_type}") + + # Determine what text to synthesize + text_to_speak = "" + + if text_type == "greeting": + # Use default greeting + text_to_speak = "Hello! I'm your RowBoat assistant. How can I help you today?" 
+ elif text_type == "response": + # Get the text from call state (try MongoDB first, then memory) + call_state = None + + # Try MongoDB first + try: + call_state = get_call_state(call_sid) + if call_state: + logger.info(f"Loaded call state from MongoDB for streaming: {call_sid}") + except Exception as e: + logger.error(f"Error retrieving MongoDB state for streaming: {str(e)}") + call_state = None + + # Fall back to memory if needed + if call_state is None: + if call_sid not in active_calls: + logger.error(f"Call SID not found for streaming: {call_sid}") + return "Call not found", 404 + + call_state = active_calls[call_sid] + logger.info(f"Using in-memory state for streaming: {call_sid}") + if call_state.get('conversation_history') and len(call_state['conversation_history']) > 0: + # Get the most recent AI response + text_to_speak = call_state['conversation_history'][-1]['assistant'] + else: + logger.warning(f"No conversation history found for call {call_sid}") + text_to_speak = "I'm sorry, I don't have a response ready. Could you please repeat?" 
+ else: + # Direct text may be passed as the text_type (for testing) + text_to_speak = text_type + + if not text_to_speak: + logger.error("No text to synthesize") + return "No text to synthesize", 400 + + logger.info(f"Streaming audio for text: {text_to_speak[:50]}...") + + + def generate(): + try: + # Generate and stream the audio directly + audio_stream = elevenlabs_client.generate( + text=text_to_speak, + voice=TTS_VOICE, + model=TTS_MODEL, + output_format="mp3_44100_128" + ) + + # Stream chunks directly to the response + for chunk in audio_stream: + yield chunk + + logger.info(f"Finished streaming audio for call {call_sid}") + except Exception as e: + logger.error(f"Error in audio stream generator: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + + # Return a streaming response + response = Response(generate(), mimetype='audio/mpeg') + return response + + except Exception as e: + logger.error(f"Error setting up audio stream: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return "Error streaming audio", 500 + +@app.route('/call-status', methods=['POST']) +def call_status_callback(): + """Handle call status callbacks from Twilio""" + call_sid = request.values.get('CallSid') + call_status = request.values.get('CallStatus') + + logger.info(f"Call {call_sid} status: {call_status}") + + # Clean up resources when call completes + if call_status in ['completed', 'failed', 'busy', 'no-answer', 'canceled']: + # Get call state from MongoDB or memory + call_state = None + + # Try to load from MongoDB first + try: + call_state = get_call_state(call_sid) + if call_state: + logger.info(f"Loaded final state from MongoDB for {call_sid}") + except Exception as e: + logger.error(f"Error retrieving final state from MongoDB: {str(e)}") + call_state = None + + # Fall back to memory if needed + if call_state is None and call_sid in active_calls: + call_state = active_calls[call_sid] + logger.info(f"Using in-memory state for final call 
state of {call_sid}") + + if call_state: + # Remove from active calls in both memory and MongoDB + if call_sid in active_calls: + del active_calls[call_sid] + logger.info(f"Removed call {call_sid} from active calls memory") + + try: + # Remove the document from MongoDB + delete_call_state(call_sid) + logger.info(f"Removed call {call_sid} from MongoDB") + except Exception as e: + logger.error(f"Error removing call state from MongoDB: {str(e)}") + return '', 204 + + +@app.route('/health', methods=['GET']) +def health_check(): + """Simple health check endpoint""" + health_data = { + "status": "healthy", + "active_calls_memory": len(active_calls) + } + + # Get MongoDB status + try: + mongodb_status = get_mongodb_status() + health_data["mongodb"] = mongodb_status + health_data["active_calls_mongodb"] = mongodb_status.get("active_calls", 0) + except Exception as e: + health_data["mongodb_error"] = str(e) + health_data["status"] = "degraded" + + return jsonify(health_data) + +if __name__ == '__main__': + # Log startup information + logger.info(f"Starting Twilio-RowBoat server") + # Remove the explicit run configuration since Flask CLI will handle it + app.run() \ No newline at end of file diff --git a/apps/voice/load_env.py b/apps/twilio_handler/load_env.py similarity index 100% rename from apps/voice/load_env.py rename to apps/twilio_handler/load_env.py diff --git a/apps/twilio_handler/requirements.txt b/apps/twilio_handler/requirements.txt new file mode 100644 index 00000000..a3986fa6 --- /dev/null +++ b/apps/twilio_handler/requirements.txt @@ -0,0 +1,39 @@ +aiohappyeyeballs==2.5.0 +aiohttp==3.11.13 +aiohttp-retry==2.9.1 +aiosignal==1.3.2 +annotated-types==0.7.0 +anyio==4.8.0 +attrs==25.1.0 +blinker==1.9.0 +certifi==2025.1.31 +charset-normalizer==3.4.1 +click==8.1.8 +dnspython==2.7.0 +dotenv==0.9.9 +elevenlabs==1.52.0 +Flask==3.1.0 +frozenlist==1.5.0 +h11==0.14.0 +httpcore==1.0.7 +httpx==0.28.1 +idna==3.10 +itsdangerous==2.2.0 +Jinja2==3.1.6 +MarkupSafe==3.0.2 
+multidict==6.1.0 +propcache==0.3.0 +pydantic==2.10.6 +pydantic_core==2.27.2 +PyJWT==2.10.1 +pymongo==4.11.2 +python-dotenv==1.0.1 +requests==2.32.3 +rowboat==2.1.0 +sniffio==1.3.1 +twilio==9.4.6 +typing_extensions==4.12.2 +urllib3==2.3.0 +websockets==15.0.1 +Werkzeug==3.1.3 +yarl==1.18.3 diff --git a/apps/twilio_handler/twilio_api.py b/apps/twilio_handler/twilio_api.py new file mode 100644 index 00000000..fa69e07a --- /dev/null +++ b/apps/twilio_handler/twilio_api.py @@ -0,0 +1,109 @@ +from twilio.rest import Client as TwilioClient +from rowboat.client import Client +from rowboat.schema import UserMessage, SystemMessage +import os +from typing import Dict, List, Optional, Tuple, Any +import logging +from util import get_api_key +import time +import json + +# Load environment variables +from load_env import load_environment +load_environment() + +# Configure logging to stdout for Docker compatibility +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[logging.StreamHandler()] # Send logs to stdout +) +logger = logging.getLogger(__name__) + +# Environment variables and configuration +ROWBOAT_API_HOST = os.environ.get("ROWBOAT_API_HOST").strip() + +Message = UserMessage | SystemMessage + +def process_conversation_turn( + user_input: str, + workflow_id: str, + system_prompt: str = "You are a helpful assistant. Provide concise and clear answers.", + previous_messages: List[Message] = None, + previous_state: Any = None, + project_id: str = None +) -> Tuple[str, List[Message], Any]: + """ + Process a single conversation turn with the RowBoat agent using the stateless API. 
+ + Args: + user_input: User's transcribed input + workflow_id: RowBoat workflow ID + system_prompt: System prompt for the agent + previous_messages: Previous messages in the conversation + previous_state: Previous state from RowBoat + project_id: RowBoat project ID (if different from default) + + Returns: + A tuple of (response_text, updated_messages, updated_state) + """ + try: + # Initialize messages list if not provided + messages = [] if previous_messages is None else previous_messages.copy() + + # If we're starting a new conversation, add the system message + if not messages or not any(msg.role == 'system' for msg in messages): + messages.append(SystemMessage(role='system', content=system_prompt)) + + # Add the user's new message + messages.append(UserMessage(role='user', content=user_input)) + + # Process the conversation using the stateless API + logger.info(f"Sending to RowBoat API with {len(messages)} messages") + + # Create client with custom project_id if provided + + client = Client( + host=ROWBOAT_API_HOST, + project_id=project_id, + api_key=get_api_key(project_id) + ) + + response_messages, new_state = client.chat( + messages=messages, + workflow_id=workflow_id, + state=previous_state + ) + + # Extract the assistant's response (last message) + if response_messages and len(response_messages) > 0: + assistant_response = response_messages[-1].content + else: + assistant_response = "I'm sorry, I didn't receive a proper response." 
+ + # Update messages list with the new responses + final_messages = messages + response_messages + + # dump_data = { + # 'messages': [msg.model_dump() for msg in messages], + # 'response_messages': [msg.model_dump() for msg in response_messages], + # 'state': new_state + # } + # # Write messages to a debug file for inspection + # fname = f'debug_dump_{time.time()}.json' + # try: + # with open(fname, 'w') as f: + # json.dump(dump_data, f, indent=2) + # logger.info(f"Wrote debug info to {fname}") + # except Exception as e: + # logger.error(f"Failed to write message debug file: {str(e)}") + + + logger.info(f"Got response from RowBoat API: {assistant_response[:100]}...") + return assistant_response, final_messages, new_state + + except Exception as e: + logger.error(f"Error processing conversation turn: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return "I'm sorry, I encountered an error processing your request.", previous_messages, previous_state \ No newline at end of file diff --git a/apps/twilio_handler/util.py b/apps/twilio_handler/util.py new file mode 100644 index 00000000..013809a8 --- /dev/null +++ b/apps/twilio_handler/util.py @@ -0,0 +1,423 @@ +import os +import logging +import datetime +from typing import Dict, Any, Optional, List, Union +import copy +from pymongo import MongoClient +from pymongo.errors import ConnectionFailure, PyMongoError +from pymongo.collection import Collection +from bson import json_util +from pydantic import BaseModel +from rowboat.schema import ApiMessage + +# Configure logging to stdout for Docker compatibility +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[logging.StreamHandler()] # Send logs to stdout +) +logger = logging.getLogger(__name__) + +# MongoDB Configuration +MONGODB_URI = os.environ.get('MONGODB_URI') +MONGODB_DB = 'rowboat' + +CALL_STATE_COLLECTION = 'call-state' +MONGODB_EXPIRY_SECONDS = 86400 # Default 24 hours 
API_KEYS_COLLECTION = "api_keys"

# MongoDB client singletons. They start as None and are populated by
# init_mongodb(); get_collection() / get_api_keys_collection() call
# init_mongodb() lazily when their collection handle is still None.
_mongo_client = None
_db = None
_call_state_collection = None
_api_keys_collection = None


# Define chat state pydantic model
class CallState(BaseModel):
    """State of one phone call, stored in MongoDB keyed by the call SID.

    Besides normal attribute access the model supports ``state["key"]``,
    ``state["key"] = value`` and ``state.get("key", default)`` so that code
    written against a plain dict keeps working. Note that ``state["missing"]``
    raises ``AttributeError`` (it delegates to ``getattr``), not ``KeyError``.
    """

    # Message list passed to / returned from the RowBoat chat API.
    messages: List[ApiMessage] = []
    workflow_id: str
    project_id: str
    system_prompt: str
    # Incremented once per processed speech turn.
    turn_count: int = 0
    inbound: bool = False
    # One {'user': ..., 'assistant': ...} dict per turn; the stream-audio
    # endpoint reads the last entry's 'assistant' text.
    conversation_history: List[Dict[str, str]] = []  # Using Dict instead of ApiMessage for chat history
    to_number: str = ""
    # NOTE(review): presumably a Unix timestamp - confirm against the writer.
    created_at: int
    state: Any = None  # Allow any type since the API might return a complex state object
    last_transcription: Optional[str] = None

    # Enable dictionary-style access for compatibility with existing code
    def __getitem__(self, key):
        return getattr(self, key)

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def get(self, key, default=None):
        return getattr(self, key, default)

    model_config = {
        # Allow extra fields for flexibility
        "extra": "allow",
        # More lenient type validation
        "arbitrary_types_allowed": True,
        # Allow population by field name
        "populate_by_name": True
    }


def _redact_mongodb_uri(uri):
    """Return *uri* with any ``user:password@`` userinfo replaced by ``***@``.

    Values that are not strings or carry no ``scheme://`` prefix are returned
    as ``str(uri)`` unchanged, so an unset URI still logs as ``None``.
    """
    if not isinstance(uri, str) or "://" not in uri:
        return str(uri)
    scheme, _, remainder = uri.partition("://")
    # The authority (credentials + hosts) ends at the first "/"; reserved
    # characters inside credentials must be percent-encoded in MongoDB URIs.
    authority, slash, tail = remainder.partition("/")
    _, at_sign, hosts = authority.rpartition("@")
    if at_sign:
        authority = "***@" + hosts
    return f"{scheme}://{authority}{slash}{tail}"


def init_mongodb():
    """Initialize MongoDB connection and set up indexes.

    Connects using ``MONGODB_URI``, verifies the connection with a ``ping``,
    binds the module-level database/collection singletons and makes sure the
    TTL index on ``expires_at`` exists on the call-state collection.

    Returns:
        True once the connection and index are in place.

    Raises:
        RuntimeError: if the server cannot be reached (wraps pymongo's
            ``ConnectionFailure``, which is kept as ``__cause__``).
    """
    global _mongo_client, _db, _call_state_collection, _api_keys_collection

    try:
        _mongo_client = MongoClient(MONGODB_URI)
        # Force a command to check the connection
        _mongo_client.admin.command('ping')

        # Set up database and collection
        _db = _mongo_client[MONGODB_DB]
        _call_state_collection = _db[CALL_STATE_COLLECTION]
        _api_keys_collection = _db[API_KEYS_COLLECTION]
        # Create TTL index if it doesn't exist. With expireAfterSeconds=0 a
        # document expires at the date stored in its 'expires_at' field.
        # NOTE(review): CallState declares no 'expires_at' field - confirm
        # the writer sets it, otherwise this index never removes anything.
        if 'expires_at_1' not in _call_state_collection.index_information():
            _call_state_collection.create_index('expires_at', expireAfterSeconds=0)

        # Never log the raw URI: connection strings may embed credentials.
        logger.info(f"Connected to MongoDB at {_redact_mongodb_uri(MONGODB_URI)}")
        return True
    except ConnectionFailure as e:
        logger.error(f"Failed to connect to MongoDB: {str(e)}")
        raise RuntimeError(f"Could not connect to MongoDB: {str(e)}") from e
+def get_collection() -> Collection: + """Get the MongoDB collection, initializing if needed.""" + global _call_state_collection + + if _call_state_collection is None: + init_mongodb() + + return _call_state_collection + +def get_api_keys_collection() -> Collection: + """Get the MongoDB collection, initializing if needed.""" + global _api_keys_collection + + if _api_keys_collection is None: + init_mongodb() + + return _api_keys_collection + +def get_api_key(project_id: str) -> Optional[str]: + """Get the API key for a given project ID.""" + collection = get_api_keys_collection() + doc = collection.find_one({"projectId": project_id}) + return doc["key"] if doc else None + +def save_call_state(call_sid: str, call_state: CallState) -> bool: + """ + Save call state to MongoDB. + + Args: + call_sid: The call SID to use as document ID + call_state: The call state dictionary to save + + Returns: + True if successful, False otherwise + """ + try: + # Validate call_state is a CallState object + if not isinstance(call_state, CallState): + raise ValueError(f"call_state must be a CallState object, got {type(call_state)}") + + collection = get_collection() + # Use call_sid as document ID + collection.update_one( + {'_id': call_sid}, + {'$set': call_state.model_dump()}, + upsert=True + ) + logger.info(f"Saved call state to MongoDB for call {call_sid}") + return True + except PyMongoError as e: + logger.error(f"Error saving call state to MongoDB for call {call_sid}: {str(e)}") + raise RuntimeError(f"Failed to save call state to MongoDB: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error in save_call_state: {str(e)}") + raise RuntimeError(f"Failed to save call state: {str(e)}") + +def get_call_state(call_sid: str) -> Optional[CallState]: + """ + Retrieve call state from MongoDB. 
+ + Args: + call_sid: The call SID to retrieve + + Returns: + Call state dictionary or None if not found + """ + try: + collection = get_collection() + + # Query MongoDB for the call state + state_doc = collection.find_one({'_id': call_sid}) + if not state_doc: + logger.info(f"No call state found in MongoDB for call {call_sid}") + return None + + call_state = CallState.model_validate(state_doc) + + logger.info(f"Retrieved call state from MongoDB for call {call_sid}") + return call_state + except PyMongoError as e: + logger.error(f"Error retrieving call state from MongoDB for call {call_sid}: {str(e)}") + raise RuntimeError(f"Failed to retrieve call state from MongoDB: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error in get_call_state: {str(e)}") + raise RuntimeError(f"Failed to retrieve call state: {str(e)}") + +def delete_call_state(call_sid: str) -> bool: + """ + Delete call state from MongoDB. + + Args: + call_sid: The call SID to delete + + Returns: + True if successful, False if not found + """ + try: + collection = get_collection() + + # Delete the document from MongoDB + result = collection.delete_one({'_id': call_sid}) + if result.deleted_count > 0: + logger.info(f"Deleted call state from MongoDB for call {call_sid}") + return True + else: + logger.info(f"No call state found to delete in MongoDB for call {call_sid}") + return False + except PyMongoError as e: + logger.error(f"Error deleting call state from MongoDB for call {call_sid}: {str(e)}") + raise RuntimeError(f"Failed to delete call state from MongoDB: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error in delete_call_state: {str(e)}") + raise RuntimeError(f"Failed to delete call state: {str(e)}") + +def count_active_calls() -> int: + """ + Count active call documents in MongoDB. 
+ + Returns: + Number of active call documents + """ + try: + collection = get_collection() + return collection.count_documents({}) + except PyMongoError as e: + logger.error(f"Error counting active calls in MongoDB: {str(e)}") + raise RuntimeError(f"Failed to count active calls in MongoDB: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error in count_active_calls: {str(e)}") + raise RuntimeError(f"Failed to count active calls: {str(e)}") + +def get_mongodb_status() -> Dict[str, Any]: + """ + Get MongoDB connection status information. + + Returns: + Dictionary with status information + """ + status = { + "status": "connected", + "uri": MONGODB_URI, + "database": MONGODB_DB, + "collection": CALL_STATE_COLLECTION + } + + try: + # First check connection with a simple command + collection = get_collection() + db = collection.database + db.command('ping') + status["connection"] = "ok" + + # Count active calls + count = count_active_calls() + status["active_calls"] = count + + # Get collection stats + try: + stats = db.command("collStats", CALL_STATE_COLLECTION) + status["size_bytes"] = stats.get("size", 0) + status["document_count"] = stats.get("count", 0) + status["index_count"] = len(stats.get("indexSizes", {})) + except Exception as stats_error: + status["stats_error"] = str(stats_error) + + except Exception as e: + status["status"] = "error" + status["error"] = str(e) + status["timestamp"] = datetime.datetime.utcnow().isoformat() + + return status + +# Twilio configuration functions +def get_twilio_config(phone_number: str) -> Optional[Dict[str, Any]]: + """ + Get Twilio configuration for a specific phone number from MongoDB. 
+ + Args: + phone_number: The phone number to look up configuration for + + Returns: + Configuration dictionary or None if not found/active + """ + try: + # Get MongoDB client and database + client = get_collection().database.client + db = client[MONGODB_DB] + + # Use the twilio_configs collection + config_collection = db['twilio_configs'] + + # Enhanced logging for phone number format + logger.info(f"Looking up configuration for phone number: '{phone_number}'") + + # Try different formats of the phone number + cleaned_number = phone_number.strip().replace(' ', '').replace('-', '').replace('(', '').replace(')', '') + + possible_formats = [ + phone_number, # Original format from Twilio + cleaned_number, # Thoroughly cleaned number + '+' + cleaned_number if not cleaned_number.startswith('+') else cleaned_number, # Ensure + prefix + + # Try with different country code formats + '+1' + cleaned_number[-10:] if len(cleaned_number) >= 10 else cleaned_number, # US format with +1 + '1' + cleaned_number[-10:] if len(cleaned_number) >= 10 else cleaned_number, # US format with 1 + cleaned_number[-10:] if len(cleaned_number) >= 10 else cleaned_number, # US format without country code + ] + + # Remove duplicates while preserving order + unique_formats = [] + for fmt in possible_formats: + if fmt not in unique_formats: + unique_formats.append(fmt) + possible_formats = unique_formats + + # Log the formats we're trying + logger.info(f"Trying phone number formats: {possible_formats}") + + # Try each format + for phone_format in possible_formats: + # Look up the configuration for this phone number format with status=active + config = config_collection.find_one({'phone_number': phone_format, 'status': 'active'}) + if config: + logger.info(f"Found active configuration for '{phone_format}': project_id={config.get('project_id')}, workflow_id={config.get('workflow_id')}") + break # Found a match, exit the loop + + # If we didn't find any match + if not config: + # Try a more generic query 
to see what configurations exist + try: + all_configs = list(config_collection.find({'phone_number': {'$regex': phone_number[-10:] if len(phone_number) >= 10 else phone_number}})) + if all_configs: + logger.warning(f"Found {len(all_configs)} configurations that match phone number {phone_number}, but none are active:") + for cfg in all_configs: + logger.warning(f" - Phone: {cfg.get('phone_number')}, Status: {cfg.get('status')}, Workflow: {cfg.get('workflow_id')}") + else: + logger.warning(f"No configurations found at all for phone number {phone_number} or related formats") + except Exception as e: + logger.error(f"Error running regex query: {str(e)}") + + logger.warning(f"No active configuration found for any format of phone number {phone_number}") + return None + + # Make sure required fields are present + if 'project_id' not in config or 'workflow_id' not in config: + logger.error(f"Configuration for {phone_number} is missing required fields") + return None + + logger.info(f"Found active configuration for {phone_number}: project_id={config['project_id']}, workflow_id={config['workflow_id']}") + return config + except Exception as e: + logger.error(f"Error retrieving Twilio configuration for {phone_number}: {str(e)}") + # Return None instead of raising an exception to allow fallback to default behavior + return None + +def list_active_twilio_configs() -> List[Dict[str, Any]]: + """ + List all active Twilio configurations from MongoDB. 
+ + Returns: + List of active configuration dictionaries + """ + try: + # Get MongoDB client and database + client = get_collection().database.client + db = client[MONGODB_DB] + + # Use the twilio_configs collection + config_collection = db['twilio_configs'] + + # Find all active configurations + configs = list(config_collection.find({'status': 'active'})) + + logger.info(f"Found {len(configs)} active Twilio configurations") + return configs + except Exception as e: + logger.error(f"Error retrieving active Twilio configurations: {str(e)}") + return [] + +def save_twilio_config(config: Dict[str, Any]) -> bool: + """ + Save a Twilio configuration to MongoDB. + + Args: + config: Configuration dictionary with at least phone_number, project_id, and workflow_id + + Returns: + True if successful, False otherwise + """ + required_fields = ['phone_number', 'project_id', 'workflow_id'] + for field in required_fields: + if field not in config: + logger.error(f"Missing required field '{field}' in Twilio configuration") + return False + + try: + # Get MongoDB client and database + client = get_collection().database.client + db = client[MONGODB_DB] + + # Use the twilio_configs collection + config_collection = db['twilio_configs'] + + # Ensure status is set to active + if 'status' not in config: + config['status'] = 'active' + + # Add timestamp + config['updated_at'] = datetime.datetime.utcnow() + if 'created_at' not in config: + config['created_at'] = config['updated_at'] + + # Use phone_number as the ID + phone_number = config['phone_number'] + + # Update or insert the configuration + result = config_collection.update_one( + {'phone_number': phone_number}, + {'$set': config}, + upsert=True + ) + + if result.matched_count > 0: + logger.info(f"Updated Twilio configuration for {phone_number}") + else: + logger.info(f"Created new Twilio configuration for {phone_number}") + + return True + except Exception as e: + logger.error(f"Error saving Twilio configuration: {str(e)}") + return 
False + +# Initialize MongoDB on module import +init_mongodb() \ No newline at end of file diff --git a/apps/voice/Dockerfile b/apps/voice/Dockerfile deleted file mode 100644 index b0d7fe4c..00000000 --- a/apps/voice/Dockerfile +++ /dev/null @@ -1,27 +0,0 @@ -FROM python:3.11-slim - -WORKDIR /app - -# Install system dependencies -RUN apt-get update && apt-get install -y \ - build-essential \ - && rm -rf /var/lib/apt/lists/* - -# Copy requirements first to leverage Docker cache -COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt - -# Copy application code -COPY . . - -# Debug: List files in /app -RUN ls -la /app - -# Set environment variables -ENV PYTHONPATH=/app - -# Expose the port the app runs on -EXPOSE 3009 - -# Command to run the application directly with Python -CMD ["python", "twilio_server.py"] \ No newline at end of file diff --git a/apps/voice/__init__.py b/apps/voice/__init__.py deleted file mode 100644 index 3aae30f6..00000000 --- a/apps/voice/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Empty file to mark directory as Python package \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index e5f81a8c..bdce8a7c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -61,6 +61,16 @@ services: - API_KEY=${COPILOT_API_KEY} restart: unless-stopped + tools_webhook: + build: + context: ./apps/tools_webhook + dockerfile: Dockerfile + ports: + - "3005:3005" + environment: + - SIGNING_SECRET=${SIGNING_SECRET} + restart: unless-stopped + simulation_runner: build: context: ./apps/simulation_runner @@ -125,17 +135,6 @@ services: - QDRANT_API_KEY=${QDRANT_API_KEY} restart: unless-stopped - tools_webhook: - build: - context: ./apps/tools_webhook - dockerfile: Dockerfile - profiles: [ "tools_webhook" ] - ports: - - "3005:3005" - environment: - - SIGNING_SECRET=${SIGNING_SECRET} - restart: unless-stopped - chat_widget: build: context: ./apps/chat_widget @@ -163,20 +162,14 @@ services: ports: - "8000:8000" - voice: + 
twilio_handler: build: - context: ./apps/voice + context: ./apps/twilio_handler dockerfile: Dockerfile ports: - "3009:3009" environment: - - TWILIO_ACCOUNT_SID=${TWILIO_ACCOUNT_SID} - - TWILIO_AUTH_TOKEN=${TWILIO_AUTH_TOKEN} - - DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY} - ELEVENLABS_API_KEY=${ELEVENLABS_API_KEY} - ROWBOAT_API_HOST=http://rowboat:3000 - - ROWBOAT_PROJECT_ID=${ROWBOAT_PROJECT_ID} - - ROWBOAT_API_KEY=${ROWBOAT_API_KEY} - - REDIS_URL=redis://redis:6379 - - BASE_URL=${BASE_URL} + - MONGODB_URI=${MONGODB_CONNECTION_STRING} restart: unless-stopped