diff --git a/api/Dockerfile b/api/Dockerfile index 1e1d27e..d8afae0 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -13,6 +13,11 @@ RUN apt-get update && apt-get install -y \ # Copy and install requirements COPY api/requirements.txt . +# Install CPU-only PyTorch FIRST to prevent CUDA/NVIDIA dependencies +# This satisfies torch dependency before other packages try to pull GPU version +RUN pip install --user --no-cache-dir torch --index-url https://download.pytorch.org/whl/cpu && \ + rm -rf /root/.cache/pip + # Install dependencies to user directory for easy copying RUN pip install --user --no-cache-dir -r requirements.txt && \ # Clean up pip cache after installation @@ -58,8 +63,13 @@ COPY ./scripts/start_services.sh ./scripts/start_services.sh ENV PYTHONPATH=/app +# Disable file logging in Docker - logs go to stdout for docker logs +ENV LOG_TO_FILE=false +# Keep container alive by waiting for background processes +ENV WAIT_FOR_PROCESSES=true + # Expose the port FastAPI will run on EXPOSE 8000 # Run the FastAPI app with uvicorn -CMD ["bash", "-c", "./scripts/start_services.sh && tail -f ./logs/latest/*.log"] \ No newline at end of file +CMD ["./scripts/start_services.sh"] \ No newline at end of file diff --git a/api/requirements.txt b/api/requirements.txt index 9b49c1d..2aca2b6 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -14,5 +14,4 @@ sentry-sdk[fastapi]==2.38.0 sqlalchemy[asyncio]==2.0.43 msgpack==1.1.2 docling[rapidocr]==2.68.0 -sentence-transformers==5.2.0 pgvector==0.4.2 diff --git a/api/routes/knowledge_base.py b/api/routes/knowledge_base.py index 4553efd..fbe4381 100644 --- a/api/routes/knowledge_base.py +++ b/api/routes/knowledge_base.py @@ -103,9 +103,8 @@ async def process_document( The document status will be updated from 'pending' -> 'processing' -> 'completed' or 'failed'. - Embedding Services: - * openai (default): High-quality 1536-dimensional embeddings (requires OPENAI_API_KEY) - * sentence_transformer: Free, offline-capable, 384-dimensional embeddings + Embedding: + Uses OpenAI text-embedding-3-small (1536-dimensional embeddings, requires API key configured in Model Configurations). Access Control: * Users can only process documents in their organization. @@ -134,12 +133,11 @@ async def process_document( request.s3_key, user.selected_organization_id, 128, # max_tokens (default) - request.embedding_service, ) logger.info( f"Created document {request.document_uuid} (id={document.id}) and enqueued processing " - f"with {request.embedding_service} embeddings, org {user.selected_organization_id}" + f"with OpenAI embeddings, org {user.selected_organization_id}" ) return DocumentResponseSchema( diff --git a/api/schemas/knowledge_base.py b/api/schemas/knowledge_base.py index 0cb1a72..363c7d0 100644 --- a/api/schemas/knowledge_base.py +++ b/api/schemas/knowledge_base.py @@ -1,7 +1,7 @@ """Pydantic schemas for knowledge base operations.""" from datetime import datetime -from typing import Any, Dict, List, Literal, Optional +from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field @@ -29,11 +29,6 @@ class ProcessDocumentRequestSchema(BaseModel): document_uuid: str = Field(..., description="Document UUID to process") s3_key: str = Field(..., description="S3 key of the uploaded file") - embedding_service: Literal["sentence_transformer", "openai"] = Field( - default="openai", - description="Embedding service to use for processing. " - "Options: 'openai' (default, 1536-dim, requires API key) or 'sentence_transformer' (free, 384-dim)", - ) class DocumentResponseSchema(BaseModel): diff --git a/api/services/gen_ai/__init__.py b/api/services/gen_ai/__init__.py index 9a25a6f..4d5b8fe 100644 --- a/api/services/gen_ai/__init__.py +++ b/api/services/gen_ai/__init__.py @@ -4,14 +4,12 @@ from .embedding import ( BaseEmbeddingService, EmbeddingAPIKeyNotConfiguredError, OpenAIEmbeddingService, - SentenceTransformerEmbeddingService, ) from .json_parser import parse_llm_json __all__ = [ "BaseEmbeddingService", "EmbeddingAPIKeyNotConfiguredError", - "SentenceTransformerEmbeddingService", "OpenAIEmbeddingService", "parse_llm_json", ] diff --git a/api/services/gen_ai/embedding/__init__.py b/api/services/gen_ai/embedding/__init__.py index c47c66a..f6a4f18 100644 --- a/api/services/gen_ai/embedding/__init__.py +++ b/api/services/gen_ai/embedding/__init__.py @@ -2,11 +2,9 @@ from .base import BaseEmbeddingService from .openai_service import EmbeddingAPIKeyNotConfiguredError, OpenAIEmbeddingService -from .sentence_transformer_service import SentenceTransformerEmbeddingService __all__ = [ "BaseEmbeddingService", "EmbeddingAPIKeyNotConfiguredError", - "SentenceTransformerEmbeddingService", "OpenAIEmbeddingService", ] diff --git a/api/services/gen_ai/embedding/sentence_transformer_service.py b/api/services/gen_ai/embedding/sentence_transformer_service.py deleted file mode 100644 index f2773a2..0000000 --- a/api/services/gen_ai/embedding/sentence_transformer_service.py +++ /dev/null @@ -1,350 +0,0 @@ -"""Sentence Transformer embedding service. - -This module provides document processing capabilities using: -- Sentence-transformers for embeddings (all-MiniLM-L6-v2) -- Docling for document conversion and chunking -- pgvector for vector similarity search - -Setup for offline usage: -1. First run: Downloads and caches models to ~/.cache/sentence_transformers -2. Subsequent runs: Uses cached models (no internet needed) -3. For fully offline mode: Set TRANSFORMERS_OFFLINE=1 and HF_HUB_OFFLINE=1 -""" - -import os -from pathlib import Path -from typing import Any, Dict, List, Optional - -from docling.chunking import HybridChunker -from docling.document_converter import DocumentConverter -from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer -from loguru import logger -from sentence_transformers import SentenceTransformer -from transformers import AutoTokenizer - -from api.db.db_client import DBClient -from api.db.models import KnowledgeBaseChunkModel - -from .base import BaseEmbeddingService - -# Set environment variables for model caching -os.environ.setdefault("TRANSFORMERS_OFFLINE", "0") -os.environ.setdefault("HF_HUB_OFFLINE", "0") -os.environ.setdefault( - "SENTENCE_TRANSFORMERS_HOME", os.path.expanduser("~/.cache/sentence_transformers") -) - -# Model configuration -DEFAULT_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2" -EMBEDDING_DIMENSION = 384 # Dimension for all-MiniLM-L6-v2 - - -class SentenceTransformerEmbeddingService(BaseEmbeddingService): - """Embedding service using Sentence Transformers.""" - - def __init__( - self, - db_client: DBClient, - model_id: str = DEFAULT_MODEL_ID, - max_tokens: int = 512, - ): - """Initialize the Sentence Transformer embedding service. - - Args: - db_client: Database client for storing documents and chunks - model_id: Sentence-transformers model ID (default: all-MiniLM-L6-v2) - max_tokens: Maximum number of tokens per chunk (default: 512) - Note: This applies to the contextualized text (with headings/captions) - """ - self.db = db_client - self.model_id = model_id - self.max_tokens = max_tokens - - # Initialize embedding model - logger.info(f"Loading embedding model: {model_id}") - try: - # Try to load from cache first (local_files_only=True) - self.embedding_model = SentenceTransformer( - model_id, - cache_folder=os.environ.get("SENTENCE_TRANSFORMERS_HOME"), - local_files_only=True, - ) - logger.info("Loaded model from cache") - except Exception as e: - logger.warning(f"Model not in cache, downloading: {e}") - # If not in cache, download it (this will cache it for next time) - self.embedding_model = SentenceTransformer( - model_id, - cache_folder=os.environ.get("SENTENCE_TRANSFORMERS_HOME"), - ) - logger.info("Model downloaded and cached") - - # Initialize tokenizer for chunking with max_tokens - logger.info(f"Loading tokenizer: {model_id} with max_tokens={max_tokens}") - try: - # Try to load from cache first - self.tokenizer = HuggingFaceTokenizer( - tokenizer=AutoTokenizer.from_pretrained( - model_id, - local_files_only=True, - ), - max_tokens=max_tokens, - ) - logger.info("Loaded tokenizer from cache") - except Exception as e: - logger.warning(f"Tokenizer not in cache, downloading: {e}") - # If not in cache, download it - self.tokenizer = HuggingFaceTokenizer( - tokenizer=AutoTokenizer.from_pretrained(model_id), - max_tokens=max_tokens, - ) - logger.info("Tokenizer downloaded and cached") - - # Initialize chunker - logger.info(f"Initializing HybridChunker with max_tokens={max_tokens}") - self.chunker = HybridChunker(tokenizer=self.tokenizer) - - # Initialize document converter - self.converter = DocumentConverter() - - def get_model_id(self) -> str: - """Return the model identifier.""" - return self.model_id - - def get_embedding_dimension(self) -> int: - """Return the embedding dimension.""" - return EMBEDDING_DIMENSION - - async def embed_texts(self, texts: List[str]) -> List[List[float]]: - """Embed a batch of texts. - - Args: - texts: List of text strings to embed - - Returns: - List of embedding vectors (each vector is a list of floats) - """ - embeddings = self.embedding_model.encode( - texts, - show_progress_bar=False, - convert_to_numpy=True, - ) - return [embedding.tolist() for embedding in embeddings] - - async def embed_query(self, query: str) -> List[float]: - """Embed a single query text. - - Args: - query: Query text to embed - - Returns: - Embedding vector as list of floats - """ - embedding = self.embedding_model.encode([query])[0] - return embedding.tolist() - - async def search_similar_chunks( - self, - query: str, - organization_id: int, - limit: int = 5, - document_uuids: Optional[List[str]] = None, - ) -> List[Dict[str, Any]]: - """Search for similar chunks using vector similarity. - - Returns top-k most similar chunks without any threshold filtering. - Apply similarity thresholds and reranking at the application layer. - - Args: - query: Search query text - organization_id: Organization ID for scoping - limit: Maximum number of results to return - document_uuids: Optional list of document UUIDs to filter by - - Returns: - List of dictionaries with chunk data and similarity scores - """ - # Generate query embedding - query_embedding = await self.embed_query(query) - - # Perform vector similarity search - results = await self.db.search_similar_chunks( - query_embedding=query_embedding, - organization_id=organization_id, - limit=limit, - document_uuids=document_uuids, - embedding_model=self.model_id, - ) - - return results - - async def process_document( - self, - file_path: str, - organization_id: int, - created_by: int, - custom_metadata: dict = None, - ): - """Process a document: convert, chunk, embed, and store in database. - - Args: - file_path: Path to the document file - organization_id: Organization ID for scoping - created_by: User ID who uploaded the document - custom_metadata: Optional custom metadata dictionary - - Returns: - The created document record - """ - try: - # Extract file metadata - filename = Path(file_path).name - file_hash = self.db.compute_file_hash(file_path) - file_size = os.path.getsize(file_path) - mime_type = self.db.get_mime_type(file_path) - - # Check if document already exists - existing_doc = await self.db.get_document_by_hash( - file_hash, organization_id - ) - if existing_doc: - logger.info(f"Document already exists: {filename} (hash: {file_hash})") - return existing_doc - - # Create document record - doc_record = await self.db.create_document( - organization_id=organization_id, - created_by=created_by, - filename=filename, - file_size_bytes=file_size, - file_hash=file_hash, - mime_type=mime_type, - custom_metadata=custom_metadata or {}, - ) - - logger.info(f"Processing document: {filename}") - - # Update status to processing - await self.db.update_document_status(doc_record.id, "processing") - - # Step 1: Convert document using docling - logger.info("Converting document with docling...") - conversion_result = self.converter.convert(file_path) - doc = conversion_result.document - - # Store docling metadata - docling_metadata = { - "num_pages": len(doc.pages) if hasattr(doc, "pages") else None, - "document_type": type(doc).__name__, - } - - # Step 2: Chunk the document - logger.info(f"Chunking document with max_tokens={self.max_tokens}...") - chunks = list(self.chunker.chunk(dl_doc=doc)) - total_chunks = len(chunks) - - logger.info(f"Generated {total_chunks} chunks") - - # Step 3: Process each chunk - chunk_texts = [] - chunk_records = [] - token_counts = [] - - for i, chunk in enumerate(chunks): - # Get chunk text - chunk_text = chunk.text - - # Get contextualized text (enriched with surrounding context) - contextualized_text = self.chunker.contextualize(chunk=chunk) - - # Calculate actual token count using the tokenizer - text_to_tokenize = ( - contextualized_text if contextualized_text else chunk_text - ) - token_count = len( - self.tokenizer.tokenizer.encode( - text_to_tokenize, add_special_tokens=False - ) - ) - token_counts.append(token_count) - - # Prepare chunk metadata - chunk_metadata = {} - if hasattr(chunk, "meta") and chunk.meta: - chunk_metadata = { - "doc_items": ( - [str(item) for item in chunk.meta.doc_items] - if hasattr(chunk.meta, "doc_items") - else [] - ), - "headings": ( - chunk.meta.headings - if hasattr(chunk.meta, "headings") - else [] - ), - } - - # Create chunk record (without embedding yet) - chunk_record = KnowledgeBaseChunkModel( - document_id=doc_record.id, - organization_id=organization_id, - chunk_text=chunk_text, - contextualized_text=contextualized_text, - chunk_index=i, - chunk_metadata=chunk_metadata, - embedding_model=self.model_id, - embedding_dimension=EMBEDDING_DIMENSION, - token_count=token_count, - ) - - chunk_records.append(chunk_record) - # Use contextualized text for embedding if available - chunk_texts.append(text_to_tokenize) - - # Log chunk statistics - if token_counts: - avg_tokens = sum(token_counts) / len(token_counts) - min_tokens = min(token_counts) - max_tokens = max(token_counts) - logger.info("Chunk token statistics:") - logger.info(f" - Average: {avg_tokens:.1f} tokens") - logger.info(f" - Min: {min_tokens} tokens") - logger.info(f" - Max: {max_tokens} tokens") - - # Step 4: Generate embeddings in batch - logger.info("Generating embeddings...") - embeddings = await self.embed_texts(chunk_texts) - - # Step 5: Attach embeddings to chunk records - for chunk_record, embedding in zip(chunk_records, embeddings): - chunk_record.embedding = embedding - - # Step 6: Save all chunks in batch - logger.info("Storing chunks in database...") - await self.db.create_chunks_batch(chunk_records) - - # Update document status to completed - await self.db.update_document_status( - doc_record.id, - "completed", - total_chunks=total_chunks, - docling_metadata=docling_metadata, - ) - - logger.info(f"Successfully processed document: {filename}") - logger.info(f" - Total chunks: {total_chunks}") - logger.info(f" - Document ID: {doc_record.id}") - logger.info(f" - Document UUID: {doc_record.document_uuid}") - - return doc_record - - except Exception as e: - logger.error(f"Error processing document: {e}") - - # Update document status to failed if it exists - if "doc_record" in locals(): - await self.db.update_document_status( - doc_record.id, "failed", error_message=str(e) - ) - - raise diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index fce8cea..00744f0 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -44,6 +44,8 @@ from api.services.telephony.stasis_rtp_connection import StasisRTPConnection from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow import WorkflowGraph +from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector from pipecat.pipeline.base_task import PipelineTaskParams from pipecat.processors.aggregators.llm_response_universal import ( @@ -66,6 +68,7 @@ from pipecat.turns.user_start.vad_user_turn_start_strategy import ( from pipecat.turns.user_stop import ( ExternalUserTurnStopStrategy, TranscriptionUserTurnStopStrategy, + TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.context import set_current_run_id @@ -452,6 +455,8 @@ async def _run_pipeline( # Extract configurations from workflow configurations max_call_duration_seconds = 300 # Default 5 minutes max_user_idle_timeout = 10.0 # Default 10 seconds + smart_turn_stop_secs = 2.0 # Default 2 seconds for incomplete turn timeout + turn_stop_strategy = "transcription" # Default to transcription-based detection keyterms = None # Dictionary words for STT boosting if workflow.workflow_configurations: @@ -467,6 +472,16 @@ async def _run_pipeline( "max_user_idle_timeout" ] + # Use workflow-specific smart turn stop timeout if provided + if "smart_turn_stop_secs" in workflow.workflow_configurations: + smart_turn_stop_secs = workflow.workflow_configurations[ + "smart_turn_stop_secs" + ] + + # Use workflow-specific turn stop strategy if provided + if "turn_stop_strategy" in workflow.workflow_configurations: + turn_stop_strategy = workflow.workflow_configurations["turn_stop_strategy"] + # Extract dictionary words and convert to keyterms list if "dictionary" in workflow.workflow_configurations: dictionary = workflow.workflow_configurations["dictionary"] @@ -550,9 +565,9 @@ async def _run_pipeline( correct_aggregation_callback=engine.create_aggregation_correction_callback(), ) - # Configure turn strategies based on STT provider and model + # Configure turn strategies based on STT provider, model, and workflow configuration # Deepgram Flux uses external turn detection (VAD + External start/stop) - # Other models use transcription-based turn detection with smart turn analyzer + # Other models use configurable turn detection strategy is_deepgram_flux = ( user_config.stt.provider == ServiceProviders.DEEPGRAM.value and user_config.stt.model == "flux-general-en" @@ -563,7 +578,19 @@ async def _run_pipeline( start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()], stop=[ExternalUserTurnStopStrategy()], ) + elif turn_stop_strategy == "turn_analyzer": + # Smart Turn Analyzer: best for longer responses with natural pauses + smart_turn_params = SmartTurnParams(stop_secs=smart_turn_stop_secs) + user_turn_strategies = UserTurnStrategies( + start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()], + stop=[ + TurnAnalyzerUserTurnStopStrategy( + turn_analyzer=LocalSmartTurnAnalyzerV3(params=smart_turn_params) + ) + ], + ) else: + # Transcription-based (default): best for short 1-2 word responses user_turn_strategies = UserTurnStrategies( start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()], stop=[TranscriptionUserTurnStopStrategy()], diff --git a/api/tasks/knowledge_base_processing.py b/api/tasks/knowledge_base_processing.py index 7bef401..e1a4cea 100644 --- a/api/tasks/knowledge_base_processing.py +++ b/api/tasks/knowledge_base_processing.py @@ -2,7 +2,6 @@ import os import tempfile -from typing import Literal from docling.chunking import HybridChunker from docling.document_converter import DocumentConverter @@ -12,13 +11,10 @@ from transformers import AutoTokenizer from api.db import db_client from api.db.models import KnowledgeBaseChunkModel -from api.services.gen_ai import ( - OpenAIEmbeddingService, - SentenceTransformerEmbeddingService, -) +from api.services.gen_ai import OpenAIEmbeddingService from api.services.storage import storage_fs -# For tokenization/chunking - use SentenceTransformer tokenizer as baseline +# For tokenization/chunking TOKENIZER_MODEL = "sentence-transformers/all-MiniLM-L6-v2" @@ -28,7 +24,6 @@ async def process_knowledge_base_document( s3_key: str, organization_id: int, max_tokens: int = 128, - embedding_service: Literal["sentence_transformer", "openai"] = "openai", ): """Process a knowledge base document: download, chunk, embed, and store. @@ -38,9 +33,6 @@ async def process_knowledge_base_document( s3_key: S3 key where the file is stored organization_id: Organization ID max_tokens: Maximum number of tokens per chunk (default: 128) - embedding_service: Embedding service to use (default: "openai") - - "openai": Use OpenAI text-embedding-3-small (1536-dim, requires API key) - - "sentence_transformer": Use SentenceTransformer (all-MiniLM-L6-v2, 384-dim, free) """ logger.info( f"Starting knowledge base document processing for document_id={document_id}, " @@ -125,56 +117,38 @@ async def process_knowledge_base_document( mime_type=mime_type, ) - # Initialize the embedding service based on the parameter - if embedding_service == "openai": - logger.info( - f"Initializing OpenAI embedding service with max_tokens={max_tokens}" - ) - # Try to get user's embeddings configuration - embeddings_api_key = None - embeddings_model = None - if document.created_by: - user_config = await db_client.get_user_configurations( - document.created_by - ) - if user_config.embeddings: - embeddings_api_key = user_config.embeddings.api_key - embeddings_model = user_config.embeddings.model - logger.info( - f"Using user embeddings config: model={embeddings_model}" - ) + # Initialize the OpenAI embedding service + logger.info( + f"Initializing OpenAI embedding service with max_tokens={max_tokens}" + ) + # Try to get user's embeddings configuration + embeddings_api_key = None + embeddings_model = None + if document.created_by: + user_config = await db_client.get_user_configurations(document.created_by) + if user_config.embeddings: + embeddings_api_key = user_config.embeddings.api_key + embeddings_model = user_config.embeddings.model + logger.info(f"Using user embeddings config: model={embeddings_model}") - # Check if API key is configured - if not embeddings_api_key: - error_message = ( - "OpenAI API key not configured. Please set your API key in " - "Model Configurations > Embedding to process documents." - ) - logger.warning(f"Document {document_id}: {error_message}") - await db_client.update_document_status( - document_id, "failed", error_message=error_message - ) - return + # Check if API key is configured + if not embeddings_api_key: + error_message = ( + "OpenAI API key not configured. Please set your API key in " + "Model Configurations > Embedding to process documents." + ) + logger.warning(f"Document {document_id}: {error_message}") + await db_client.update_document_status( + document_id, "failed", error_message=error_message + ) + return - service = OpenAIEmbeddingService( - db_client=db_client, - max_tokens=max_tokens, - api_key=embeddings_api_key, - model_id=embeddings_model or "text-embedding-3-small", - ) - elif embedding_service == "sentence_transformer": - logger.info( - f"Initializing SentenceTransformer embedding service with max_tokens={max_tokens}" - ) - service = SentenceTransformerEmbeddingService( - db_client=db_client, - max_tokens=max_tokens, - ) - else: - raise ValueError( - f"Invalid embedding_service: {embedding_service}. " - f"Must be 'sentence_transformer' or 'openai'" - ) + service = OpenAIEmbeddingService( + db_client=db_client, + max_tokens=max_tokens, + api_key=embeddings_api_key, + model_id=embeddings_model or "text-embedding-3-small", + ) # Step 1: Convert document with docling logger.info("Converting document with docling") @@ -265,8 +239,8 @@ async def process_knowledge_base_document( logger.info(f" - Min: {min_tokens} tokens") logger.info(f" - Max: {max_tokens_actual} tokens") - # Step 6: Generate embeddings using the embedding service - logger.info(f"Generating embeddings using {embedding_service}") + # Step 6: Generate embeddings using OpenAI + logger.info(f"Generating embeddings using {service.get_model_id()}") embeddings = await service.embed_texts(chunk_texts) # Step 7: Attach embeddings to chunk records diff --git a/api/tests/test_is_private_ip_candidate.py b/api/tests/test_is_private_ip_candidate.py index 146f4ae..25feb5a 100644 --- a/api/tests/test_is_private_ip_candidate.py +++ b/api/tests/test_is_private_ip_candidate.py @@ -6,37 +6,51 @@ class TestIsPrivateIpCandidate: def test_private_ip_192_168(self): """192.168.x.x addresses are detected as private.""" - candidate = "candidate:123 1 udp 2122260223 192.168.50.24 63603 typ host generation 0" + candidate = ( + "candidate:123 1 udp 2122260223 192.168.50.24 63603 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is True def test_private_ip_10_x(self): """10.x.x.x addresses are detected as private.""" - candidate = "candidate:456 1 udp 2122260223 10.0.0.1 12345 typ host generation 0" + candidate = ( + "candidate:456 1 udp 2122260223 10.0.0.1 12345 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is True def test_private_ip_172_16(self): """172.16.x.x addresses are detected as private.""" - candidate = "candidate:789 1 udp 2122260223 172.16.0.1 54321 typ host generation 0" + candidate = ( + "candidate:789 1 udp 2122260223 172.16.0.1 54321 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is True def test_private_ip_172_31(self): """172.31.x.x addresses are detected as private.""" - candidate = "candidate:101 1 udp 2122260223 172.31.255.255 12345 typ host generation 0" + candidate = ( + "candidate:101 1 udp 2122260223 172.31.255.255 12345 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is True def test_cgnat_ip(self): """CGNAT addresses (100.64.0.0/10) are detected as private.""" - candidate = "candidate:202 1 udp 2122260223 100.64.0.1 12345 typ host generation 0" + candidate = ( + "candidate:202 1 udp 2122260223 100.64.0.1 12345 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is True def test_cgnat_ip_upper_bound(self): """Upper bound of CGNAT range is detected.""" - candidate = "candidate:303 1 udp 2122260223 100.127.255.255 12345 typ host generation 0" + candidate = ( + "candidate:303 1 udp 2122260223 100.127.255.255 12345 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is True def test_public_ip(self): """Public IP addresses return False.""" - candidate = "candidate:404 1 udp 2122260223 142.250.190.46 12345 typ host generation 0" + candidate = ( + "candidate:404 1 udp 2122260223 142.250.190.46 12345 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is False def test_public_ip_8_8_8_8(self): @@ -46,17 +60,23 @@ class TestIsPrivateIpCandidate: def test_non_cgnat_100_range(self): """100.x.x.x outside CGNAT range is public.""" - candidate = "candidate:606 1 udp 2122260223 100.128.0.1 12345 typ host generation 0" + candidate = ( + "candidate:606 1 udp 2122260223 100.128.0.1 12345 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is False def test_172_15_is_public(self): """172.15.x.x is outside private range and should be public.""" - candidate = "candidate:707 1 udp 2122260223 172.15.255.255 12345 typ srflx generation 0" + candidate = ( + "candidate:707 1 udp 2122260223 172.15.255.255 12345 typ srflx generation 0" + ) assert is_private_ip_candidate(candidate) is False def test_172_32_is_public(self): """172.32.x.x is outside private range and should be public.""" - candidate = "candidate:808 1 udp 2122260223 172.32.0.1 12345 typ srflx generation 0" + candidate = ( + "candidate:808 1 udp 2122260223 172.32.0.1 12345 typ srflx generation 0" + ) assert is_private_ip_candidate(candidate) is False def test_srflx_candidate_type(self): @@ -66,17 +86,23 @@ class TestIsPrivateIpCandidate: def test_relay_candidate_type(self): """Relay candidates are parsed correctly.""" - candidate = "candidate:111 1 udp 41885439 1.1.1.1 50000 typ relay raddr 0.0.0.0 rport 0" + candidate = ( + "candidate:111 1 udp 41885439 1.1.1.1 50000 typ relay raddr 0.0.0.0 rport 0" + ) assert is_private_ip_candidate(candidate) is False def test_loopback_ip(self): """Loopback addresses are detected as private.""" - candidate = "candidate:222 1 udp 2122260223 127.0.0.1 12345 typ host generation 0" + candidate = ( + "candidate:222 1 udp 2122260223 127.0.0.1 12345 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is True def test_link_local_ip(self): """Link-local addresses (169.254.x.x) are detected as private.""" - candidate = "candidate:333 1 udp 2122260223 169.254.1.1 12345 typ host generation 0" + candidate = ( + "candidate:333 1 udp 2122260223 169.254.1.1 12345 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is True def test_ipv6_link_local(self): @@ -100,7 +126,9 @@ class TestIsPrivateIpCandidate: def test_malformed_candidate_invalid_ip(self): """Candidate with invalid IP returns False.""" - candidate = "candidate:777 1 udp 2122260223 not.an.ip 12345 typ host generation 0" + candidate = ( + "candidate:777 1 udp 2122260223 not.an.ip 12345 typ host generation 0" + ) assert is_private_ip_candidate(candidate) is False def test_malformed_candidate_short(self): @@ -110,5 +138,7 @@ class TestIsPrivateIpCandidate: def test_tcp_candidate(self): """TCP candidates are parsed correctly.""" - candidate = "candidate:999 1 tcp 1518280447 192.168.1.100 9 typ host tcptype active" + candidate = ( + "candidate:999 1 tcp 1518280447 192.168.1.100 9 typ host tcptype active" + ) assert is_private_ip_candidate(candidate) is True diff --git a/docker-compose.yaml b/docker-compose.yaml index 14ce51c..5fb3c53 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -85,7 +85,7 @@ services: # Replace this environment variable if you are using a custom # domain to host the stack - BACKEND_API_ENDPOINT: "http://localhost:8000" + BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" # Database configuration (using containerized postgres) DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" @@ -186,7 +186,7 @@ services: - app-network coturn: - image: coturn/coturn:4.6.3 + image: coturn/coturn:4.8.0 container_name: coturn restart: unless-stopped profiles: ["remote"] @@ -196,25 +196,11 @@ services: - "5349:5349/udp" - "5349:5349/tcp" - "49152-49200:49152-49200/udp" - environment: - TURN_SECRET: ${TURN_SECRET} - TURN_HOST: ${TURN_HOST} - command: > - -n - --listening-port=3478 - --tls-listening-port=5349 - --min-port=49152 - --max-port=49200 - --realm=${TURN_REALM:-dograh.com} - --use-auth-secret - --static-auth-secret=${TURN_SECRET} - --fingerprint - --no-cli - --log-file=stdout - --no-multicast-peers - --no-tlsv1 - --no-tlsv1_1 - ${TURN_HOST:+--external-ip=$TURN_HOST} + volumes: + - ./turnserver.conf:/etc/coturn/turnserver.conf:ro + command: + - -c + - /etc/coturn/turnserver.conf networks: - app-network diff --git a/docs/deployment/custom-domain.mdx b/docs/deployment/custom-domain.mdx index e2bedbb..b25dacc 100644 --- a/docs/deployment/custom-domain.mdx +++ b/docs/deployment/custom-domain.mdx @@ -54,7 +54,14 @@ You should see your server's IP address in the response. ## Step 2: Quick Setup (Recommended) -Once your DNS is configured, run the automated setup script that handles the rest: +Once your DNS is configured, run the automated setup script that handles the rest. + + +You must be at the same place where you had run `setup_remote.sh` from. The directory should contain `dograh/` with the artifacts that got created when `setup_remote.sh` was run. + + +You must not move the `dograh/` directory to a different location after running `setup_custom_domain.sh`, since we set up auto certificate renewal script as certbot renewal hook pointing to the `dograh/` directory. + ```bash curl -o setup_custom_domain.sh https://raw.githubusercontent.com/dograh-hq/dograh/main/scripts/setup_custom_domain.sh && chmod +x setup_custom_domain.sh && sudo ./setup_custom_domain.sh diff --git a/docs/deployment/docker.mdx b/docs/deployment/docker.mdx index 2936f0a..abc6c6e 100644 --- a/docs/deployment/docker.mdx +++ b/docs/deployment/docker.mdx @@ -106,6 +106,7 @@ The setup script creates the following files in the `dograh/` directory: | File | Purpose | |------|---------| | `docker-compose.yaml` | Main Docker Compose configuration | +| `turnserver.conf` | Configuration for TURN server | | `nginx.conf` | nginx reverse proxy configuration with your IP | | `generate_certificate.sh` | Script to regenerate SSL certificates | | `certs/local.crt` | Self-signed SSL certificate | diff --git a/pipecat b/pipecat index 3eb7013..f999b70 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 3eb70131d4c96e5baf7129fb657efe003893ccc9 +Subproject commit f999b70ffb3ba678aec1996f85a69fe13692d067 diff --git a/scripts/setup_custom_domain.sh b/scripts/setup_custom_domain.sh index 4b6e2b4..90fb386 100755 --- a/scripts/setup_custom_domain.sh +++ b/scripts/setup_custom_domain.sh @@ -245,8 +245,21 @@ server { NGINX_EOF echo -e "${GREEN}✓ nginx.conf updated${NC}" +# Update .env file with domain name +echo -e "${BLUE}[6/8] Updating environment variables...${NC}" +if [[ -f ".env" ]]; then + # Update BACKEND_API_ENDPOINT to use domain + sed -i.bak "s|^BACKEND_API_ENDPOINT=.*|BACKEND_API_ENDPOINT=https://$DOMAIN_NAME|" .env + # Update TURN_HOST to use domain + sed -i.bak "s|^TURN_HOST=.*|TURN_HOST=$DOMAIN_NAME|" .env + rm -f .env.bak + echo -e "${GREEN}✓ .env updated with domain name${NC}" +else + echo -e "${YELLOW}⚠ .env file not found - skipping environment update${NC}" +fi + # Setup auto-renewal -echo -e "${BLUE}[6/7] Setting up automatic certificate renewal...${NC}" +echo -e "${BLUE}[7/8] Setting up automatic certificate renewal...${NC}" DOGRAH_PATH=$(pwd) # Create renewal hook script that copies new certificates and restarts nginx @@ -268,7 +281,7 @@ certbot renew --dry-run --quiet && echo -e "${GREEN}✓ Auto-renewal configured # Start Dograh services echo "" -echo -e "${BLUE}[7/7] Starting Dograh services...${NC}" +echo -e "${BLUE}[8/8] Starting Dograh services...${NC}" docker compose --profile remote up -d --pull always echo "" @@ -287,6 +300,7 @@ echo -e " Auto-renewal: Enabled (certificates renew automatically)" echo "" echo -e "${YELLOW}Files modified:${NC}" echo " - dograh/nginx.conf (updated with domain name)" +echo " - dograh/.env (BACKEND_API_ENDPOINT and TURN_HOST updated)" echo " - dograh/certs/local.crt (SSL certificate)" echo " - dograh/certs/local.key (SSL private key)" echo " - /etc/letsencrypt/renewal-hooks/deploy/dograh-reload.sh (renewal hook)" diff --git a/scripts/setup_remote.sh b/scripts/setup_remote.sh index 355ce0b..d1e71c5 100755 --- a/scripts/setup_remote.sh +++ b/scripts/setup_remote.sh @@ -133,8 +133,44 @@ echo -e "${BLUE}[4/5] Generating SSL certificates...${NC}" ./generate_certificate.sh echo -e "${GREEN}✓ SSL certificates generated${NC}" -echo -e "${BLUE}[5/5] Creating environment file...${NC}" +echo -e "${BLUE}[5/6] Creating TURN server configuration...${NC}" +cat > turnserver.conf << TURN_EOF +# Coturn TURN Server - Docker Configuration +# Auto-generated by setup_remote.sh + +# Listener ports +listening-port=3478 +tls-listening-port=5349 + +# Relay port range +min-port=49152 +max-port=49200 + +# Network - external IP for NAT traversal +external-ip=$SERVER_IP + +# Realm +realm=dograh.com + +# Authentication (TURN REST API with time-limited credentials) +use-auth-secret +static-auth-secret=$TURN_SECRET + +# Security +fingerprint +no-cli +no-multicast-peers + +# Logging +log-file=stdout +TURN_EOF +echo -e "${GREEN}✓ turnserver.conf created${NC}" + +echo -e "${BLUE}[6/6] Creating environment file...${NC}" cat > .env << ENV_EOF +# Backend API endpoint (for remote deployment) +BACKEND_API_ENDPOINT=https://$SERVER_IP + # TURN Server Configuration (time-limited credentials via TURN REST API) TURN_HOST=$SERVER_IP TURN_SECRET=$TURN_SECRET @@ -152,6 +188,7 @@ echo "" echo -e "Files created in ${BLUE}$(pwd)${NC}:" echo " - docker-compose.yaml" echo " - nginx.conf" +echo " - turnserver.conf" echo " - generate_certificate.sh" echo " - certs/local.crt" echo " - certs/local.key" diff --git a/scripts/start_services.sh b/scripts/start_services.sh index a1d9951..aa25731 100755 --- a/scripts/start_services.sh +++ b/scripts/start_services.sh @@ -54,6 +54,8 @@ LATEST_LINK="$BASE_LOG_DIR/latest" # Symlink to latest logs VENV_PATH="$BASE_DIR/venv" ARQ_WORKERS=${ARQ_WORKERS:-1} +LOG_TO_FILE=${LOG_TO_FILE:-true} # Set to false in Docker to use stdout +WAIT_FOR_PROCESSES=${WAIT_FOR_PROCESSES:-false} # Set to true in Docker to keep container alive # Log startup cd "$BASE_DIR" @@ -239,8 +241,13 @@ for i in "${!SERVICE_NAMES[@]}"; do ( cd "$BASE_DIR" - export LOG_FILE_PATH="$LOG_DIR/$name.log" - exec $cmd >>"$LOG_DIR/$name.log" 2>&1 + if [[ "$LOG_TO_FILE" == "true" ]]; then + export LOG_FILE_PATH="$LOG_DIR/$name.log" + exec $cmd >>"$LOG_DIR/$name.log" 2>&1 + else + # Log to stdout/stderr for Docker + exec $cmd + fi ) & pid=$! @@ -276,3 +283,8 @@ echo "Logs: tail -f $LOG_DIR/*.log" echo "Rotated logs: ls $LOG_DIR/*.log.*" echo "To stop: ./scripts/stop_services.sh" echo "──────────────────────────────────────────────────" + +# In Docker mode, wait for all background processes to keep container alive +if [[ "$WAIT_FOR_PROCESSES" == "true" ]]; then + wait +fi diff --git a/ui/src/app/workflow/[workflowId]/components/ConfigurationsDialog.tsx b/ui/src/app/workflow/[workflowId]/components/ConfigurationsDialog.tsx index 68a7fdc..92bd454 100644 --- a/ui/src/app/workflow/[workflowId]/components/ConfigurationsDialog.tsx +++ b/ui/src/app/workflow/[workflowId]/components/ConfigurationsDialog.tsx @@ -4,8 +4,9 @@ import { Button } from "@/components/ui/button"; import { Dialog, DialogContent, DialogFooter, DialogHeader, DialogTitle } from "@/components/ui/dialog"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; +import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"; import { Switch } from "@/components/ui/switch"; -import { AmbientNoiseConfiguration, VADConfiguration, WorkflowConfigurations } from "@/types/workflow-configurations"; +import { AmbientNoiseConfiguration, TurnStopStrategy, WorkflowConfigurations } from "@/types/workflow-configurations"; interface ConfigurationsDialogProps { open: boolean; @@ -15,13 +16,6 @@ interface ConfigurationsDialogProps { onSave: (configurations: WorkflowConfigurations, workflowName: string) => Promise; } -const DEFAULT_VAD_CONFIG: VADConfiguration = { - confidence: 0.7, - start_seconds: 0.4, - stop_seconds: 0.8, - minimum_volume: 0.6, -}; - const DEFAULT_AMBIENT_NOISE_CONFIG: AmbientNoiseConfiguration = { enabled: false, volume: 0.3, @@ -35,9 +29,6 @@ export const ConfigurationsDialog = ({ onSave }: ConfigurationsDialogProps) => { const [name, setName] = useState(workflowName); - const [vadConfig, setVadConfig] = useState( - workflowConfigurations?.vad_configuration || DEFAULT_VAD_CONFIG - ); const [ambientNoiseConfig, setAmbientNoiseConfig] = useState( workflowConfigurations?.ambient_noise_configuration || DEFAULT_AMBIENT_NOISE_CONFIG ); @@ -47,16 +38,23 @@ export const ConfigurationsDialog = ({ const [maxUserIdleTimeout, setMaxUserIdleTimeout] = useState( workflowConfigurations?.max_user_idle_timeout || 10 // Default 10 seconds ); + const [smartTurnStopSecs, setSmartTurnStopSecs] = useState( + workflowConfigurations?.smart_turn_stop_secs || 2 // Default 2 seconds + ); + const [turnStopStrategy, setTurnStopStrategy] = useState( + workflowConfigurations?.turn_stop_strategy || 'transcription' + ); const [isSaving, setIsSaving] = useState(false); const handleSave = async () => { setIsSaving(true); try { await onSave({ - vad_configuration: vadConfig, ambient_noise_configuration: ambientNoiseConfig, max_call_duration: maxCallDuration, - max_user_idle_timeout: maxUserIdleTimeout + max_user_idle_timeout: maxUserIdleTimeout, + smart_turn_stop_secs: smartTurnStopSecs, + turn_stop_strategy: turnStopStrategy }, name); onOpenChange(false); } catch (error) { @@ -70,23 +68,14 @@ export const ConfigurationsDialog = ({ useEffect(() => { if (open) { setName(workflowName); - setVadConfig(workflowConfigurations?.vad_configuration || DEFAULT_VAD_CONFIG); setAmbientNoiseConfig(workflowConfigurations?.ambient_noise_configuration || DEFAULT_AMBIENT_NOISE_CONFIG); setMaxCallDuration(workflowConfigurations?.max_call_duration || 600); setMaxUserIdleTimeout(workflowConfigurations?.max_user_idle_timeout || 10); + setSmartTurnStopSecs(workflowConfigurations?.smart_turn_stop_secs || 2); + setTurnStopStrategy(workflowConfigurations?.turn_stop_strategy || 'transcription'); } }, [open, workflowName, workflowConfigurations]); - const handleVadChange = (field: keyof VADConfiguration, value: string) => { - const numValue = parseFloat(value); - if (!isNaN(numValue)) { - setVadConfig(prev => ({ - ...prev, - [field]: numValue - })); - } - }; - return ( @@ -117,76 +106,6 @@ export const ConfigurationsDialog = ({ - {/* Voice Activity Detection Section */} -
-
-

Voice Activity Detection

-

- Hyperparameters to set for voice activity detection. Already configured with defaults. -

-
- -
-
- - handleVadChange('confidence', e.target.value)} - /> -
- -
- - handleVadChange('start_seconds', e.target.value)} - /> -
- -
- - handleVadChange('stop_seconds', e.target.value)} - /> -
- -
- - handleVadChange('minimum_volume', e.target.value)} - /> -
-
-
- {/* Ambient Noise Section */}
@@ -234,6 +153,68 @@ export const ConfigurationsDialog = ({
+ {/* Turn Detection Section */} +
+
+

Turn Detection

+

+ Configure how the agent detects when the user has finished speaking. +

+
+ +
+ + +

+ {turnStopStrategy === 'transcription' + ? "Best for short responses (1-2 word statements). Ends turn when transcription indicates completion." + : "Best for longer responses with natural pauses. Uses ML model to detect end of turn."} +

+
+ + {turnStopStrategy === 'turn_analyzer' && ( +
+ + { + const value = parseFloat(e.target.value); + if (!isNaN(value) && value >= 0.5) { + setSmartTurnStopSecs(value); + } + }} + /> +

+ Max silence duration before ending an incomplete turn. Default: 2 seconds +

+
+ )} +
+ {/* Call Management Section */}
diff --git a/ui/src/types/workflow-configurations.ts b/ui/src/types/workflow-configurations.ts index 93a3c05..9f44063 100644 --- a/ui/src/types/workflow-configurations.ts +++ b/ui/src/types/workflow-configurations.ts @@ -10,27 +10,27 @@ export interface AmbientNoiseConfiguration { volume: number; } +export type TurnStopStrategy = 'transcription' | 'turn_analyzer'; + export interface WorkflowConfigurations { - vad_configuration: VADConfiguration; + vad_configuration?: VADConfiguration; ambient_noise_configuration: AmbientNoiseConfiguration; max_call_duration: number; // Maximum call duration in seconds max_user_idle_timeout: number; // Maximum user idle time in seconds + smart_turn_stop_secs: number; // Timeout in seconds for incomplete turn detection + turn_stop_strategy: TurnStopStrategy; // Strategy for detecting end of user turn dictionary?: string; // Comma-separated words for voice agent to listen for [key: string]: unknown; // Allow additional properties for future configurations } export const DEFAULT_WORKFLOW_CONFIGURATIONS: WorkflowConfigurations = { - vad_configuration: { - confidence: 0.7, - start_seconds: 0.4, - stop_seconds: 0.8, - minimum_volume: 0.6 - }, ambient_noise_configuration: { enabled: false, volume: 0.3 }, max_call_duration: 600, // 10 minutes max_user_idle_timeout: 10, // 10 seconds + smart_turn_stop_secs: 2, // 2 seconds + turn_stop_strategy: 'transcription', // Default to transcription-based detection dictionary: '' };