Merge branch 'main' into feat/audio_recording_in_tool_calls

This commit is contained in:
Abhishek 2026-04-10 17:53:30 +05:30 committed by GitHub
commit 5efcccf204
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 450 additions and 93 deletions

View file

@ -51,6 +51,10 @@ S3_REGION = os.environ.get("S3_REGION", "us-east-1")
# Sentry configuration
# os.getenv returns None when SENTRY_DSN is not set.
SENTRY_DSN = os.getenv("SENTRY_DSN")
# PostHog configuration
# POSTHOG_API_KEY is None when unset; POSTHOG_HOST falls back to the URL below.
POSTHOG_API_KEY = os.getenv("POSTHOG_API_KEY")
POSTHOG_HOST = os.getenv("POSTHOG_HOST", "https://us.i.posthog.com")
# Boolean feature flags: each is True only when its environment variable is
# the string "true" in any letter case; unset or any other value yields False.
ENABLE_ARI_STASIS = os.getenv("ENABLE_ARI_STASIS", "false").lower() == "true"
SERIALIZE_LOG_OUTPUT = os.getenv("SERIALIZE_LOG_OUTPUT", "false").lower() == "true"

View file

@ -140,3 +140,18 @@ class ToolStatus(Enum):
ACTIVE = "active" # Tool is available for use
ARCHIVED = "archived" # Tool is soft-deleted
DRAFT = "draft" # Tool is being configured (not ready for use)
class PostHogEvent(str, Enum):
    """Names of the PostHog events emitted by the backend.

    Members also subclass ``str``, so each one compares equal to its
    event-name string.
    """

    # Workflow events
    WORKFLOW_CREATED = "workflow_created"
    WORKFLOW_PUBLISHED = "workflow_published"
    WORKFLOW_DUPLICATED = "workflow_duplicated"

    # Call events
    CALL_STARTED = "call_started"
    CALL_COMPLETED = "call_completed"
    CALL_FAILED = "call_failed"

    # Configuration and resource events
    TELEPHONY_CONFIGURED = "telephony_configured"
    KNOWLEDGE_BASE_CREATED = "knowledge_base_created"
    TOOL_CREATED = "tool_created"
    AGENT_EMBEDDED = "agent_embedded"

View file

@ -17,3 +17,4 @@ docling[rapidocr]==2.68.0
pgvector==0.4.2
bcrypt==5.0.0
email-validator==2.3.0
posthog==3.24.0

View file

@ -60,6 +60,7 @@ async def signup(request: SignupRequest):
email=user.email,
name=request.name,
organization_id=organization.id,
provider_id=user.provider_id,
),
)
@ -84,6 +85,7 @@ async def login(request: LoginRequest):
id=user.id,
email=user.email,
organization_id=user.selected_organization_id,
provider_id=user.provider_id,
),
)
@ -94,4 +96,5 @@ async def get_current_user(user: UserModel = Depends(get_user)):
id=user.id,
email=user.email,
organization_id=user.selected_organization_id,
provider_id=user.provider_id,
)

View file

@ -7,6 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query
from loguru import logger
from api.db import db_client
from api.enums import PostHogEvent
from api.schemas.knowledge_base import (
ChunkSearchRequestSchema,
ChunkSearchResponseSchema,
@ -17,6 +18,7 @@ from api.schemas.knowledge_base import (
ProcessDocumentRequestSchema,
)
from api.services.auth.depends import get_user
from api.services.posthog_client import capture_event
from api.services.storage import storage_fs
from api.tasks.arq import enqueue_job
from api.tasks.function_names import FunctionNames
@ -142,6 +144,18 @@ async def process_document(
f"with OpenAI embeddings, org {user.selected_organization_id}"
)
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.KNOWLEDGE_BASE_CREATED,
properties={
"document_id": document.id,
"document_uuid": str(request.document_uuid),
"filename": filename,
"retrieval_mode": request.retrieval_mode,
"organization_id": user.selected_organization_id,
},
)
return DocumentResponseSchema(
id=document.id,
document_uuid=request.document_uuid,

View file

@ -6,7 +6,7 @@ from pydantic import BaseModel
from api.constants import DEFAULT_CAMPAIGN_RETRY_CONFIG, DEFAULT_ORG_CONCURRENCY_LIMIT
from api.db import db_client
from api.db.models import UserModel
from api.enums import OrganizationConfigurationKey
from api.enums import OrganizationConfigurationKey, PostHogEvent
from api.schemas.telephony_config import (
ARIConfigurationRequest,
ARIConfigurationResponse,
@ -24,6 +24,7 @@ from api.schemas.telephony_config import (
)
from api.services.auth.depends import get_user
from api.services.configuration.masking import is_mask_of, mask_key
from api.services.posthog_client import capture_event
from api.services.worker_sync.manager import get_worker_sync_manager
from api.services.worker_sync.protocol import WorkerSyncEventType
@ -257,6 +258,16 @@ async def save_telephony_configuration(
config_value,
)
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.TELEPHONY_CONFIGURED,
properties={
"provider": request.provider,
"phone_number_count": len(request.from_numbers),
"organization_id": user.selected_organization_id,
},
)
return {"message": "Telephony configuration saved successfully"}

View file

@ -9,8 +9,9 @@ from pydantic import BaseModel, Field, field_validator
from api.db import db_client
from api.db.models import UserModel
from api.enums import ToolCategory, ToolStatus
from api.enums import PostHogEvent, ToolCategory, ToolStatus
from api.services.auth.depends import get_user
from api.services.posthog_client import capture_event
router = APIRouter(prefix="/tools")
@ -342,6 +343,16 @@ async def create_tool(
icon_color=request.icon_color,
)
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.TOOL_CREATED,
properties={
"tool_name": request.name,
"tool_category": request.category,
"organization_id": user.selected_organization_id,
},
)
return build_tool_response(tool)

View file

@ -281,7 +281,12 @@ class SignalingManager:
# Start pipeline in background
asyncio.create_task(
run_pipeline_smallwebrtc(
pc, workflow_id, workflow_run_id, user.id, call_context_vars
pc,
workflow_id,
workflow_run_id,
user.id,
call_context_vars,
user_provider_id=str(user.provider_id),
)
)

View file

@ -13,7 +13,7 @@ from api.constants import DEPLOYMENT_MODE
from api.db import db_client
from api.db.models import UserModel
from api.db.workflow_template_client import WorkflowTemplateClient
from api.enums import CallType, StorageBackend
from api.enums import CallType, PostHogEvent, StorageBackend
from api.schemas.workflow import WorkflowRunResponseSchema
from api.services.auth.depends import get_user
from api.services.configuration.check_validity import UserConfigurationValidator
@ -23,6 +23,7 @@ from api.services.configuration.masking import (
)
from api.services.configuration.resolve import resolve_effective_config
from api.services.mps_service_key_client import mps_service_key_client
from api.services.posthog_client import capture_event
from api.services.storage import storage_fs
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.duplicate import duplicate_workflow
@ -287,6 +288,17 @@ async def create_workflow(
user.selected_organization_id,
)
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.WORKFLOW_CREATED,
properties={
"workflow_id": workflow.id,
"workflow_name": workflow.name,
"source": "direct",
"organization_id": user.selected_organization_id,
},
)
# Sync agent triggers if workflow definition contains any
if request.workflow_definition:
trigger_paths = extract_trigger_paths(request.workflow_definition)
@ -365,6 +377,19 @@ async def create_workflow_from_template(
organization_id=user.selected_organization_id,
)
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.WORKFLOW_CREATED,
properties={
"workflow_id": workflow.id,
"workflow_name": workflow.name,
"source": "template",
"call_type": request.call_type,
"use_case": request.use_case,
"organization_id": user.selected_organization_id,
},
)
# Sync agent triggers if workflow definition contains any
if workflow_def:
trigger_paths = extract_trigger_paths(workflow_def)
@ -569,6 +594,16 @@ async def publish_workflow(
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.WORKFLOW_PUBLISHED,
properties={
"workflow_id": workflow_id,
"version_number": published.version_number,
"organization_id": user.selected_organization_id,
},
)
return {
"id": published.id,
"version_number": published.version_number,
@ -787,6 +822,18 @@ async def duplicate_workflow_endpoint(
organization_id=user.selected_organization_id,
user_id=user.id,
)
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.WORKFLOW_DUPLICATED,
properties={
"workflow_id": workflow.id,
"workflow_name": workflow.name,
"source_workflow_id": workflow_id,
"organization_id": user.selected_organization_id,
},
)
return {
"id": workflow.id,
"name": workflow.name,

View file

@ -9,7 +9,9 @@ from pydantic import BaseModel
from api.constants import BACKEND_API_ENDPOINT, ENVIRONMENT, UI_APP_URL
from api.db import db_client
from api.db.models import EmbedTokenModel, UserModel
from api.enums import PostHogEvent
from api.services.auth.depends import get_user
from api.services.posthog_client import capture_event
router = APIRouter(prefix="/workflow")
@ -103,6 +105,17 @@ async def create_or_update_embed_token(
expires_at=expires_at,
)
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.AGENT_EMBEDDED,
properties={
"workflow_id": workflow_id,
"is_new_token": len(existing_tokens) == 0,
"has_domain_restriction": bool(embed_request.allowed_domains),
"organization_id": user.selected_organization_id,
},
)
# Generate embed script
embed_script = generate_embed_script(token)

View file

@ -24,6 +24,7 @@ class UserResponse(BaseModel):
email: str | None
name: str | None = None
organization_id: int | None = None
provider_id: str | None = None
class AuthResponse(BaseModel):

View file

@ -3,7 +3,7 @@ import asyncio
from loguru import logger
from api.db import db_client
from api.enums import WorkflowRunState
from api.enums import PostHogEvent, WorkflowRunState
from api.services.campaign.circuit_breaker import circuit_breaker
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_playback import play_audio, play_audio_loop
@ -13,6 +13,7 @@ from api.services.pipecat.in_memory_buffers import (
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
from api.services.pipecat.tracing_config import get_trace_url
from api.services.posthog_client import capture_event
from api.services.workflow.pipecat_engine import PipecatEngine
from api.tasks.arq import enqueue_job
from api.tasks.function_names import FunctionNames
@ -26,6 +27,37 @@ from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.utils.enums import EndTaskReason
async def _capture_call_event(
    workflow_run_id: int,
    user_provider_id: str | None,
    event: str,
    extra_properties: dict | None = None,
) -> None:
    """Look up the workflow run for call metadata and fire a PostHog event.

    Meant to be run via asyncio.create_task() so it never blocks the pipeline.
    Every failure is logged and swallowed, so the task never surfaces an
    exception to the event loop.

    Args:
        workflow_run_id: ID of the workflow run the call belongs to.
        user_provider_id: Value used as the PostHog distinct id. When it is
            missing the event is skipped, since it could not be attributed.
        event: Name of the event to capture.
        extra_properties: Optional properties merged over the call metadata;
            keys that collide with the metadata override it.
    """
    # Nothing to attribute the event to: skip before the database round trip.
    if not user_provider_id:
        return

    try:
        workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)

        # Metadata stays None when the run cannot be found, so the event is
        # still reported with the run id alone.
        properties = {
            "workflow_run_id": workflow_run_id,
            "workflow_id": None,
            "call_type": None,
            "call_direction": None,
        }
        if workflow_run:
            initial_context = workflow_run.initial_context or {}
            properties["workflow_id"] = workflow_run.workflow_id
            properties["call_type"] = workflow_run.mode
            # A run without an explicit direction is reported as outbound.
            properties["call_direction"] = initial_context.get(
                "direction", "outbound"
            )

        if extra_properties:
            properties.update(extra_properties)

        capture_event(
            distinct_id=user_provider_id,
            event=event,
            properties=properties,
        )
    except Exception:
        logger.exception(f"Background PostHog capture failed for '{event}'")
def register_event_handlers(
task: PipelineTask,
transport,
@ -37,6 +69,7 @@ def register_event_handlers(
audio_config=AudioConfig,
pre_call_fetch_task: asyncio.Task | None = None,
fetch_recording_audio=None,
user_provider_id: str | None = None,
):
"""Register all event handlers for transport and task events.
@ -80,6 +113,12 @@ def register_event_handlers(
):
ready_state["initial_response_triggered"] = True
asyncio.create_task(
_capture_call_event(
workflow_run_id, user_provider_id, PostHogEvent.CALL_STARTED
)
)
# Wait for pre-call fetch if in progress, playing ringer meanwhile
if pre_call_fetch_task is not None:
if not pre_call_fetch_task.done():
@ -193,6 +232,14 @@ def register_event_handlers(
await circuit_breaker.record_and_evaluate(
campaign_id=workflow_run.campaign_id, is_failure=True
)
asyncio.create_task(
_capture_call_event(
workflow_run_id,
user_provider_id,
PostHogEvent.CALL_FAILED,
extra_properties={"error_reason": "pipeline_error"},
)
)
except Exception as e:
logger.error(f"Error recording circuit breaker failure: {e}", exc_info=True)
@ -301,6 +348,12 @@ def register_event_handlers(
state=WorkflowRunState.COMPLETED.value,
)
asyncio.create_task(
_capture_call_event(
workflow_run_id, user_provider_id, PostHogEvent.CALL_COMPLETED
)
)
# Save real-time feedback logs to workflow run
if not in_memory_logs_buffer.is_empty:
try:

View file

@ -483,6 +483,7 @@ async def run_pipeline_smallwebrtc(
workflow_run_id: int,
user_id: int,
call_context_vars: dict = {},
user_provider_id: str | None = None,
) -> None:
"""Run pipeline for WebRTC connections"""
logger.debug(
@ -524,6 +525,7 @@ async def run_pipeline_smallwebrtc(
user_id,
call_context_vars=call_context_vars,
audio_config=audio_config,
user_provider_id=user_provider_id,
)
@ -534,6 +536,7 @@ async def _run_pipeline(
user_id: int,
call_context_vars: dict = {},
audio_config: AudioConfig = None,
user_provider_id: str | None = None,
) -> None:
"""
Run the pipeline with the given transport and configuration
@ -964,7 +967,10 @@ async def _run_pipeline(
in_memory_logs_buffer, user_context_aggregator, assistant_context_aggregator
)
# Register event handlers
# Register event handlers — resolve provider_id for PostHog tracking
if not user_provider_id:
user_obj = await db_client.get_user_by_id(user_id)
user_provider_id = str(user_obj.provider_id) if user_obj else None
in_memory_audio_buffer = register_event_handlers(
task,
transport,
@ -976,6 +982,7 @@ async def _run_pipeline(
audio_config=audio_config,
pre_call_fetch_task=pre_call_fetch_task,
fetch_recording_audio=fetch_audio,
user_provider_id=user_provider_id,
)
register_audio_data_handler(audio_buffer, workflow_run_id, in_memory_audio_buffer)

View file

@ -0,0 +1,31 @@
from loguru import logger
from posthog import Posthog
from api.constants import POSTHOG_API_KEY, POSTHOG_HOST
_posthog_client: Posthog | None = None
def get_posthog() -> Posthog | None:
    """Return the shared PostHog client, creating it on first use.

    Returns:
        The module-level client, or ``None`` when ``POSTHOG_API_KEY`` is not
        set (in which case no client is ever created).
    """
    global _posthog_client

    # Reuse the client once it has been built.
    if _posthog_client is not None:
        return _posthog_client

    # PostHog is not configured: there is nothing to build.
    if not POSTHOG_API_KEY:
        return None

    _posthog_client = Posthog(POSTHOG_API_KEY, host=POSTHOG_HOST)
    return _posthog_client
def capture_event(
    distinct_id: str | None,
    event: str,
    properties: dict | None = None,
) -> None:
    """Fire a PostHog event on a best-effort basis.

    Silently no-ops if PostHog is not configured or if there is no
    ``distinct_id`` to attribute the event to. Exceptions raised by the
    client's capture call are logged and swallowed.

    Args:
        distinct_id: Identifier of the acting user. ``None`` or an empty
            string causes the event to be skipped.
        event: Event name, either a plain string or a ``str``-based enum
            member such as ``PostHogEvent``.
        properties: Optional event properties; an empty dict is sent when
            omitted.
    """
    # An event without a distinct id cannot be attributed to anyone.
    if not distinct_id:
        return

    client = get_posthog()
    if not client:
        return

    # Callers pass str-based enum members; use the underlying value so the
    # payload and the log line carry the event name itself rather than the
    # enum's qualified name (f-string formatting of such members differs
    # between Python versions).
    event_name = getattr(event, "value", event)

    try:
        client.capture(
            distinct_id=distinct_id, event=event_name, properties=properties or {}
        )
    except Exception:
        logger.exception(f"Failed to send PostHog event '{event_name}'")