feat: add posthog events (#231)

* feat: add posthog events

* fix: workflow_duplicated event

* chore: add events to enum
This commit is contained in:
Sabiha Khan 2026-04-10 17:52:21 +05:30 committed by GitHub
parent bb5f56bfb7
commit 3f19a16e7f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 450 additions and 93 deletions

View file

@ -3,7 +3,7 @@ import asyncio
from loguru import logger
from api.db import db_client
from api.enums import WorkflowRunState
from api.enums import PostHogEvent, WorkflowRunState
from api.services.campaign.circuit_breaker import circuit_breaker
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.in_memory_buffers import (
@ -12,6 +12,7 @@ from api.services.pipecat.in_memory_buffers import (
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
from api.services.pipecat.tracing_config import get_trace_url
from api.services.posthog_client import capture_event
from api.services.workflow.pipecat_engine import PipecatEngine
from api.tasks.arq import enqueue_job
from api.tasks.function_names import FunctionNames
@ -22,6 +23,37 @@ from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.utils.enums import EndTaskReason
async def _capture_call_event(
    workflow_run_id: int,
    user_provider_id: str | None,
    event: str,
    extra_properties: dict | None = None,
) -> None:
    """Send a PostHog event enriched with metadata from the workflow run.

    The workflow run is fetched through ``db_client`` and supplies the
    ``workflow_id``, ``call_type`` and ``call_direction`` properties; all
    three are ``None`` when the lookup returns nothing. Entries in
    *extra_properties* are merged last, so they win over the base keys.

    Intended to be scheduled with ``asyncio.create_task()`` so the caller
    does not wait on it. Any exception raised along the way is logged and
    swallowed rather than propagated.
    """
    try:
        run = await db_client.get_workflow_run_by_id(workflow_run_id)

        if run:
            workflow_id = run.workflow_id
            call_type = run.mode
            # A run without an initial context, or without a "direction"
            # entry in it, is reported as "outbound".
            call_direction = (run.initial_context or {}).get(
                "direction", "outbound"
            )
        else:
            workflow_id = call_type = call_direction = None

        payload = {
            "workflow_run_id": workflow_run_id,
            "workflow_id": workflow_id,
            "call_type": call_type,
            "call_direction": call_direction,
        }
        if extra_properties:
            payload.update(extra_properties)

        capture_event(
            distinct_id=user_provider_id,
            event=event,
            properties=payload,
        )
    except Exception:
        logger.exception(f"Background PostHog capture failed for '{event}'")
def register_event_handlers(
task: PipelineTask,
transport,
@ -32,6 +64,7 @@ def register_event_handlers(
pipeline_metrics_aggregator: PipelineMetricsAggregator,
audio_config=AudioConfig,
pre_call_fetch_task: asyncio.Task | None = None,
user_provider_id: str | None = None,
):
"""Register all event handlers for transport and task events.
@ -75,6 +108,12 @@ def register_event_handlers(
):
ready_state["initial_response_triggered"] = True
asyncio.create_task(
_capture_call_event(
workflow_run_id, user_provider_id, PostHogEvent.CALL_STARTED
)
)
# Wait for pre-call fetch if in progress, playing ringer meanwhile
if pre_call_fetch_task is not None:
if not pre_call_fetch_task.done():
@ -161,6 +200,14 @@ def register_event_handlers(
await circuit_breaker.record_and_evaluate(
campaign_id=workflow_run.campaign_id, is_failure=True
)
asyncio.create_task(
_capture_call_event(
workflow_run_id,
user_provider_id,
PostHogEvent.CALL_FAILED,
extra_properties={"error_reason": "pipeline_error"},
)
)
except Exception as e:
logger.error(f"Error recording circuit breaker failure: {e}", exc_info=True)
@ -269,6 +316,12 @@ def register_event_handlers(
state=WorkflowRunState.COMPLETED.value,
)
asyncio.create_task(
_capture_call_event(
workflow_run_id, user_provider_id, PostHogEvent.CALL_COMPLETED
)
)
# Save real-time feedback logs to workflow run
if not in_memory_logs_buffer.is_empty:
try:

View file

@ -483,6 +483,7 @@ async def run_pipeline_smallwebrtc(
workflow_run_id: int,
user_id: int,
call_context_vars: dict = {},
user_provider_id: str | None = None,
) -> None:
"""Run pipeline for WebRTC connections"""
logger.debug(
@ -524,6 +525,7 @@ async def run_pipeline_smallwebrtc(
user_id,
call_context_vars=call_context_vars,
audio_config=audio_config,
user_provider_id=user_provider_id,
)
@ -534,6 +536,7 @@ async def _run_pipeline(
user_id: int,
call_context_vars: dict = {},
audio_config: AudioConfig = None,
user_provider_id: str | None = None,
) -> None:
"""
Run the pipeline with the given transport and configuration
@ -962,7 +965,10 @@ async def _run_pipeline(
in_memory_logs_buffer, user_context_aggregator, assistant_context_aggregator
)
# Register event handlers
# Register event handlers — resolve provider_id for PostHog tracking
if not user_provider_id:
user_obj = await db_client.get_user_by_id(user_id)
user_provider_id = str(user_obj.provider_id) if user_obj else None
in_memory_audio_buffer = register_event_handlers(
task,
transport,
@ -973,6 +979,7 @@ async def _run_pipeline(
pipeline_metrics_aggregator=pipeline_metrics_aggregator,
audio_config=audio_config,
pre_call_fetch_task=pre_call_fetch_task,
user_provider_id=user_provider_id,
)
register_audio_data_handler(audio_buffer, workflow_run_id, in_memory_audio_buffer)

View file

@ -0,0 +1,31 @@
from loguru import logger
from posthog import Posthog
from api.constants import POSTHOG_API_KEY, POSTHOG_HOST
_posthog_client: Posthog | None = None
def get_posthog() -> Posthog | None:
    """Return the shared PostHog client, building it on first use.

    The client is created from ``POSTHOG_API_KEY`` and ``POSTHOG_HOST`` and
    stored in the module-level ``_posthog_client``. When no API key is set
    and no client exists yet, ``None`` is returned.
    """
    global _posthog_client

    # An already-built client is reused as-is.
    if _posthog_client is not None:
        return _posthog_client

    # Without an API key there is nothing to build.
    if not POSTHOG_API_KEY:
        return None

    _posthog_client = Posthog(POSTHOG_API_KEY, host=POSTHOG_HOST)
    return _posthog_client
def capture_event(
    distinct_id: str,
    event: str,
    properties: dict | None = None,
) -> None:
    """Send a single event to PostHog on a best-effort basis.

    Does nothing when ``get_posthog()`` yields no client. A missing or
    empty *properties* is sent as an empty dict. Any exception raised by
    the client's ``capture`` call is logged and not re-raised.
    """
    posthog = get_posthog()
    if posthog:
        try:
            posthog.capture(
                distinct_id=distinct_id,
                event=event,
                properties=properties if properties else {},
            )
        except Exception:
            logger.exception(f"Failed to send PostHog event '{event}'")