Merge pull request #48 from rowboatlabs/voice-rebased

add Twilio voice handler
This commit is contained in:
Ramnique Singh 2025-03-27 12:37:11 +05:30 committed by GitHub
commit 19cb3d3de8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 2322 additions and 86 deletions

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "rowboat"
version = "1.0.6"
version = "2.1.0"
authors = [
{ name = "Your Name", email = "your.email@example.com" },
]

View file

@ -38,7 +38,8 @@ class Client:
workflowId=workflow_id,
testProfileId=test_profile_id
)
response = requests.post(self.base_url, headers=self.headers, data=request.model_dump_json())
json_data = request.model_dump()
response = requests.post(self.base_url, headers=self.headers, json=json_data)
if not response.status_code == 200:
raise ValueError(f"Error: {response.status_code} - {response.text}")
@ -90,7 +91,7 @@ class Client:
) -> Tuple[List[ApiMessage], Optional[Dict[str, Any]]]:
"""Stateless chat method that handles a single conversation turn with multiple tool call rounds"""
current_messages = messages
current_messages = messages[:]
current_state = state
turns = 0

View file

@ -0,0 +1,280 @@
'use server';
import { TwilioConfigParams, TwilioConfigResponse, TwilioConfig, InboundConfigResponse } from "../lib/types/voice_types";
import { twilioConfigsCollection } from "../lib/mongodb";
import { ObjectId } from "mongodb";
import twilio from 'twilio';
import { Twilio } from 'twilio';
// Helper function to serialize MongoDB documents
function serializeConfig(config: any) {
return {
...config,
_id: config._id.toString(),
createdAt: config.createdAt.toISOString(),
};
}
// Real implementation for configuring Twilio number
export async function configureTwilioNumber(params: TwilioConfigParams): Promise<TwilioConfigResponse> {
console.log('configureTwilioNumber - Received params:', params);
try {
const client = twilio(params.account_sid, params.auth_token);
try {
// List all phone numbers and find the matching one
const numbers = await client.incomingPhoneNumbers.list();
console.log('Twilio numbers for this account:', numbers);
const phoneExists = numbers.some(
number => number.phoneNumber === params.phone_number
);
if (!phoneExists) {
throw new Error('Phone number not found in this account');
}
} catch (error) {
console.error('Error verifying phone number:', error);
throw new Error(
error instanceof Error
? error.message
: 'Invalid phone number or phone number does not belong to this account'
);
}
// Save to MongoDB after successful validation
const savedConfig = await saveTwilioConfig(params);
console.log('configureTwilioNumber - Saved config result:', savedConfig);
return { success: true };
} catch (error) {
console.error('Error in configureTwilioNumber:', error);
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to configure Twilio number'
};
}
}
// Save Twilio configuration to MongoDB
export async function saveTwilioConfig(params: TwilioConfigParams): Promise<TwilioConfig> {
console.log('saveTwilioConfig - Incoming params:', {
...params,
label: {
value: params.label,
type: typeof params.label,
length: params.label?.length,
isEmpty: params.label === ''
}
});
// First, list all configs to see what's in the database
const allConfigs = await twilioConfigsCollection
.find({ status: 'active' as const })
.toArray();
console.log('saveTwilioConfig - All active configs in DB:', allConfigs);
// Find existing config for this project
const existingConfig = await twilioConfigsCollection.findOne({
project_id: params.project_id,
status: 'active' as const
});
console.log('saveTwilioConfig - Existing config search by project:', {
searchCriteria: {
project_id: params.project_id,
status: 'active'
},
found: existingConfig
});
const configToSave = {
phone_number: params.phone_number,
account_sid: params.account_sid,
auth_token: params.auth_token,
label: params.label || '', // Use empty string instead of undefined
project_id: params.project_id,
workflow_id: params.workflow_id,
createdAt: existingConfig?.createdAt || new Date(),
status: 'active' as const
};
console.log('saveTwilioConfig - Config to save:', configToSave);
try {
// Configure inbound calls first
await configureInboundCall(
params.phone_number,
params.account_sid,
params.auth_token,
params.workflow_id
);
// Then save/update the config in database
if (existingConfig) {
console.log('saveTwilioConfig - Updating existing config:', existingConfig._id);
const result = await twilioConfigsCollection.updateOne(
{ _id: existingConfig._id },
{ $set: configToSave }
);
console.log('saveTwilioConfig - Update result:', result);
} else {
console.log('saveTwilioConfig - No existing config found, creating new');
const result = await twilioConfigsCollection.insertOne(configToSave);
console.log('saveTwilioConfig - Insert result:', result);
}
const savedConfig = await twilioConfigsCollection.findOne({
project_id: params.project_id,
status: 'active'
});
if (!savedConfig) {
throw new Error('Failed to save Twilio configuration');
}
console.log('configureTwilioNumber - Saved config result:', savedConfig);
return savedConfig;
} catch (error) {
console.error('Error saving Twilio config:', error);
throw error;
}
}
// Get Twilio configuration for a workflow
export async function getTwilioConfigs(projectId: string) {
console.log('getTwilioConfigs - Fetching for projectId:', projectId);
const configs = await twilioConfigsCollection
.find({
project_id: projectId,
status: 'active' as const
})
.sort({ createdAt: -1 })
.limit(1)
.toArray();
console.log('getTwilioConfigs - Raw configs:', configs);
const serializedConfigs = configs.map(serializeConfig);
console.log('getTwilioConfigs - Serialized configs:', serializedConfigs);
return serializedConfigs;
}
// Delete a Twilio configuration (soft delete)
export async function deleteTwilioConfig(projectId: string, configId: string) {
console.log('deleteTwilioConfig - Deleting config:', { projectId, configId });
const result = await twilioConfigsCollection.updateOne(
{
_id: new ObjectId(configId),
project_id: projectId
},
{
$set: { status: 'deleted' as const }
}
);
console.log('deleteTwilioConfig - Delete result:', result);
return result;
}
// Mock implementation for testing/development
export async function mockConfigureTwilioNumber(params: TwilioConfigParams): Promise<TwilioConfigResponse> {
await new Promise(resolve => setTimeout(resolve, 1000));
await saveTwilioConfig(params);
return { success: true };
}
export async function configureInboundCall(
phone_number: string,
account_sid: string,
auth_token: string,
workflow_id: string
): Promise<InboundConfigResponse> {
try {
// Normalize phone number format
if (!phone_number.startsWith('+')) {
phone_number = '+' + phone_number;
}
console.log('Configuring inbound call for:', {
phone_number,
workflow_id
});
// Initialize Twilio client
const client = new Twilio(account_sid, auth_token);
// Find the phone number in Twilio account
const incomingPhoneNumbers = await client.incomingPhoneNumbers.list({ phoneNumber: phone_number });
console.log('Found Twilio numbers:', incomingPhoneNumbers.map(n => ({
phoneNumber: n.phoneNumber,
currentVoiceUrl: n.voiceUrl,
currentStatusCallback: n.statusCallback,
sid: n.sid
})));
if (!incomingPhoneNumbers.length) {
throw new Error(`Phone number ${phone_number} not found in Twilio account`);
}
const phoneSid = incomingPhoneNumbers[0].sid;
const currentVoiceUrl = incomingPhoneNumbers[0].voiceUrl;
const wasPreviouslyConfigured = Boolean(currentVoiceUrl);
// Get base URL from environment - MUST be a public URL
const baseUrl = process.env.VOICE_API_URL;
if (!baseUrl) {
throw new Error('Voice service URL not configured. Please set VOICE_API_URL environment variable.');
}
// Validate URL is not localhost
if (baseUrl.includes('localhost')) {
throw new Error('Voice service must use a public URL, not localhost.');
}
const inboundUrl = `${baseUrl}/inbound?workflow_id=${workflow_id}`;
console.log('Setting up webhooks:', {
voiceUrl: inboundUrl,
statusCallback: `${baseUrl}/call-status`,
currentConfig: {
voiceUrl: currentVoiceUrl,
statusCallback: incomingPhoneNumbers[0].statusCallback
}
});
// Update the phone number configuration
const updatedNumber = await client.incomingPhoneNumbers(phoneSid).update({
voiceUrl: inboundUrl,
voiceMethod: 'POST',
statusCallback: `${baseUrl}/call-status`,
statusCallbackMethod: 'POST'
});
console.log('Webhook configuration complete:', {
phoneNumber: updatedNumber.phoneNumber,
newVoiceUrl: updatedNumber.voiceUrl,
newStatusCallback: updatedNumber.statusCallback,
success: updatedNumber.voiceUrl === inboundUrl
});
return {
status: wasPreviouslyConfigured ? 'reconfigured' : 'configured',
phone_number: phone_number,
workflow_id: workflow_id,
previous_webhook: wasPreviouslyConfigured ? currentVoiceUrl : undefined
};
} catch (err: unknown) {
console.error('Error configuring inbound call:', err);
// Type guard for error with message property
if (err instanceof Error) {
if (err.message.includes('localhost')) {
throw new Error('Voice service needs to be accessible from the internet. Please check your configuration.');
}
// Type guard for Twilio error
if ('code' in err && err.code === 21402) {
throw new Error('Invalid voice service URL. Please make sure it\'s a public, secure URL.');
}
}
// If we can't determine the specific error, throw a generic one
throw new Error('Failed to configure phone number. Please check your settings and try again.');
}
}

View file

@ -0,0 +1,204 @@
import { Button, Input, Textarea } from "@heroui/react";
import { useEffect, useRef, useState } from "react";
import { useClickAway } from "../../../hooks/use-click-away";
import MarkdownContent from "./markdown-content";
import clsx from "clsx";
import { Label } from "./label";
import { SparklesIcon } from "lucide-react";
interface EditableFieldProps {
value: string;
onChange: (value: string) => void;
label?: string;
placeholder?: string;
markdown?: boolean;
multiline?: boolean;
locked?: boolean;
className?: string;
validate?: (value: string) => { valid: boolean; errorMessage?: string };
light?: boolean;
error?: string | null;
inline?: boolean;
showGenerateButton?: {
show: boolean;
setShow: (show: boolean) => void;
};
disabled?: boolean;
type?: string;
}
export function EditableField({
value,
onChange,
label,
placeholder = "Click to edit...",
markdown = false,
multiline = false,
locked = false,
className = "flex flex-col gap-1 w-full",
validate,
light = false,
error,
inline = false,
showGenerateButton,
disabled = false,
type = "text",
}: EditableFieldProps) {
const [isEditing, setIsEditing] = useState(false);
const [localValue, setLocalValue] = useState(value);
const ref = useRef<HTMLDivElement>(null);
const validationResult = validate?.(localValue);
const isValid = !validate || validationResult?.valid;
useEffect(() => {
setLocalValue(value);
}, [value]);
useClickAway(ref, () => {
if (isEditing) {
if (isValid && localValue !== value) {
onChange(localValue);
} else {
setLocalValue(value);
}
setIsEditing(false);
}
});
const onValueChange = (newValue: string) => {
setLocalValue(newValue);
onChange(newValue); // Always save immediately
};
const commonProps = {
autoFocus: true,
value: localValue,
onValueChange: onValueChange,
variant: "bordered" as const,
labelPlacement: "outside" as const,
placeholder: markdown ? '' : placeholder,
classNames: {
input: "rounded-md",
inputWrapper: "rounded-md border-medium"
},
radius: "md" as const,
isInvalid: !isValid,
errorMessage: validationResult?.errorMessage,
onKeyDown: (e: React.KeyboardEvent) => {
if (!multiline && e.key === "Enter") {
e.preventDefault();
if (isValid && localValue !== value) {
onChange(localValue);
}
setIsEditing(false);
}
if (e.key === "Escape") {
setLocalValue(value);
setIsEditing(false);
}
},
};
if (isEditing) {
return (
<div ref={ref} className={clsx("flex flex-col gap-1 w-full", className)}>
{label && (
<div className="flex justify-between items-center">
<Label label={label} />
<div className="flex gap-2 items-center">
{showGenerateButton && (
<Button
variant="light"
size="sm"
startContent={<SparklesIcon size={16} />}
onPress={() => showGenerateButton.setShow(true)}
>
Generate
</Button>
)}
</div>
</div>
)}
{multiline && <Textarea
{...commonProps}
minRows={3}
maxRows={20}
className="w-full"
classNames={{
...commonProps.classNames,
input: "rounded-md py-2",
inputWrapper: "rounded-md border-medium py-1"
}}
/>}
{!multiline && <Input
{...commonProps}
type={type}
className="w-full"
classNames={{
...commonProps.classNames,
input: "rounded-md py-2",
inputWrapper: "rounded-md border-medium py-1"
}}
/>}
</div>
);
}
return (
<div ref={ref} className={clsx("cursor-text", className)}>
{label && (
<div className="flex justify-between items-center">
<Label label={label} />
{showGenerateButton && (
<Button
variant="light"
size="sm"
startContent={<SparklesIcon size={16} />}
onPress={() => showGenerateButton.setShow(true)}
>
Generate
</Button>
)}
</div>
)}
<div
className={clsx(
{
"border border-gray-300 dark:border-gray-600 rounded px-3 py-3": !inline,
"bg-transparent focus:outline-none focus:ring-0 border-0 rounded-none text-gray-900 dark:text-gray-100": inline,
}
)}
style={inline ? {
border: 'none',
borderRadius: '0',
padding: '0'
} : undefined}
onClick={() => !locked && setIsEditing(true)}
>
{value ? (
<>
{markdown && <div className="max-h-[420px] overflow-y-auto">
<MarkdownContent content={value} />
</div>}
{!markdown && <div className={`${multiline ? 'whitespace-pre-wrap max-h-[420px] overflow-y-auto' : 'flex items-center'}`}>
{value}
</div>}
</>
) : (
<>
{markdown && <div className="max-h-[420px] overflow-y-auto text-gray-400">
<MarkdownContent content={placeholder} />
</div>}
{!markdown && <span className="text-gray-400">{placeholder}</span>}
</>
)}
{error && (
<div className="text-xs text-red-500 mt-1">
{error}
</div>
)}
</div>
</div>
);
}

View file

@ -8,6 +8,7 @@ import { EmbeddingDoc } from "./types/datasource_types";
import { DataSourceDoc } from "./types/datasource_types";
import { DataSource } from "./types/datasource_types";
import { TestScenario, TestResult, TestRun, TestProfile, TestSimulation } from "./types/testing_types";
import { TwilioConfig } from "./types/voice_types";
import { z } from 'zod';
import { apiV1 } from "rowboat-shared";
@ -28,4 +29,16 @@ export const testSimulationsCollection = db.collection<z.infer<typeof TestSimula
export const testRunsCollection = db.collection<z.infer<typeof TestRun>>("test_runs");
export const testResultsCollection = db.collection<z.infer<typeof TestResult>>("test_results");
export const chatsCollection = db.collection<z.infer<typeof apiV1.Chat>>("chats");
export const chatMessagesCollection = db.collection<z.infer<typeof apiV1.ChatMessage>>("chat_messages");
export const chatMessagesCollection = db.collection<z.infer<typeof apiV1.ChatMessage>>("chat_messages");
export const twilioConfigsCollection = db.collection<z.infer<typeof TwilioConfig>>("twilio_configs");
// Create indexes
twilioConfigsCollection.createIndexes([
{
key: { workflow_id: 1, status: 1 },
name: "workflow_status_idx",
// This ensures only one active config per workflow
unique: true,
partialFilterExpression: { status: "active" }
}
]);

View file

@ -0,0 +1,32 @@
import { z } from 'zod';
import { WithId } from 'mongodb';
export const TwilioConfigParams = z.object({
phone_number: z.string(),
account_sid: z.string(),
auth_token: z.string(),
label: z.string(),
project_id: z.string(),
workflow_id: z.string(),
});
export const TwilioConfig = TwilioConfigParams.extend({
createdAt: z.date(),
status: z.enum(['active', 'deleted']),
});
export type TwilioConfigParams = z.infer<typeof TwilioConfigParams>;
export type TwilioConfig = WithId<z.infer<typeof TwilioConfig>>;
export interface TwilioConfigResponse {
success: boolean;
error?: string;
}
export interface InboundConfigResponse {
status: 'configured' | 'reconfigured';
phone_number: string;
workflow_id: string;
previous_webhook?: string;
error?: string;
}

View file

@ -7,12 +7,21 @@ import { getProjectConfig, updateProjectName, updateWebhookUrl, createApiKey, de
import { updateMcpServers } from "../../../actions/mcp_actions";
import { CopyButton } from "../../../lib/components/copy-button";
import { EditableField } from "../../../lib/components/editable-field";
import { EyeIcon, EyeOffIcon, CopyIcon, MoreVerticalIcon, PlusIcon, EllipsisVerticalIcon } from "lucide-react";
import { EyeIcon, EyeOffIcon, CopyIcon, MoreVerticalIcon, PlusIcon, EllipsisVerticalIcon, CheckCircleIcon, XCircleIcon } from "lucide-react";
import { WithStringId } from "../../../lib/types/types";
import { ApiKey } from "../../../lib/types/project_types";
import { z } from "zod";
import { RelativeTime } from "@primer/react";
import { Label } from "../../../lib/components/label";
import { ListItem } from "../../../lib/components/structured-list";
import { FormSection } from "../../../lib/components/form-section";
import { StructuredPanel } from "../../../lib/components/structured-panel";
import {
ResizableHandle,
ResizablePanel,
ResizablePanelGroup,
} from "../../../../components/ui/resizable"
import { VoiceSection } from './voice';
export const metadata: Metadata = {
title: "Project config",
@ -79,38 +88,29 @@ export function BasicSettingsSection({
}
return <Section title="Basic settings">
<SectionRow>
<LeftLabel label="Project name" />
<RightContent>
<div className="flex flex-row gap-2 items-center">
{loading && <Spinner size="sm" />}
{!loading && <EditableField
value={projectName || ''}
onChange={updateName}
className="w-full"
/>}
</div>
</RightContent>
</SectionRow>
<FormSection label="Project name">
{loading && <Spinner size="sm" />}
{!loading && <EditableField
value={projectName || ''}
onChange={updateName}
className="w-full"
/>}
</FormSection>
<Divider />
<SectionRow>
<LeftLabel label="Project ID" />
<RightContent>
<div className="flex flex-row gap-2 items-center">
<div className="text-gray-600 text-sm font-mono">{projectId}</div>
<CopyButton
onCopy={() => {
navigator.clipboard.writeText(projectId);
}}
label="Copy"
successLabel="Copied"
/>
</div>
</RightContent>
</SectionRow>
<FormSection label="Project ID">
<div className="flex flex-row gap-2 items-center">
<div className="text-gray-600 text-sm font-mono">{projectId}</div>
<CopyButton
onCopy={() => {
navigator.clipboard.writeText(projectId);
}}
label="Copy"
successLabel="Copied"
/>
</div>
</FormSection>
</Section>;
}
@ -630,20 +630,15 @@ export function WebhookUrlSection({
In workflow editor, tool calls will be posted to this URL, unless they are mocked.
</p>
<Divider />
<SectionRow>
<LeftLabel label="Webhook URL" />
<RightContent>
<div className="flex flex-row gap-2 items-center">
{loading && <Spinner size="sm" />}
{!loading && <EditableField
value={webhookUrl || ''}
onChange={update}
validate={validate}
className="w-full"
/>}
</div>
</RightContent>
</SectionRow>
<FormSection label="Webhook URL">
{loading && <Spinner size="sm" />}
{!loading && <EditableField
value={webhookUrl || ''}
onChange={update}
validate={validate}
className="w-full"
/>}
</FormSection>
</Section>;
}
@ -789,7 +784,30 @@ export function DeleteProjectSection({
);
}
export default function App({
function NavigationMenu({
selected,
onSelect
}: {
selected: string;
onSelect: (page: string) => void;
}) {
const items = ['Project', 'Tools', 'Voice'];
return (
<StructuredPanel title="SETTINGS">
{items.map((item) => (
<ListItem
key={item}
name={item}
isSelected={selected === item}
onClick={() => onSelect(item)}
/>
))}
</StructuredPanel>
);
}
export function ConfigApp({
projectId,
useChatWidget,
chatWidgetHost,
@ -798,22 +816,54 @@ export default function App({
useChatWidget: boolean;
chatWidgetHost: string;
}) {
return <div className="flex flex-col h-full">
<div className="shrink-0 flex justify-between items-center pb-4 border-b border-border">
<div className="flex flex-col">
<h1 className="text-lg">Project config</h1>
</div>
</div>
<div className="grow overflow-auto py-4">
<div className="max-w-[768px] mx-auto flex flex-col gap-4">
<BasicSettingsSection projectId={projectId} />
<SecretSection projectId={projectId} />
<ApiKeysSection projectId={projectId} />
<McpServersSection projectId={projectId} />
<WebhookUrlSection projectId={projectId} />
{useChatWidget && <ChatWidgetSection projectId={projectId} chatWidgetHost={chatWidgetHost} />}
<DeleteProjectSection projectId={projectId} />
</div>
</div>
</div>;
}
const [selectedPage, setSelectedPage] = useState('Project');
const renderContent = () => {
switch (selectedPage) {
case 'Project':
return (
<div className="h-full overflow-auto p-6 space-y-6">
<BasicSettingsSection projectId={projectId} />
<SecretSection projectId={projectId} />
<McpServersSection projectId={projectId} />
<WebhookUrlSection projectId={projectId} />
<ApiKeysSection projectId={projectId} />
{useChatWidget && <ChatWidgetSection projectId={projectId} chatWidgetHost={chatWidgetHost} />}
<DeleteProjectSection projectId={projectId} />
</div>
);
case 'Tools':
return (
<div className="h-full overflow-auto p-6">
<WebhookUrlSection projectId={projectId} />
</div>
);
case 'Voice':
return (
<div className="h-full overflow-auto p-6">
<VoiceSection projectId={projectId} />
</div>
);
default:
return null;
}
};
return (
<ResizablePanelGroup direction="horizontal" className="h-screen gap-1">
<ResizablePanel minSize={10} defaultSize={15}>
<NavigationMenu
selected={selectedPage}
onSelect={setSelectedPage}
/>
</ResizablePanel>
<ResizableHandle />
<ResizablePanel minSize={20} defaultSize={85}>
{renderContent()}
</ResizablePanel>
</ResizablePanelGroup>
);
}
// Add default export
export default ConfigApp;

View file

@ -0,0 +1,229 @@
'use client';
import { useState, useEffect, useCallback } from 'react';
import { Button } from "@heroui/react";
import { configureTwilioNumber, mockConfigureTwilioNumber, getTwilioConfigs, deleteTwilioConfig } from "../../../actions/voice_actions";
import { FormSection } from "../../../lib/components/form-section";
import { EditableField } from "../../../lib/components/editable-field-with-immediate-save";
import { StructuredPanel } from "../../../lib/components/structured-panel";
import { TwilioConfig } from "../../../lib/types/voice_types";
import { CheckCircleIcon, XCircleIcon, InfoIcon } from "lucide-react";
export function VoiceSection({
projectId,
}: {
projectId: string;
}) {
const [formState, setFormState] = useState({
phone: '',
accountSid: '',
authToken: '',
label: ''
});
const [existingConfig, setExistingConfig] = useState<TwilioConfig | null>(null);
const [error, setError] = useState<string | null>(null);
const [loading, setLoading] = useState(false);
const [success, setSuccess] = useState(false);
const [configurationValid, setConfigurationValid] = useState(false);
const [isDirty, setIsDirty] = useState(false);
const loadConfig = useCallback(async () => {
try {
const configs = await getTwilioConfigs(projectId);
if (configs.length > 0) {
const config = configs[0];
setExistingConfig(config);
setFormState({
phone: config.phone_number,
accountSid: config.account_sid,
authToken: config.auth_token,
label: config.label || ''
});
setConfigurationValid(true);
setIsDirty(false);
}
} catch (err) {
console.error('Error loading config:', err);
}
}, [projectId]);
useEffect(() => {
loadConfig();
}, [loadConfig]);
const handleFieldChange = (field: string, value: string) => {
setFormState(prev => ({
...prev,
[field]: value
}));
setIsDirty(true);
setError(null);
};
const handleConfigureTwilio = async () => {
if (!formState.phone || !formState.accountSid || !formState.authToken) {
setError('Please fill in all required fields');
setConfigurationValid(false);
return;
}
const workflowId = localStorage.getItem(`lastWorkflowId_${projectId}`);
if (!workflowId) {
setError('No workflow selected. Please select a workflow first.');
setConfigurationValid(false);
return;
}
setLoading(true);
setError(null);
const configParams = {
phone_number: formState.phone,
account_sid: formState.accountSid,
auth_token: formState.authToken,
label: formState.label,
project_id: projectId,
workflow_id: workflowId,
};
const result = await configureTwilioNumber(configParams);
if (result.success) {
await loadConfig();
setSuccess(true);
setConfigurationValid(true);
setIsDirty(false);
setTimeout(() => setSuccess(false), 3000);
} else {
setError(result.error || 'Failed to validate Twilio credentials or phone number');
setConfigurationValid(false);
}
setLoading(false);
};
const handleDeleteConfig = async () => {
if (!existingConfig) return;
if (confirm('Are you sure you want to delete this phone number configuration?')) {
await deleteTwilioConfig(projectId, existingConfig._id.toString());
setExistingConfig(null);
setFormState({
phone: '',
accountSid: '',
authToken: '',
label: ''
});
setConfigurationValid(false);
setIsDirty(false);
}
};
return (
<div className="flex flex-col gap-4">
<StructuredPanel title="CONFIGURE TWILIO PHONE NUMBER">
<div className="flex flex-col gap-4 p-6">
{success && (
<div className="bg-green-50 text-green-700 p-4 rounded-md flex items-center gap-2">
<CheckCircleIcon className="w-5 h-5" />
<span>
{existingConfig
? 'Twilio number validated and updated successfully!'
: 'Twilio number validated and configured successfully!'}
</span>
</div>
)}
{error && (
<div className="bg-red-50 text-red-700 p-4 rounded-md flex items-center gap-2">
<XCircleIcon className="w-5 h-5" />
<span>{error}</span>
</div>
)}
{existingConfig && configurationValid && !error && (
<div className="bg-blue-50 text-blue-700 p-4 rounded-md flex items-center gap-2">
<InfoIcon className="w-5 h-5" />
<span>This is your currently assigned phone number for this project</span>
</div>
)}
<FormSection label="TWILIO PHONE NUMBER">
<EditableField
value={formState.phone}
onChange={(value) => handleFieldChange('phone', value)}
placeholder="+14156021922"
disabled={loading}
/>
</FormSection>
<FormSection label="TWILIO ACCOUNT SID">
<EditableField
value={formState.accountSid}
onChange={(value) => handleFieldChange('accountSid', value)}
placeholder="AC5588686d3ec65df89615274..."
disabled={loading}
/>
</FormSection>
<FormSection label="TWILIO AUTH TOKEN">
<EditableField
value={formState.authToken}
onChange={(value) => handleFieldChange('authToken', value)}
placeholder="b74e48f9098764ef834cf6bd..."
type="password"
disabled={loading}
/>
</FormSection>
<FormSection label="LABEL">
<EditableField
value={formState.label}
onChange={(value) => handleFieldChange('label', value)}
placeholder="Enter a label for this number..."
disabled={loading}
/>
</FormSection>
<div className="flex gap-2 mt-4">
<Button
color="primary"
onClick={handleConfigureTwilio}
isLoading={loading}
disabled={loading || !isDirty}
>
{existingConfig ? 'Update Twilio Config' : 'Import from Twilio'}
</Button>
{existingConfig ? (
<Button
color="danger"
variant="flat"
onClick={handleDeleteConfig}
disabled={loading}
>
Delete Configuration
</Button>
) : (
<Button
variant="flat"
onClick={() => {
setFormState({
phone: '',
accountSid: '',
authToken: '',
label: ''
});
setError(null);
setIsDirty(false);
}}
disabled={loading}
>
Cancel
</Button>
)}
</div>
</div>
</StructuredPanel>
</div>
);
}

View file

@ -52,6 +52,7 @@
"tailwind-merge": "^2.5.5",
"tailwindcss-animate": "^1.0.7",
"tiktoken": "^1.0.17",
"twilio": "^5.4.5",
"typewriter-effect": "^2.21.0",
"zod": "^3.23.8",
"zod-to-json-schema": "^3.23.5"
@ -13991,6 +13992,17 @@
"acorn": "^6.0.0 || ^7.0.0 || ^8.0.0"
}
},
"node_modules/agent-base": {
"version": "6.0.2",
"resolved": "https://registry.npmjs.org/agent-base/-/agent-base-6.0.2.tgz",
"integrity": "sha512-RZNwNclF7+MS/8bDg70amg32dyeZGZxiDuQmZxKLAlQjr3jGyLx+4Kkk58UO7D2QdgFIQCovuSuZESne6RG6XQ==",
"dependencies": {
"debug": "4"
},
"engines": {
"node": ">= 6.0.0"
}
},
"node_modules/agentkeepalive": {
"version": "4.5.0",
"resolved": "https://registry.npmjs.org/agentkeepalive/-/agentkeepalive-4.5.0.tgz",
@ -14398,9 +14410,9 @@
}
},
"node_modules/axios": {
"version": "1.7.5",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.7.5.tgz",
"integrity": "sha512-fZu86yCo+svH3uqJ/yTdQ0QHpQu5oL+/QE+QPSv6BZSkDAoky9vytxp7u5qk83OJFS3kEBcesWni9WTZAv3tSw==",
"version": "1.8.1",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.8.1.tgz",
"integrity": "sha512-NN+fvwH/kV01dYUQ3PTOZns4LWtWhOFCAhQ/pHb88WQ1hNe5V/dvFwc4VJcDL11LT9xSX0QtsR8sWUuyOuOq7g==",
"dependencies": {
"follow-redirects": "^1.15.6",
"form-data": "^4.0.0",
@ -14604,6 +14616,11 @@
"node": ">=16.20.1"
}
},
"node_modules/buffer-equal-constant-time": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz",
"integrity": "sha512-zRpUiDwd/xk6ADqPMATG8vc9VPrkck7T07OIx0gnjmJAnHnTVXNQG3vfvWNuiZIkwu9KrKdA1iJKfsfTVxE6NA=="
},
"node_modules/busboy": {
"version": "1.6.0",
"resolved": "https://registry.npmjs.org/busboy/-/busboy-1.6.0.tgz",
@ -15197,6 +15214,11 @@
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/dayjs": {
"version": "1.11.13",
"resolved": "https://registry.npmjs.org/dayjs/-/dayjs-1.11.13.tgz",
"integrity": "sha512-oaMBel6gjolK862uaPQOVTA7q3TZhuSvuMQAAglQDOWYO9A91IrAOUJEyKVlqJlHE0vq5p5UXxzdPfMH/x6xNg=="
},
"node_modules/debug": {
"version": "4.4.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.4.0.tgz",
@ -15498,6 +15520,14 @@
"resolved": "https://registry.npmjs.org/eastasianwidth/-/eastasianwidth-0.2.0.tgz",
"integrity": "sha512-I88TYZWc9XiYHRQ4/3c5rjjfgkjhLyW2luGIheGERbNQ6OY7yTybanSpDXZa8y7VUP9YmDcYa+eyq4ca7iLqWA=="
},
"node_modules/ecdsa-sig-formatter": {
"version": "1.0.11",
"resolved": "https://registry.npmjs.org/ecdsa-sig-formatter/-/ecdsa-sig-formatter-1.0.11.tgz",
"integrity": "sha512-nagl3RYrbNv6kQkeJIpt6NJZy8twLB/2vtz6yN9Z4vRKHN4/QZJIEbqohALSgwKdnksuY3k5Addp5lg8sVoVcQ==",
"dependencies": {
"safe-buffer": "^5.0.1"
}
},
"node_modules/ee-first": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/ee-first/-/ee-first-1.1.1.tgz",
@ -16385,6 +16415,20 @@
"node": ">= 0.6"
}
},
"node_modules/express/node_modules/qs": {
"version": "6.13.0",
"resolved": "https://registry.npmjs.org/qs/-/qs-6.13.0.tgz",
"integrity": "sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg==",
"dependencies": {
"side-channel": "^1.0.6"
},
"engines": {
"node": ">=0.6"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/extend": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/extend/-/extend-3.0.2.tgz",
@ -17129,6 +17173,18 @@
"node": ">= 0.8"
}
},
"node_modules/https-proxy-agent": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/https-proxy-agent/-/https-proxy-agent-5.0.1.tgz",
"integrity": "sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA==",
"dependencies": {
"agent-base": "6",
"debug": "4"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/humanize-ms": {
"version": "1.2.1",
"resolved": "https://registry.npmjs.org/humanize-ms/-/humanize-ms-1.2.1.tgz",
@ -17911,6 +17967,27 @@
"url": "https://github.com/chalk/chalk?sponsor=1"
}
},
"node_modules/jsonwebtoken": {
"version": "9.0.2",
"resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.2.tgz",
"integrity": "sha512-PRp66vJ865SSqOlgqS8hujT5U4AOgMfhrwYIuIhfKaoSCZcirrmASQr8CX7cUg+RMih+hgznrjp99o+W4pJLHQ==",
"dependencies": {
"jws": "^3.2.2",
"lodash.includes": "^4.3.0",
"lodash.isboolean": "^3.0.3",
"lodash.isinteger": "^4.0.4",
"lodash.isnumber": "^3.0.3",
"lodash.isplainobject": "^4.0.6",
"lodash.isstring": "^4.0.1",
"lodash.once": "^4.0.0",
"ms": "^2.1.1",
"semver": "^7.5.4"
},
"engines": {
"node": ">=12",
"npm": ">=6"
}
},
"node_modules/jsx-ast-utils": {
"version": "3.3.5",
"resolved": "https://registry.npmjs.org/jsx-ast-utils/-/jsx-ast-utils-3.3.5.tgz",
@ -17926,6 +18003,25 @@
"node": ">=4.0"
}
},
"node_modules/jwa": {
"version": "1.4.1",
"resolved": "https://registry.npmjs.org/jwa/-/jwa-1.4.1.tgz",
"integrity": "sha512-qiLX/xhEEFKUAJ6FiBMbes3w9ATzyk5W7Hvzpa/SLYdxNtng+gcurvrI7TbACjIXlsJyr05/S1oUhZrc63evQA==",
"dependencies": {
"buffer-equal-constant-time": "1.0.1",
"ecdsa-sig-formatter": "1.0.11",
"safe-buffer": "^5.0.1"
}
},
"node_modules/jws": {
"version": "3.2.2",
"resolved": "https://registry.npmjs.org/jws/-/jws-3.2.2.tgz",
"integrity": "sha512-YHlZCB6lMTllWDtSPHz/ZXTsi8S00usEV6v1tjq8tOUZzw7DpSDWVXjXDre6ed1w/pd495ODpHZYSdkRTsa0HA==",
"dependencies": {
"jwa": "^1.4.1",
"safe-buffer": "^5.0.1"
}
},
"node_modules/keyv": {
"version": "4.5.4",
"resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz",
@ -18064,6 +18160,16 @@
"resolved": "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz",
"integrity": "sha512-H5ZhCF25riFd9uB5UCkVKo61m3S/xZk1x4wA6yp/L3RFP6Z/eHH1ymQcGLo7J3GMPfm0V/7m1tryHuGVxpqEBQ=="
},
"node_modules/lodash.includes": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz",
"integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w=="
},
"node_modules/lodash.isboolean": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz",
"integrity": "sha512-Bz5mupy2SVbPHURB98VAcw+aHh4vRV5IPNhILUCsOzRmsTmSQ17jIuqopAentWoehktxGd9e/hbIXq980/1QJg=="
},
"node_modules/lodash.isempty": {
"version": "4.4.0",
"resolved": "https://registry.npmjs.org/lodash.isempty/-/lodash.isempty-4.4.0.tgz",
@ -18075,17 +18181,42 @@
"integrity": "sha512-pDo3lu8Jhfjqls6GkMgpahsF9kCyayhgykjyLMNFTKWrpVdAQtYyB4muAMWozBB4ig/dtWAmsMxLEI8wuz+DYQ==",
"deprecated": "This package is deprecated. Use require('node:util').isDeepStrictEqual instead."
},
"node_modules/lodash.isinteger": {
"version": "4.0.4",
"resolved": "https://registry.npmjs.org/lodash.isinteger/-/lodash.isinteger-4.0.4.tgz",
"integrity": "sha512-DBwtEWN2caHQ9/imiNeEA5ys1JoRtRfY3d7V9wkqtbycnAmTvRRmbHKDV4a0EYc678/dia0jrte4tjYwVBaZUA=="
},
"node_modules/lodash.isnumber": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/lodash.isnumber/-/lodash.isnumber-3.0.3.tgz",
"integrity": "sha512-QYqzpfwO3/CWf3XP+Z+tkQsfaLL/EnUlXWVkIk5FUPc4sBdTehEqZONuyRt2P67PXAk+NXmTBcc97zw9t1FQrw=="
},
"node_modules/lodash.isobject": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/lodash.isobject/-/lodash.isobject-3.0.2.tgz",
"integrity": "sha512-3/Qptq2vr7WeJbB4KHUSKlq8Pl7ASXi3UG6CMbBm8WRtXi8+GHm7mKaU3urfpSEzWe2wCIChs6/sdocUsTKJiA=="
},
"node_modules/lodash.isplainobject": {
"version": "4.0.6",
"resolved": "https://registry.npmjs.org/lodash.isplainobject/-/lodash.isplainobject-4.0.6.tgz",
"integrity": "sha512-oSXzaWypCMHkPC3NvBEaPHf0KsA5mvPrOPgQWDsbg8n7orZ290M0BmC/jgRZ4vcJ6DTAhjrsSYgdsW/F+MFOBA=="
},
"node_modules/lodash.isstring": {
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/lodash.isstring/-/lodash.isstring-4.0.1.tgz",
"integrity": "sha512-0wJxfxH1wgO3GrbuP+dTTk7op+6L41QCXbGINEmD+ny/G/eCqGzxyCsh7159S+mgDDcoarnBw6PC1PS5+wUGgw=="
},
"node_modules/lodash.merge": {
"version": "4.6.2",
"resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz",
"integrity": "sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==",
"dev": true
},
"node_modules/lodash.once": {
"version": "4.1.1",
"resolved": "https://registry.npmjs.org/lodash.once/-/lodash.once-4.1.1.tgz",
"integrity": "sha512-Sb487aTOCr9drQVL8pIxOzVhafOjZN9UU54hiN8PU3uAiSV7lx1yYNpbNmex2PK6dSJoNTSJUUswT651yww3Mg=="
},
"node_modules/longest-streak": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/longest-streak/-/longest-streak-3.1.0.tgz",
@ -20170,11 +20301,11 @@
}
},
"node_modules/qs": {
"version": "6.13.0",
"resolved": "https://registry.npmjs.org/qs/-/qs-6.13.0.tgz",
"integrity": "sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg==",
"version": "6.14.0",
"resolved": "https://registry.npmjs.org/qs/-/qs-6.14.0.tgz",
"integrity": "sha512-YWWTjgABSKcvs/nWBi9PycY/JiPJqOD4JA6o9Sej2AtvSGarXxKC3OQSk4pAarbdQlKAh5D4FCQkJNkW+GAn3w==",
"dependencies": {
"side-channel": "^1.0.6"
"side-channel": "^1.1.0"
},
"engines": {
"node": ">=0.6"
@ -20747,6 +20878,11 @@
"loose-envify": "^1.1.0"
}
},
"node_modules/scmp": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/scmp/-/scmp-2.1.0.tgz",
"integrity": "sha512-o/mRQGk9Rcer/jEEw/yw4mwo3EU/NvYvp577/Btqrym9Qy5/MdWGBqipbALgd2lrdWTJ5/gqDusxfnQBxOxT2Q=="
},
"node_modules/scroll-into-view-if-needed": {
"version": "3.0.10",
"resolved": "https://registry.npmjs.org/scroll-into-view-if-needed/-/scroll-into-view-if-needed-3.0.10.tgz",
@ -21713,6 +21849,23 @@
"fsevents": "~2.3.3"
}
},
"node_modules/twilio": {
"version": "5.4.5",
"resolved": "https://registry.npmjs.org/twilio/-/twilio-5.4.5.tgz",
"integrity": "sha512-PIteif0CBOrA42SWZiT8IwUuqTNakAFgvXYWsrjEPGaDSczu/GvBs3vUock4S+UguXj7cV4qBswWgXs5ySjGNg==",
"dependencies": {
"axios": "^1.7.8",
"dayjs": "^1.11.9",
"https-proxy-agent": "^5.0.0",
"jsonwebtoken": "^9.0.2",
"qs": "^6.9.4",
"scmp": "^2.1.0",
"xmlbuilder": "^13.0.2"
},
"engines": {
"node": ">=14.0"
}
},
"node_modules/type-check": {
"version": "0.4.0",
"resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz",
@ -22460,6 +22613,14 @@
}
}
},
"node_modules/xmlbuilder": {
"version": "13.0.2",
"resolved": "https://registry.npmjs.org/xmlbuilder/-/xmlbuilder-13.0.2.tgz",
"integrity": "sha512-Eux0i2QdDYKbdbA6AM6xE4m6ZTZr4G4xF9kahI2ukSEMCzwce2eX9WlTI5J3s+NU7hpasFsr8hWIONae7LluAQ==",
"engines": {
"node": ">=6.0"
}
},
"node_modules/yallist": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/yallist/-/yallist-3.1.1.tgz",

View file

@ -58,6 +58,7 @@
"tailwind-merge": "^2.5.5",
"tailwindcss-animate": "^1.0.7",
"tiktoken": "^1.0.17",
"twilio": "^5.4.5",
"typewriter-effect": "^2.21.0",
"zod": "^3.23.8",
"zod-to-json-schema": "^3.23.5"

View file

@ -0,0 +1,2 @@
__pycache__
.venv/

View file

@ -0,0 +1,24 @@
# Environment variables for the Voice API application
# Twilio configuration
TWILIO_ACCOUNT_SID=your_account_sid_here
TWILIO_AUTH_TOKEN=your_auth_token_here
BASE_URL=https://your-public-url-here.ngrok.io
# RowBoat API configuration
ROWBOAT_API_HOST=http://localhost:3000
ROWBOAT_PROJECT_ID=your_project_id_here
ROWBOAT_API_KEY=your_api_key_here
# Speech processing APIs
DEEPGRAM_API_KEY=your_deepgram_api_key_here
ELEVENLABS_API_KEY=your_elevenlabs_api_key_here
# Server configuration
PORT=3009
WHATSAPP_PORT=3010
# Redis configuration for persistent state
REDIS_URL=redis://localhost:6379/0
REDIS_EXPIRY_SECONDS=86400
SERVICE_NAME=rowboat-voice

2
apps/twilio_handler/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
__pycache__
.venv

View file

@ -0,0 +1,18 @@
FROM python:3.12-slim
WORKDIR /app
# Copy requirements first to leverage Docker cache
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Set environment variables
ENV FLASK_APP=app
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app
# Command to run Flask development server
CMD ["flask", "run", "--host=0.0.0.0", "--port=4010"]

631
apps/twilio_handler/app.py Normal file
View file

@ -0,0 +1,631 @@
from flask import Flask, request, jsonify, Response
from twilio.twiml.voice_response import VoiceResponse, Gather
import os
import logging
import uuid
from typing import Dict, Any, Optional
import json
from time import time
from rowboat.schema import SystemMessage, UserMessage, ApiMessage
import elevenlabs
# Load environment variables
from load_env import load_environment
load_environment()
from twilio_api import process_conversation_turn
# Import MongoDB utility functions
from util import (
get_call_state,
save_call_state,
delete_call_state,
get_mongodb_status,
get_twilio_config,
CallState
)
Message = SystemMessage | UserMessage
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
elevenlabs_client = elevenlabs.ElevenLabs(api_key=ELEVENLABS_API_KEY)
app = Flask(__name__)
# Configure logging to stdout for Docker compatibility
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()] # Send logs to stdout
)
logger = logging.getLogger(__name__)
# Local in-memory cache of call state (temporary cache only - not primary storage)
# MongoDB is the primary storage for state across multiple instances
active_calls = {}
# TTS configuration
TTS_VOICE = "Markus - Mature and Chill"
TTS_MODEL = "eleven_flash_v2_5"
@app.route('/inbound', methods=['POST'])
def handle_inbound_call():
"""Handle incoming calls to Twilio numbers configured for RowBoat"""
try:
# Log the entire request for debugging
logger.info(f"Received inbound call request: {request.values}")
# Get the Twilio phone number that received the call
to_number = request.values.get('To')
call_sid = request.values.get('CallSid')
from_number = request.values.get('From')
logger.info(f"Inbound call from {from_number} to {to_number}, CallSid: {call_sid}")
logger.info(f"Raw To number value: '{to_number}', Type: {type(to_number)}")
# Get configuration ONLY from MongoDB
system_prompt = "You are a helpful assistant. Provide concise and clear answers."
workflow_id = None
project_id = None
# Look up configuration in MongoDB
twilio_config = get_twilio_config(to_number)
if twilio_config:
workflow_id = twilio_config['workflow_id']
project_id = twilio_config['project_id']
system_prompt = twilio_config.get('system_prompt', system_prompt)
logger.info(f"Found MongoDB configuration for {to_number}: project_id={project_id}, workflow_id={workflow_id}")
else:
logger.warning(f"No active configuration found in MongoDB for phone number {to_number}")
if not workflow_id:
# No workflow found - provide error message
logger.error(f"No workflow_id found for inbound call to {to_number}")
response = VoiceResponse()
response.say("I'm sorry, this phone number is not properly configured in our system. Please contact support.", voice='alice')
# Include additional information in TwiML for debugging
response.say(f"Received call to number {to_number}", voice='alice')
response.hangup()
return str(response)
# Initialize call state with stateless API fields
call_state = CallState(
workflow_id=workflow_id,
project_id=project_id,
system_prompt=system_prompt,
conversation_history=[],
messages=[], # For stateless API
state=None, # For stateless API state
turn_count=0,
inbound=True,
to_number=to_number,
created_at=int(time()) # Add timestamp for expiration tracking
)
# Save to MongoDB (primary source of truth)
try:
save_call_state(call_sid, call_state)
logger.info(f"Saved initial call state to MongoDB for inbound call {call_sid}")
except Exception as e:
logger.error(f"Error saving inbound call state to MongoDB: {str(e)}")
raise RuntimeError(f"Failed to save call state to MongoDB: {str(e)}")
# Only use memory storage as a temporary cache
# The service that handles the next request might be different
active_calls[call_sid] = call_state
logger.info(f"Initialized call state for {call_sid}, proceeding to handle_call")
# Create a direct response instead of redirecting
return handle_call(call_sid, workflow_id, project_id)
except Exception as e:
# Log the full error with traceback
import traceback
logger.error(f"Error in handle_inbound_call: {str(e)}")
logger.error(traceback.format_exc())
# Return a basic TwiML response so Twilio doesn't get a 500 error
response = VoiceResponse()
response.say("I'm sorry, we encountered an error processing your call. Please try again later.", voice='alice')
response.hangup()
return str(response)
@app.route('/twiml', methods=['POST'])
def handle_twiml_call():
"""TwiML endpoint for outbound call handling"""
call_sid = request.values.get('CallSid')
# Get call state to retrieve workflow_id and project_id
call_state = get_call_state(call_sid)
if call_state:
workflow_id = call_state.get('workflow_id')
project_id = call_state.get('project_id')
return handle_call(call_sid, workflow_id, project_id)
else:
# No call state found - error response
response = VoiceResponse()
response.say("I'm sorry, your call session has expired. Please try again.", voice='alice')
response.hangup()
return str(response)
def handle_call(call_sid, workflow_id, project_id=None):
"""Common handler for both inbound and outbound calls"""
try:
logger.info(f"handle_call: processing call {call_sid} with workflow {workflow_id}, project_id {project_id}")
# Get or initialize call state, first from MongoDB
call_state = None
try:
# Query MongoDB for the call state
call_state = get_call_state(call_sid)
if call_state:
logger.info(f"Loaded and restored call state from MongoDB for {call_sid}")
except Exception as e:
logger.error(f"Error retrieving MongoDB state for {call_sid}: {str(e)}")
call_state = None
# Try in-memory cache as fallback (temporary local cache)
if call_state is None and call_sid in active_calls:
call_state = active_calls.get(call_sid)
logger.info(f"Using in-memory cache for call state of {call_sid}")
# Initialize new state if needed
if call_state is None and workflow_id:
call_state = CallState(
workflow_id=workflow_id,
project_id=project_id,
system_prompt="You are a helpful assistant. Provide concise and clear answers.",
conversation_history=[],
messages=[], # For stateless API
state=None, # For stateless API state
turn_count=0,
inbound=False, # Default for outbound calls
to_number="", # This will be set properly for inbound calls
created_at=int(time()), # Add timestamp for expiration tracking
last_transcription=""
)
# Save to MongoDB (primary source of truth)
try:
save_call_state(call_sid, call_state)
logger.info(f"Initialized and saved new call state to MongoDB for {call_sid}")
except Exception as e:
logger.error(f"Error saving new call state to MongoDB: {str(e)}")
raise RuntimeError(f"Failed to save call state to MongoDB: {str(e)}")
# Only use memory as temporary cache for this request
active_calls[call_sid] = call_state
logger.info(f"Initialized new call state for {call_sid}")
logger.info(f"Using call state: {call_state}")
# Create TwiML response
response = VoiceResponse()
# Check if this is a new call
if call_state.get('turn_count', 0) == 0:
# Initial greeting for new calls
greeting = "Hello! I'm your RowBoat assistant. How can I help you today?"
logger.info(f"New call, preparing greeting: {greeting}")
try:
# Use streaming audio endpoint instead of generating files
# Include a unique ID to prevent caching
unique_id = str(uuid.uuid4())
# Use a relative URL - Twilio will use the same host as the webhook
audio_url = f"/stream-audio/{call_sid}/greeting/{unique_id}"
logger.info(f"Streaming greeting from relative URL: {audio_url}")
# Play the greeting via streaming
response.play(audio_url)
except Exception as e:
logger.error(f"Error with audio streaming for greeting: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Fallback to Twilio TTS
response.say(greeting, voice='alice')
# Update call state
call_state['turn_count'] = 1
# Save to MongoDB (primary source of truth)
try:
save_call_state(call_sid, call_state)
logger.info(f"Saved greeting state to MongoDB for {call_sid}")
except Exception as e:
logger.error(f"Error saving greeting state to MongoDB: {str(e)}")
raise RuntimeError(f"Failed to save greeting state to MongoDB: {str(e)}")
# Update local memory cache
active_calls[call_sid] = call_state
# Instead of using both Gather and Record which compete for input,
# just use Gather for speech recognition, and rely on its SpeechResult
# This is more reliable than trying to use Record and Deepgram
gather = Gather(
input='speech',
action=f'/process_speech?call_sid={call_sid}',
speech_timeout='auto',
language='en-US',
enhanced=True, # Enable enhanced speech recognition
speechModel='phone_call' # Optimize for phone calls
)
response.append(gather)
# If no input detected, redirect to twiml endpoint
# Call state will be retrieved from MongoDB
response.redirect('/twiml')
logger.info(f"Returning response: {str(response)}")
return str(response)
except Exception as e:
# Log the full error with traceback
import traceback
logger.error(f"Error in handle_call: {str(e)}")
logger.error(traceback.format_exc())
# Return a basic TwiML response
response = VoiceResponse()
response.say("I'm sorry, we encountered an error processing your call. Please try again later.", voice='alice')
response.hangup()
return str(response)
@app.route('/process_speech', methods=['POST'])
def process_speech():
"""Process user speech input and generate AI response"""
try:
logger.info(f"Processing speech: {request.values}")
call_sid = request.args.get('call_sid')
# Log all request values for debugging
logger.info(f"FULL REQUEST VALUES: {dict(request.values)}")
logger.info(f"FULL REQUEST ARGS: {dict(request.args)}")
# Get the speech result directly from Twilio
# We're now relying on Twilio's enhanced speech recognition instead of Deepgram
speech_result = request.values.get('SpeechResult')
confidence = request.values.get('Confidence')
logger.info(f"Twilio SpeechResult: {speech_result}")
logger.info(f"Twilio Confidence: {confidence}")
if not call_sid:
logger.warning(f"Missing call_sid: {call_sid}")
response = VoiceResponse()
response.say("I'm sorry, I couldn't process that request.", voice='alice')
response.hangup()
return str(response)
if not speech_result:
logger.warning("No speech result after transcription attempts")
response = VoiceResponse()
response.say("I'm sorry, I didn't catch what you said. Could you please try again?", voice='alice')
# Gather user input again
gather = Gather(
input='speech',
action=f'/process_speech?call_sid={call_sid}',
speech_timeout='auto',
language='en-US',
enhanced=True,
speechModel='phone_call'
)
response.append(gather)
# Redirect to twiml endpoint which will get call state from MongoDB
response.redirect('/twiml')
return str(response)
# Load call state from MongoDB (primary source of truth)
call_state = None
try:
call_state = get_call_state(call_sid)
if call_state:
logger.info(f"Loaded call state from MongoDB for speech processing: {call_sid}")
except Exception as e:
logger.error(f"Error retrieving MongoDB state for speech processing: {str(e)}")
call_state = None
# Try memory cache as fallback
if call_state is None and call_sid in active_calls:
call_state = active_calls[call_sid]
logger.info(f"Using in-memory state for speech processing: {call_sid}")
# Check if we have valid state
if not call_state:
logger.warning(f"No call state found for speech processing: {call_sid}")
response = VoiceResponse()
response.say("I'm sorry, your call session has expired. Please call back.", voice='alice')
response.hangup()
return str(response)
# Extract key information
workflow_id = call_state.get('workflow_id')
project_id = call_state.get('project_id')
system_prompt = call_state.get('system_prompt', "You are a helpful assistant.")
# Check if we have a Deepgram transcription stored in the call state
if 'last_transcription' in call_state and call_state['last_transcription']:
deepgram_transcription = call_state['last_transcription']
logger.info(f"Found stored Deepgram transcription: {deepgram_transcription}")
logger.info(f"Comparing with Twilio transcription: {speech_result}")
# Use the Deepgram transcription instead of Twilio's
speech_result = deepgram_transcription
# Remove it so we don't use it again
del call_state['last_transcription']
logger.info(f"Using Deepgram transcription instead")
# Log final user input that will be used
logger.info(f"Final user input: {speech_result}")
# Process with RowBoat agent
try:
# Clean up the speech result if needed
if speech_result:
# Remove any common filler words or fix typical transcription issues
import re
# Convert to lowercase for easier pattern matching
cleaned_input = speech_result.lower()
# Remove filler words that might be at the beginning
cleaned_input = re.sub(r'^(um|uh|like|so|okay|well)\s+', '', cleaned_input)
# Capitalize first letter for better appearance
if cleaned_input:
speech_result = cleaned_input[0].upper() + cleaned_input[1:]
logger.info(f"Sending to RowBoat: '{speech_result}'")
# Get previous messages and state from call state
previous_messages = call_state.get('messages', [])
previous_state = call_state.get('state')
# Process with stateless API
ai_response, updated_messages, updated_state = process_conversation_turn(
user_input=speech_result,
workflow_id=workflow_id,
system_prompt=system_prompt,
previous_messages=previous_messages,
previous_state=previous_state,
project_id=project_id
)
# Update the messages and state in call state
call_state['messages'] = updated_messages
call_state['state'] = updated_state
logger.info(f"RowBoat response: {ai_response}")
except Exception as e:
logger.error(f"Error processing with RowBoat: {str(e)}")
ai_response = "I'm sorry, I encountered an issue processing your request. Could you please try again?"
# Conversation history is updated in the streaming response section below
# Create TwiML response
response = VoiceResponse()
# Use streaming audio for the response
logger.info("Setting up response streaming with ElevenLabs")
try:
# Store the AI response in conversation history first
# (The stream-audio endpoint will read it from here)
# Update conversation history (do this before streaming so the endpoint can access it)
call_state['conversation_history'].append({
'user': speech_result,
'assistant': ai_response
})
call_state['turn_count'] += 1
# Save to MongoDB (primary source of truth)
try:
save_call_state(call_sid, call_state)
logger.info(f"Saved response state to MongoDB for {call_sid}")
except Exception as e:
logger.error(f"Error saving response state to MongoDB: {str(e)}")
raise RuntimeError(f"Failed to save response state to MongoDB: {str(e)}")
# Update local memory cache
active_calls[call_sid] = call_state
# Generate a unique ID to prevent caching
unique_id = str(uuid.uuid4())
# Use a relative URL - Twilio will use the same host as the webhook
audio_url = f"/stream-audio/{call_sid}/response/{unique_id}"
logger.info(f"Streaming response from relative URL: {audio_url}")
# Play the response via streaming
response.play(audio_url)
except Exception as e:
logger.error(f"Error with audio streaming for response: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Fallback to Twilio TTS
response.say(ai_response, voice='alice')
# Gather next user input with enhanced speech recognition
gather = Gather(
input='speech',
action=f'/process_speech?call_sid={call_sid}',
speech_timeout='auto',
language='en-US',
enhanced=True, # Enable enhanced speech recognition
speechModel='phone_call' # Optimize for phone calls
)
response.append(gather)
# If no input detected, redirect to twiml endpoint
# Call state will be retrieved from MongoDB
response.redirect('/twiml')
logger.info(f"Returning TwiML response for speech processing")
return str(response)
except Exception as e:
# Log the full error with traceback
import traceback
logger.error(f"Error in process_speech: {str(e)}")
logger.error(traceback.format_exc())
# Return a basic TwiML response
response = VoiceResponse()
response.say("I'm sorry, we encountered an error processing your speech. Please try again.", voice='alice')
response.gather(
input='speech',
action=f'/process_speech?call_sid={request.args.get("call_sid")}',
speech_timeout='auto'
)
return str(response)
@app.route('/stream-audio/<call_sid>/<text_type>/<unique_id>', methods=['GET'])
def stream_audio(call_sid, text_type, unique_id):
"""Stream audio directly from ElevenLabs to Twilio without saving to disk"""
try:
logger.info(f"Audio streaming requested for call {call_sid}, type {text_type}")
# Determine what text to synthesize
text_to_speak = ""
if text_type == "greeting":
# Use default greeting
text_to_speak = "Hello! I'm your RowBoat assistant. How can I help you today?"
elif text_type == "response":
# Get the text from call state (try MongoDB first, then memory)
call_state = None
# Try MongoDB first
try:
call_state = get_call_state(call_sid)
if call_state:
logger.info(f"Loaded call state from MongoDB for streaming: {call_sid}")
except Exception as e:
logger.error(f"Error retrieving MongoDB state for streaming: {str(e)}")
call_state = None
# Fall back to memory if needed
if call_state is None:
if call_sid not in active_calls:
logger.error(f"Call SID not found for streaming: {call_sid}")
return "Call not found", 404
call_state = active_calls[call_sid]
logger.info(f"Using in-memory state for streaming: {call_sid}")
if call_state.get('conversation_history') and len(call_state['conversation_history']) > 0:
# Get the most recent AI response
text_to_speak = call_state['conversation_history'][-1]['assistant']
else:
logger.warning(f"No conversation history found for call {call_sid}")
text_to_speak = "I'm sorry, I don't have a response ready. Could you please repeat?"
else:
# Direct text may be passed as the text_type (for testing)
text_to_speak = text_type
if not text_to_speak:
logger.error("No text to synthesize")
return "No text to synthesize", 400
logger.info(f"Streaming audio for text: {text_to_speak[:50]}...")
def generate():
try:
# Generate and stream the audio directly
audio_stream = elevenlabs_client.generate(
text=text_to_speak,
voice=TTS_VOICE,
model=TTS_MODEL,
output_format="mp3_44100_128"
)
# Stream chunks directly to the response
for chunk in audio_stream:
yield chunk
logger.info(f"Finished streaming audio for call {call_sid}")
except Exception as e:
logger.error(f"Error in audio stream generator: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Return a streaming response
response = Response(generate(), mimetype='audio/mpeg')
return response
except Exception as e:
logger.error(f"Error setting up audio stream: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return "Error streaming audio", 500
@app.route('/call-status', methods=['POST'])
def call_status_callback():
"""Handle call status callbacks from Twilio"""
call_sid = request.values.get('CallSid')
call_status = request.values.get('CallStatus')
logger.info(f"Call {call_sid} status: {call_status}")
# Clean up resources when call completes
if call_status in ['completed', 'failed', 'busy', 'no-answer', 'canceled']:
# Get call state from MongoDB or memory
call_state = None
# Try to load from MongoDB first
try:
call_state = get_call_state(call_sid)
if call_state:
logger.info(f"Loaded final state from MongoDB for {call_sid}")
except Exception as e:
logger.error(f"Error retrieving final state from MongoDB: {str(e)}")
call_state = None
# Fall back to memory if needed
if call_state is None and call_sid in active_calls:
call_state = active_calls[call_sid]
logger.info(f"Using in-memory state for final call state of {call_sid}")
if call_state:
# Remove from active calls in both memory and MongoDB
if call_sid in active_calls:
del active_calls[call_sid]
logger.info(f"Removed call {call_sid} from active calls memory")
try:
# Remove the document from MongoDB
delete_call_state(call_sid)
logger.info(f"Removed call {call_sid} from MongoDB")
except Exception as e:
logger.error(f"Error removing call state from MongoDB: {str(e)}")
return '', 204
@app.route('/health', methods=['GET'])
def health_check():
"""Simple health check endpoint"""
health_data = {
"status": "healthy",
"active_calls_memory": len(active_calls)
}
# Get MongoDB status
try:
mongodb_status = get_mongodb_status()
health_data["mongodb"] = mongodb_status
health_data["active_calls_mongodb"] = mongodb_status.get("active_calls", 0)
except Exception as e:
health_data["mongodb_error"] = str(e)
health_data["status"] = "degraded"
return jsonify(health_data)
if __name__ == '__main__':
# Log startup information
logger.info(f"Starting Twilio-RowBoat server")
# Remove the explicit run configuration since Flask CLI will handle it
app.run()

View file

@ -0,0 +1,6 @@
from dotenv import load_dotenv
import os
def load_environment():
"""Load environment variables from .env file"""
load_dotenv()

View file

@ -0,0 +1,39 @@
aiohappyeyeballs==2.5.0
aiohttp==3.11.13
aiohttp-retry==2.9.1
aiosignal==1.3.2
annotated-types==0.7.0
anyio==4.8.0
attrs==25.1.0
blinker==1.9.0
certifi==2025.1.31
charset-normalizer==3.4.1
click==8.1.8
dnspython==2.7.0
dotenv==0.9.9
elevenlabs==1.52.0
Flask==3.1.0
frozenlist==1.5.0
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
idna==3.10
itsdangerous==2.2.0
Jinja2==3.1.6
MarkupSafe==3.0.2
multidict==6.1.0
propcache==0.3.0
pydantic==2.10.6
pydantic_core==2.27.2
PyJWT==2.10.1
pymongo==4.11.2
python-dotenv==1.0.1
requests==2.32.3
rowboat==2.1.0
sniffio==1.3.1
twilio==9.4.6
typing_extensions==4.12.2
urllib3==2.3.0
websockets==15.0.1
Werkzeug==3.1.3
yarl==1.18.3

View file

@ -0,0 +1,109 @@
from twilio.rest import Client as TwilioClient
from rowboat.client import Client
from rowboat.schema import UserMessage, SystemMessage
import os
from typing import Dict, List, Optional, Tuple, Any
import logging
from util import get_api_key
import time
import json
# Load environment variables
from load_env import load_environment
load_environment()
# Configure logging to stdout for Docker compatibility
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()] # Send logs to stdout
)
logger = logging.getLogger(__name__)
# Environment variables and configuration
ROWBOAT_API_HOST = os.environ.get("ROWBOAT_API_HOST").strip()
Message = UserMessage | SystemMessage
def process_conversation_turn(
user_input: str,
workflow_id: str,
system_prompt: str = "You are a helpful assistant. Provide concise and clear answers.",
previous_messages: List[Message] = None,
previous_state: Any = None,
project_id: str = None
) -> Tuple[str, List[Message], Any]:
"""
Process a single conversation turn with the RowBoat agent using the stateless API.
Args:
user_input: User's transcribed input
workflow_id: RowBoat workflow ID
system_prompt: System prompt for the agent
previous_messages: Previous messages in the conversation
previous_state: Previous state from RowBoat
project_id: RowBoat project ID (if different from default)
Returns:
A tuple of (response_text, updated_messages, updated_state)
"""
try:
# Initialize messages list if not provided
messages = [] if previous_messages is None else previous_messages.copy()
# If we're starting a new conversation, add the system message
if not messages or not any(msg.role == 'system' for msg in messages):
messages.append(SystemMessage(role='system', content=system_prompt))
# Add the user's new message
messages.append(UserMessage(role='user', content=user_input))
# Process the conversation using the stateless API
logger.info(f"Sending to RowBoat API with {len(messages)} messages")
# Create client with custom project_id if provided
client = Client(
host=ROWBOAT_API_HOST,
project_id=project_id,
api_key=get_api_key(project_id)
)
response_messages, new_state = client.chat(
messages=messages,
workflow_id=workflow_id,
state=previous_state
)
# Extract the assistant's response (last message)
if response_messages and len(response_messages) > 0:
assistant_response = response_messages[-1].content
else:
assistant_response = "I'm sorry, I didn't receive a proper response."
# Update messages list with the new responses
final_messages = messages + response_messages
# dump_data = {
# 'messages': [msg.model_dump() for msg in messages],
# 'response_messages': [msg.model_dump() for msg in response_messages],
# 'state': new_state
# }
# # Write messages to a debug file for inspection
# fname = f'debug_dump_{time.time()}.json'
# try:
# with open(fname, 'w') as f:
# json.dump(dump_data, f, indent=2)
# logger.info(f"Wrote debug info to {fname}")
# except Exception as e:
# logger.error(f"Failed to write message debug file: {str(e)}")
logger.info(f"Got response from RowBoat API: {assistant_response[:100]}...")
return assistant_response, final_messages, new_state
except Exception as e:
logger.error(f"Error processing conversation turn: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return "I'm sorry, I encountered an error processing your request.", previous_messages, previous_state

423
apps/twilio_handler/util.py Normal file
View file

@ -0,0 +1,423 @@
import os
import logging
import datetime
from typing import Dict, Any, Optional, List, Union
import copy
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, PyMongoError
from pymongo.collection import Collection
from bson import json_util
from pydantic import BaseModel
from rowboat.schema import ApiMessage
# Configure logging to stdout for Docker compatibility
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()] # Send logs to stdout
)
logger = logging.getLogger(__name__)
# MongoDB Configuration
MONGODB_URI = os.environ.get('MONGODB_URI')
MONGODB_DB = 'rowboat'
CALL_STATE_COLLECTION = 'call-state'
MONGODB_EXPIRY_SECONDS = 86400 # Default 24 hours
API_KEYS_COLLECTION = "api_keys"
# MongoDB client singleton
_mongo_client = None
_db = None
_call_state_collection = None
_api_keys_collection = None
# Define chat state pydantic model
class CallState(BaseModel):
messages: List[ApiMessage] = []
workflow_id: str
project_id: str
system_prompt: str
turn_count: int = 0
inbound: bool = False
conversation_history: List[Dict[str, str]] = [] # Using Dict instead of ApiMessage for chat history
to_number: str = ""
created_at: int
state: Any = None # Allow any type since the API might return a complex state object
last_transcription: Optional[str] = None
# Enable dictionary-style access for compatibility with existing code
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
setattr(self, key, value)
def get(self, key, default=None):
return getattr(self, key, default)
model_config = {
# Allow extra fields for flexibility
"extra": "allow",
# More lenient type validation
"arbitrary_types_allowed": True,
# Allow population by field name
"populate_by_name": True
}
def init_mongodb():
"""Initialize MongoDB connection and set up indexes."""
global _mongo_client, _db, _call_state_collection, _api_keys_collection
try:
_mongo_client = MongoClient(MONGODB_URI)
# Force a command to check the connection
_mongo_client.admin.command('ping')
# Set up database and collection
_db = _mongo_client[MONGODB_DB]
_call_state_collection = _db[CALL_STATE_COLLECTION]
_api_keys_collection = _db[API_KEYS_COLLECTION]
# Create TTL index if it doesn't exist
if 'expires_at_1' not in _call_state_collection.index_information():
_call_state_collection.create_index('expires_at', expireAfterSeconds=0)
logger.info(f"Connected to MongoDB at {MONGODB_URI}")
return True
except ConnectionFailure as e:
logger.error(f"Failed to connect to MongoDB: {str(e)}")
raise RuntimeError(f"Could not connect to MongoDB: {str(e)}")
def get_collection() -> Collection:
"""Get the MongoDB collection, initializing if needed."""
global _call_state_collection
if _call_state_collection is None:
init_mongodb()
return _call_state_collection
def get_api_keys_collection() -> Collection:
"""Get the MongoDB collection, initializing if needed."""
global _api_keys_collection
if _api_keys_collection is None:
init_mongodb()
return _api_keys_collection
def get_api_key(project_id: str) -> Optional[str]:
"""Get the API key for a given project ID."""
collection = get_api_keys_collection()
doc = collection.find_one({"projectId": project_id})
return doc["key"] if doc else None
def save_call_state(call_sid: str, call_state: CallState) -> bool:
"""
Save call state to MongoDB.
Args:
call_sid: The call SID to use as document ID
call_state: The call state dictionary to save
Returns:
True if successful, False otherwise
"""
try:
# Validate call_state is a CallState object
if not isinstance(call_state, CallState):
raise ValueError(f"call_state must be a CallState object, got {type(call_state)}")
collection = get_collection()
# Use call_sid as document ID
collection.update_one(
{'_id': call_sid},
{'$set': call_state.model_dump()},
upsert=True
)
logger.info(f"Saved call state to MongoDB for call {call_sid}")
return True
except PyMongoError as e:
logger.error(f"Error saving call state to MongoDB for call {call_sid}: {str(e)}")
raise RuntimeError(f"Failed to save call state to MongoDB: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error in save_call_state: {str(e)}")
raise RuntimeError(f"Failed to save call state: {str(e)}")
def get_call_state(call_sid: str) -> Optional[CallState]:
"""
Retrieve call state from MongoDB.
Args:
call_sid: The call SID to retrieve
Returns:
Call state dictionary or None if not found
"""
try:
collection = get_collection()
# Query MongoDB for the call state
state_doc = collection.find_one({'_id': call_sid})
if not state_doc:
logger.info(f"No call state found in MongoDB for call {call_sid}")
return None
call_state = CallState.model_validate(state_doc)
logger.info(f"Retrieved call state from MongoDB for call {call_sid}")
return call_state
except PyMongoError as e:
logger.error(f"Error retrieving call state from MongoDB for call {call_sid}: {str(e)}")
raise RuntimeError(f"Failed to retrieve call state from MongoDB: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error in get_call_state: {str(e)}")
raise RuntimeError(f"Failed to retrieve call state: {str(e)}")
def delete_call_state(call_sid: str) -> bool:
"""
Delete call state from MongoDB.
Args:
call_sid: The call SID to delete
Returns:
True if successful, False if not found
"""
try:
collection = get_collection()
# Delete the document from MongoDB
result = collection.delete_one({'_id': call_sid})
if result.deleted_count > 0:
logger.info(f"Deleted call state from MongoDB for call {call_sid}")
return True
else:
logger.info(f"No call state found to delete in MongoDB for call {call_sid}")
return False
except PyMongoError as e:
logger.error(f"Error deleting call state from MongoDB for call {call_sid}: {str(e)}")
raise RuntimeError(f"Failed to delete call state from MongoDB: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error in delete_call_state: {str(e)}")
raise RuntimeError(f"Failed to delete call state: {str(e)}")
def count_active_calls() -> int:
"""
Count active call documents in MongoDB.
Returns:
Number of active call documents
"""
try:
collection = get_collection()
return collection.count_documents({})
except PyMongoError as e:
logger.error(f"Error counting active calls in MongoDB: {str(e)}")
raise RuntimeError(f"Failed to count active calls in MongoDB: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error in count_active_calls: {str(e)}")
raise RuntimeError(f"Failed to count active calls: {str(e)}")
def get_mongodb_status() -> Dict[str, Any]:
"""
Get MongoDB connection status information.
Returns:
Dictionary with status information
"""
status = {
"status": "connected",
"uri": MONGODB_URI,
"database": MONGODB_DB,
"collection": CALL_STATE_COLLECTION
}
try:
# First check connection with a simple command
collection = get_collection()
db = collection.database
db.command('ping')
status["connection"] = "ok"
# Count active calls
count = count_active_calls()
status["active_calls"] = count
# Get collection stats
try:
stats = db.command("collStats", CALL_STATE_COLLECTION)
status["size_bytes"] = stats.get("size", 0)
status["document_count"] = stats.get("count", 0)
status["index_count"] = len(stats.get("indexSizes", {}))
except Exception as stats_error:
status["stats_error"] = str(stats_error)
except Exception as e:
status["status"] = "error"
status["error"] = str(e)
status["timestamp"] = datetime.datetime.utcnow().isoformat()
return status
# Twilio configuration functions
def get_twilio_config(phone_number: str) -> Optional[Dict[str, Any]]:
"""
Get Twilio configuration for a specific phone number from MongoDB.
Args:
phone_number: The phone number to look up configuration for
Returns:
Configuration dictionary or None if not found/active
"""
try:
# Get MongoDB client and database
client = get_collection().database.client
db = client[MONGODB_DB]
# Use the twilio_configs collection
config_collection = db['twilio_configs']
# Enhanced logging for phone number format
logger.info(f"Looking up configuration for phone number: '{phone_number}'")
# Try different formats of the phone number
cleaned_number = phone_number.strip().replace(' ', '').replace('-', '').replace('(', '').replace(')', '')
possible_formats = [
phone_number, # Original format from Twilio
cleaned_number, # Thoroughly cleaned number
'+' + cleaned_number if not cleaned_number.startswith('+') else cleaned_number, # Ensure + prefix
# Try with different country code formats
'+1' + cleaned_number[-10:] if len(cleaned_number) >= 10 else cleaned_number, # US format with +1
'1' + cleaned_number[-10:] if len(cleaned_number) >= 10 else cleaned_number, # US format with 1
cleaned_number[-10:] if len(cleaned_number) >= 10 else cleaned_number, # US format without country code
]
# Remove duplicates while preserving order
unique_formats = []
for fmt in possible_formats:
if fmt not in unique_formats:
unique_formats.append(fmt)
possible_formats = unique_formats
# Log the formats we're trying
logger.info(f"Trying phone number formats: {possible_formats}")
# Try each format
for phone_format in possible_formats:
# Look up the configuration for this phone number format with status=active
config = config_collection.find_one({'phone_number': phone_format, 'status': 'active'})
if config:
logger.info(f"Found active configuration for '{phone_format}': project_id={config.get('project_id')}, workflow_id={config.get('workflow_id')}")
break # Found a match, exit the loop
# If we didn't find any match
if not config:
# Try a more generic query to see what configurations exist
try:
all_configs = list(config_collection.find({'phone_number': {'$regex': phone_number[-10:] if len(phone_number) >= 10 else phone_number}}))
if all_configs:
logger.warning(f"Found {len(all_configs)} configurations that match phone number {phone_number}, but none are active:")
for cfg in all_configs:
logger.warning(f" - Phone: {cfg.get('phone_number')}, Status: {cfg.get('status')}, Workflow: {cfg.get('workflow_id')}")
else:
logger.warning(f"No configurations found at all for phone number {phone_number} or related formats")
except Exception as e:
logger.error(f"Error running regex query: {str(e)}")
logger.warning(f"No active configuration found for any format of phone number {phone_number}")
return None
# Make sure required fields are present
if 'project_id' not in config or 'workflow_id' not in config:
logger.error(f"Configuration for {phone_number} is missing required fields")
return None
logger.info(f"Found active configuration for {phone_number}: project_id={config['project_id']}, workflow_id={config['workflow_id']}")
return config
except Exception as e:
logger.error(f"Error retrieving Twilio configuration for {phone_number}: {str(e)}")
# Return None instead of raising an exception to allow fallback to default behavior
return None
def list_active_twilio_configs() -> List[Dict[str, Any]]:
"""
List all active Twilio configurations from MongoDB.
Returns:
List of active configuration dictionaries
"""
try:
# Get MongoDB client and database
client = get_collection().database.client
db = client[MONGODB_DB]
# Use the twilio_configs collection
config_collection = db['twilio_configs']
# Find all active configurations
configs = list(config_collection.find({'status': 'active'}))
logger.info(f"Found {len(configs)} active Twilio configurations")
return configs
except Exception as e:
logger.error(f"Error retrieving active Twilio configurations: {str(e)}")
return []
def save_twilio_config(config: Dict[str, Any]) -> bool:
"""
Save a Twilio configuration to MongoDB.
Args:
config: Configuration dictionary with at least phone_number, project_id, and workflow_id
Returns:
True if successful, False otherwise
"""
required_fields = ['phone_number', 'project_id', 'workflow_id']
for field in required_fields:
if field not in config:
logger.error(f"Missing required field '{field}' in Twilio configuration")
return False
try:
# Get MongoDB client and database
client = get_collection().database.client
db = client[MONGODB_DB]
# Use the twilio_configs collection
config_collection = db['twilio_configs']
# Ensure status is set to active
if 'status' not in config:
config['status'] = 'active'
# Add timestamp
config['updated_at'] = datetime.datetime.utcnow()
if 'created_at' not in config:
config['created_at'] = config['updated_at']
# Use phone_number as the ID
phone_number = config['phone_number']
# Update or insert the configuration
result = config_collection.update_one(
{'phone_number': phone_number},
{'$set': config},
upsert=True
)
if result.matched_count > 0:
logger.info(f"Updated Twilio configuration for {phone_number}")
else:
logger.info(f"Created new Twilio configuration for {phone_number}")
return True
except Exception as e:
logger.error(f"Error saving Twilio configuration: {str(e)}")
return False
# Initialize MongoDB on module import
init_mongodb()

View file

@ -36,6 +36,7 @@ services:
- CHAT_WIDGET_SESSION_JWT_SECRET=${CHAT_WIDGET_SESSION_JWT_SECRET}
- MAX_QUERIES_PER_MINUTE=${MAX_QUERIES_PER_MINUTE}
- MAX_PROJECTS_PER_USER=${MAX_PROJECTS_PER_USER}
- VOICE_API_URL=${VOICE_API_URL}
restart: unless-stopped
rowboat_agents:
@ -61,6 +62,16 @@ services:
- API_KEY=${COPILOT_API_KEY}
restart: unless-stopped
tools_webhook:
build:
context: ./apps/tools_webhook
dockerfile: Dockerfile
ports:
- "3005:3005"
environment:
- SIGNING_SECRET=${SIGNING_SECRET}
restart: unless-stopped
simulation_runner:
build:
context: ./apps/simulation_runner
@ -125,17 +136,6 @@ services:
- QDRANT_API_KEY=${QDRANT_API_KEY}
restart: unless-stopped
tools_webhook:
build:
context: ./apps/tools_webhook
dockerfile: Dockerfile
profiles: [ "tools_webhook" ]
ports:
- "3005:3005"
environment:
- SIGNING_SECRET=${SIGNING_SECRET}
restart: unless-stopped
chat_widget:
build:
context: ./apps/chat_widget
@ -162,4 +162,15 @@ services:
profiles: [ "docs" ]
ports:
- "8000:8000"
twilio_handler:
build:
context: ./apps/twilio_handler
dockerfile: Dockerfile
ports:
- "3009:3009"
environment:
- ELEVENLABS_API_KEY=${ELEVENLABS_API_KEY}
- ROWBOAT_API_HOST=http://rowboat:3000
- MONGODB_URI=${MONGODB_CONNECTION_STRING}
restart: unless-stopped