fix: fix remote deployment method (#145)

* fix: disable file logging for docker compose mode

* fix: wait for processes in Docker compose mode

* fix: add default turn server conf for remote mode

* remove sentence transformers

* make turn detection configurable
This commit is contained in:
Abhishek 2026-02-05 13:10:33 +05:30 committed by GitHub
parent 7d1e22d53c
commit 87fc64d55c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 290 additions and 573 deletions

View file

@ -13,6 +13,11 @@ RUN apt-get update && apt-get install -y \
# Copy and install requirements
COPY api/requirements.txt .
# Install CPU-only PyTorch FIRST to prevent CUDA/NVIDIA dependencies
# This satisfies torch dependency before other packages try to pull GPU version
RUN pip install --user --no-cache-dir torch --index-url https://download.pytorch.org/whl/cpu && \
rm -rf /root/.cache/pip
# Install dependencies to user directory for easy copying
RUN pip install --user --no-cache-dir -r requirements.txt && \
# Clean up pip cache after installation
@ -58,8 +63,13 @@ COPY ./scripts/start_services.sh ./scripts/start_services.sh
ENV PYTHONPATH=/app
# Disable file logging in Docker - logs go to stdout for docker logs
ENV LOG_TO_FILE=false
# Keep container alive by waiting for background processes
ENV WAIT_FOR_PROCESSES=true
# Expose the port FastAPI will run on
EXPOSE 8000
# Run the FastAPI app with uvicorn
CMD ["bash", "-c", "./scripts/start_services.sh && tail -f ./logs/latest/*.log"]
CMD ["./scripts/start_services.sh"]

View file

@ -14,5 +14,4 @@ sentry-sdk[fastapi]==2.38.0
sqlalchemy[asyncio]==2.0.43
msgpack==1.1.2
docling[rapidocr]==2.68.0
sentence-transformers==5.2.0
pgvector==0.4.2

View file

@ -103,9 +103,8 @@ async def process_document(
The document status will be updated from 'pending' -> 'processing' -> 'completed' or 'failed'.
Embedding Services:
* openai (default): High-quality 1536-dimensional embeddings (requires OPENAI_API_KEY)
* sentence_transformer: Free, offline-capable, 384-dimensional embeddings
Embedding:
Uses OpenAI text-embedding-3-small (1536-dimensional embeddings, requires API key configured in Model Configurations).
Access Control:
* Users can only process documents in their organization.
@ -134,12 +133,11 @@ async def process_document(
request.s3_key,
user.selected_organization_id,
128, # max_tokens (default)
request.embedding_service,
)
logger.info(
f"Created document {request.document_uuid} (id={document.id}) and enqueued processing "
f"with {request.embedding_service} embeddings, org {user.selected_organization_id}"
f"with OpenAI embeddings, org {user.selected_organization_id}"
)
return DocumentResponseSchema(

View file

@ -1,7 +1,7 @@
"""Pydantic schemas for knowledge base operations."""
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
@ -29,11 +29,6 @@ class ProcessDocumentRequestSchema(BaseModel):
document_uuid: str = Field(..., description="Document UUID to process")
s3_key: str = Field(..., description="S3 key of the uploaded file")
embedding_service: Literal["sentence_transformer", "openai"] = Field(
default="openai",
description="Embedding service to use for processing. "
"Options: 'openai' (default, 1536-dim, requires API key) or 'sentence_transformer' (free, 384-dim)",
)
class DocumentResponseSchema(BaseModel):

View file

@ -4,14 +4,12 @@ from .embedding import (
BaseEmbeddingService,
EmbeddingAPIKeyNotConfiguredError,
OpenAIEmbeddingService,
SentenceTransformerEmbeddingService,
)
from .json_parser import parse_llm_json
__all__ = [
"BaseEmbeddingService",
"EmbeddingAPIKeyNotConfiguredError",
"SentenceTransformerEmbeddingService",
"OpenAIEmbeddingService",
"parse_llm_json",
]

View file

@ -2,11 +2,9 @@
from .base import BaseEmbeddingService
from .openai_service import EmbeddingAPIKeyNotConfiguredError, OpenAIEmbeddingService
from .sentence_transformer_service import SentenceTransformerEmbeddingService
__all__ = [
"BaseEmbeddingService",
"EmbeddingAPIKeyNotConfiguredError",
"SentenceTransformerEmbeddingService",
"OpenAIEmbeddingService",
]

View file

@ -1,350 +0,0 @@
"""Sentence Transformer embedding service.
This module provides document processing capabilities using:
- Sentence-transformers for embeddings (all-MiniLM-L6-v2)
- Docling for document conversion and chunking
- pgvector for vector similarity search
Setup for offline usage:
1. First run: Downloads and caches models to ~/.cache/sentence_transformers
2. Subsequent runs: Uses cached models (no internet needed)
3. For fully offline mode: Set TRANSFORMERS_OFFLINE=1 and HF_HUB_OFFLINE=1
"""
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from docling.chunking import HybridChunker
from docling.document_converter import DocumentConverter
from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer
from loguru import logger
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
from api.db.db_client import DBClient
from api.db.models import KnowledgeBaseChunkModel
from .base import BaseEmbeddingService
# Set environment variables for model caching
os.environ.setdefault("TRANSFORMERS_OFFLINE", "0")
os.environ.setdefault("HF_HUB_OFFLINE", "0")
os.environ.setdefault(
"SENTENCE_TRANSFORMERS_HOME", os.path.expanduser("~/.cache/sentence_transformers")
)
# Model configuration
DEFAULT_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
EMBEDDING_DIMENSION = 384 # Dimension for all-MiniLM-L6-v2
class SentenceTransformerEmbeddingService(BaseEmbeddingService):
"""Embedding service using Sentence Transformers."""
def __init__(
self,
db_client: DBClient,
model_id: str = DEFAULT_MODEL_ID,
max_tokens: int = 512,
):
"""Initialize the Sentence Transformer embedding service.
Args:
db_client: Database client for storing documents and chunks
model_id: Sentence-transformers model ID (default: all-MiniLM-L6-v2)
max_tokens: Maximum number of tokens per chunk (default: 512)
Note: This applies to the contextualized text (with headings/captions)
"""
self.db = db_client
self.model_id = model_id
self.max_tokens = max_tokens
# Initialize embedding model
logger.info(f"Loading embedding model: {model_id}")
try:
# Try to load from cache first (local_files_only=True)
self.embedding_model = SentenceTransformer(
model_id,
cache_folder=os.environ.get("SENTENCE_TRANSFORMERS_HOME"),
local_files_only=True,
)
logger.info("Loaded model from cache")
except Exception as e:
logger.warning(f"Model not in cache, downloading: {e}")
# If not in cache, download it (this will cache it for next time)
self.embedding_model = SentenceTransformer(
model_id,
cache_folder=os.environ.get("SENTENCE_TRANSFORMERS_HOME"),
)
logger.info("Model downloaded and cached")
# Initialize tokenizer for chunking with max_tokens
logger.info(f"Loading tokenizer: {model_id} with max_tokens={max_tokens}")
try:
# Try to load from cache first
self.tokenizer = HuggingFaceTokenizer(
tokenizer=AutoTokenizer.from_pretrained(
model_id,
local_files_only=True,
),
max_tokens=max_tokens,
)
logger.info("Loaded tokenizer from cache")
except Exception as e:
logger.warning(f"Tokenizer not in cache, downloading: {e}")
# If not in cache, download it
self.tokenizer = HuggingFaceTokenizer(
tokenizer=AutoTokenizer.from_pretrained(model_id),
max_tokens=max_tokens,
)
logger.info("Tokenizer downloaded and cached")
# Initialize chunker
logger.info(f"Initializing HybridChunker with max_tokens={max_tokens}")
self.chunker = HybridChunker(tokenizer=self.tokenizer)
# Initialize document converter
self.converter = DocumentConverter()
def get_model_id(self) -> str:
"""Return the model identifier."""
return self.model_id
def get_embedding_dimension(self) -> int:
"""Return the embedding dimension."""
return EMBEDDING_DIMENSION
async def embed_texts(self, texts: List[str]) -> List[List[float]]:
"""Embed a batch of texts.
Args:
texts: List of text strings to embed
Returns:
List of embedding vectors (each vector is a list of floats)
"""
embeddings = self.embedding_model.encode(
texts,
show_progress_bar=False,
convert_to_numpy=True,
)
return [embedding.tolist() for embedding in embeddings]
async def embed_query(self, query: str) -> List[float]:
"""Embed a single query text.
Args:
query: Query text to embed
Returns:
Embedding vector as list of floats
"""
embedding = self.embedding_model.encode([query])[0]
return embedding.tolist()
async def search_similar_chunks(
self,
query: str,
organization_id: int,
limit: int = 5,
document_uuids: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
"""Search for similar chunks using vector similarity.
Returns top-k most similar chunks without any threshold filtering.
Apply similarity thresholds and reranking at the application layer.
Args:
query: Search query text
organization_id: Organization ID for scoping
limit: Maximum number of results to return
document_uuids: Optional list of document UUIDs to filter by
Returns:
List of dictionaries with chunk data and similarity scores
"""
# Generate query embedding
query_embedding = await self.embed_query(query)
# Perform vector similarity search
results = await self.db.search_similar_chunks(
query_embedding=query_embedding,
organization_id=organization_id,
limit=limit,
document_uuids=document_uuids,
embedding_model=self.model_id,
)
return results
async def process_document(
self,
file_path: str,
organization_id: int,
created_by: int,
custom_metadata: dict = None,
):
"""Process a document: convert, chunk, embed, and store in database.
Args:
file_path: Path to the document file
organization_id: Organization ID for scoping
created_by: User ID who uploaded the document
custom_metadata: Optional custom metadata dictionary
Returns:
The created document record
"""
try:
# Extract file metadata
filename = Path(file_path).name
file_hash = self.db.compute_file_hash(file_path)
file_size = os.path.getsize(file_path)
mime_type = self.db.get_mime_type(file_path)
# Check if document already exists
existing_doc = await self.db.get_document_by_hash(
file_hash, organization_id
)
if existing_doc:
logger.info(f"Document already exists: {filename} (hash: {file_hash})")
return existing_doc
# Create document record
doc_record = await self.db.create_document(
organization_id=organization_id,
created_by=created_by,
filename=filename,
file_size_bytes=file_size,
file_hash=file_hash,
mime_type=mime_type,
custom_metadata=custom_metadata or {},
)
logger.info(f"Processing document: {filename}")
# Update status to processing
await self.db.update_document_status(doc_record.id, "processing")
# Step 1: Convert document using docling
logger.info("Converting document with docling...")
conversion_result = self.converter.convert(file_path)
doc = conversion_result.document
# Store docling metadata
docling_metadata = {
"num_pages": len(doc.pages) if hasattr(doc, "pages") else None,
"document_type": type(doc).__name__,
}
# Step 2: Chunk the document
logger.info(f"Chunking document with max_tokens={self.max_tokens}...")
chunks = list(self.chunker.chunk(dl_doc=doc))
total_chunks = len(chunks)
logger.info(f"Generated {total_chunks} chunks")
# Step 3: Process each chunk
chunk_texts = []
chunk_records = []
token_counts = []
for i, chunk in enumerate(chunks):
# Get chunk text
chunk_text = chunk.text
# Get contextualized text (enriched with surrounding context)
contextualized_text = self.chunker.contextualize(chunk=chunk)
# Calculate actual token count using the tokenizer
text_to_tokenize = (
contextualized_text if contextualized_text else chunk_text
)
token_count = len(
self.tokenizer.tokenizer.encode(
text_to_tokenize, add_special_tokens=False
)
)
token_counts.append(token_count)
# Prepare chunk metadata
chunk_metadata = {}
if hasattr(chunk, "meta") and chunk.meta:
chunk_metadata = {
"doc_items": (
[str(item) for item in chunk.meta.doc_items]
if hasattr(chunk.meta, "doc_items")
else []
),
"headings": (
chunk.meta.headings
if hasattr(chunk.meta, "headings")
else []
),
}
# Create chunk record (without embedding yet)
chunk_record = KnowledgeBaseChunkModel(
document_id=doc_record.id,
organization_id=organization_id,
chunk_text=chunk_text,
contextualized_text=contextualized_text,
chunk_index=i,
chunk_metadata=chunk_metadata,
embedding_model=self.model_id,
embedding_dimension=EMBEDDING_DIMENSION,
token_count=token_count,
)
chunk_records.append(chunk_record)
# Use contextualized text for embedding if available
chunk_texts.append(text_to_tokenize)
# Log chunk statistics
if token_counts:
avg_tokens = sum(token_counts) / len(token_counts)
min_tokens = min(token_counts)
max_tokens = max(token_counts)
logger.info("Chunk token statistics:")
logger.info(f" - Average: {avg_tokens:.1f} tokens")
logger.info(f" - Min: {min_tokens} tokens")
logger.info(f" - Max: {max_tokens} tokens")
# Step 4: Generate embeddings in batch
logger.info("Generating embeddings...")
embeddings = await self.embed_texts(chunk_texts)
# Step 5: Attach embeddings to chunk records
for chunk_record, embedding in zip(chunk_records, embeddings):
chunk_record.embedding = embedding
# Step 6: Save all chunks in batch
logger.info("Storing chunks in database...")
await self.db.create_chunks_batch(chunk_records)
# Update document status to completed
await self.db.update_document_status(
doc_record.id,
"completed",
total_chunks=total_chunks,
docling_metadata=docling_metadata,
)
logger.info(f"Successfully processed document: {filename}")
logger.info(f" - Total chunks: {total_chunks}")
logger.info(f" - Document ID: {doc_record.id}")
logger.info(f" - Document UUID: {doc_record.document_uuid}")
return doc_record
except Exception as e:
logger.error(f"Error processing document: {e}")
# Update document status to failed if it exists
if "doc_record" in locals():
await self.db.update_document_status(
doc_record.id, "failed", error_message=str(e)
)
raise

View file

@ -44,6 +44,8 @@ from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector
from pipecat.pipeline.base_task import PipelineTaskParams
from pipecat.processors.aggregators.llm_response_universal import (
@ -66,6 +68,7 @@ from pipecat.turns.user_start.vad_user_turn_start_strategy import (
from pipecat.turns.user_stop import (
ExternalUserTurnStopStrategy,
TranscriptionUserTurnStopStrategy,
TurnAnalyzerUserTurnStopStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.context import set_current_run_id
@ -452,6 +455,8 @@ async def _run_pipeline(
# Extract configurations from workflow configurations
max_call_duration_seconds = 300 # Default 5 minutes
max_user_idle_timeout = 10.0 # Default 10 seconds
smart_turn_stop_secs = 2.0 # Default 2 seconds for incomplete turn timeout
turn_stop_strategy = "transcription" # Default to transcription-based detection
keyterms = None # Dictionary words for STT boosting
if workflow.workflow_configurations:
@ -467,6 +472,16 @@ async def _run_pipeline(
"max_user_idle_timeout"
]
# Use workflow-specific smart turn stop timeout if provided
if "smart_turn_stop_secs" in workflow.workflow_configurations:
smart_turn_stop_secs = workflow.workflow_configurations[
"smart_turn_stop_secs"
]
# Use workflow-specific turn stop strategy if provided
if "turn_stop_strategy" in workflow.workflow_configurations:
turn_stop_strategy = workflow.workflow_configurations["turn_stop_strategy"]
# Extract dictionary words and convert to keyterms list
if "dictionary" in workflow.workflow_configurations:
dictionary = workflow.workflow_configurations["dictionary"]
@ -550,9 +565,9 @@ async def _run_pipeline(
correct_aggregation_callback=engine.create_aggregation_correction_callback(),
)
# Configure turn strategies based on STT provider and model
# Configure turn strategies based on STT provider, model, and workflow configuration
# Deepgram Flux uses external turn detection (VAD + External start/stop)
# Other models use transcription-based turn detection with smart turn analyzer
# Other models use configurable turn detection strategy
is_deepgram_flux = (
user_config.stt.provider == ServiceProviders.DEEPGRAM.value
and user_config.stt.model == "flux-general-en"
@ -563,7 +578,19 @@ async def _run_pipeline(
start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()],
stop=[ExternalUserTurnStopStrategy()],
)
elif turn_stop_strategy == "turn_analyzer":
# Smart Turn Analyzer: best for longer responses with natural pauses
smart_turn_params = SmartTurnParams(stop_secs=smart_turn_stop_secs)
user_turn_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()],
stop=[
TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=LocalSmartTurnAnalyzerV3(params=smart_turn_params)
)
],
)
else:
# Transcription-based (default): best for short 1-2 word responses
user_turn_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()],
stop=[TranscriptionUserTurnStopStrategy()],

View file

@ -2,7 +2,6 @@
import os
import tempfile
from typing import Literal
from docling.chunking import HybridChunker
from docling.document_converter import DocumentConverter
@ -12,13 +11,10 @@ from transformers import AutoTokenizer
from api.db import db_client
from api.db.models import KnowledgeBaseChunkModel
from api.services.gen_ai import (
OpenAIEmbeddingService,
SentenceTransformerEmbeddingService,
)
from api.services.gen_ai import OpenAIEmbeddingService
from api.services.storage import storage_fs
# For tokenization/chunking - use SentenceTransformer tokenizer as baseline
# For tokenization/chunking
TOKENIZER_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
@ -28,7 +24,6 @@ async def process_knowledge_base_document(
s3_key: str,
organization_id: int,
max_tokens: int = 128,
embedding_service: Literal["sentence_transformer", "openai"] = "openai",
):
"""Process a knowledge base document: download, chunk, embed, and store.
@ -38,9 +33,6 @@ async def process_knowledge_base_document(
s3_key: S3 key where the file is stored
organization_id: Organization ID
max_tokens: Maximum number of tokens per chunk (default: 128)
embedding_service: Embedding service to use (default: "openai")
- "openai": Use OpenAI text-embedding-3-small (1536-dim, requires API key)
- "sentence_transformer": Use SentenceTransformer (all-MiniLM-L6-v2, 384-dim, free)
"""
logger.info(
f"Starting knowledge base document processing for document_id={document_id}, "
@ -125,56 +117,38 @@ async def process_knowledge_base_document(
mime_type=mime_type,
)
# Initialize the embedding service based on the parameter
if embedding_service == "openai":
logger.info(
f"Initializing OpenAI embedding service with max_tokens={max_tokens}"
)
# Try to get user's embeddings configuration
embeddings_api_key = None
embeddings_model = None
if document.created_by:
user_config = await db_client.get_user_configurations(
document.created_by
)
if user_config.embeddings:
embeddings_api_key = user_config.embeddings.api_key
embeddings_model = user_config.embeddings.model
logger.info(
f"Using user embeddings config: model={embeddings_model}"
)
# Initialize the OpenAI embedding service
logger.info(
f"Initializing OpenAI embedding service with max_tokens={max_tokens}"
)
# Try to get user's embeddings configuration
embeddings_api_key = None
embeddings_model = None
if document.created_by:
user_config = await db_client.get_user_configurations(document.created_by)
if user_config.embeddings:
embeddings_api_key = user_config.embeddings.api_key
embeddings_model = user_config.embeddings.model
logger.info(f"Using user embeddings config: model={embeddings_model}")
# Check if API key is configured
if not embeddings_api_key:
error_message = (
"OpenAI API key not configured. Please set your API key in "
"Model Configurations > Embedding to process documents."
)
logger.warning(f"Document {document_id}: {error_message}")
await db_client.update_document_status(
document_id, "failed", error_message=error_message
)
return
# Check if API key is configured
if not embeddings_api_key:
error_message = (
"OpenAI API key not configured. Please set your API key in "
"Model Configurations > Embedding to process documents."
)
logger.warning(f"Document {document_id}: {error_message}")
await db_client.update_document_status(
document_id, "failed", error_message=error_message
)
return
service = OpenAIEmbeddingService(
db_client=db_client,
max_tokens=max_tokens,
api_key=embeddings_api_key,
model_id=embeddings_model or "text-embedding-3-small",
)
elif embedding_service == "sentence_transformer":
logger.info(
f"Initializing SentenceTransformer embedding service with max_tokens={max_tokens}"
)
service = SentenceTransformerEmbeddingService(
db_client=db_client,
max_tokens=max_tokens,
)
else:
raise ValueError(
f"Invalid embedding_service: {embedding_service}. "
f"Must be 'sentence_transformer' or 'openai'"
)
service = OpenAIEmbeddingService(
db_client=db_client,
max_tokens=max_tokens,
api_key=embeddings_api_key,
model_id=embeddings_model or "text-embedding-3-small",
)
# Step 1: Convert document with docling
logger.info("Converting document with docling")
@ -265,8 +239,8 @@ async def process_knowledge_base_document(
logger.info(f" - Min: {min_tokens} tokens")
logger.info(f" - Max: {max_tokens_actual} tokens")
# Step 6: Generate embeddings using the embedding service
logger.info(f"Generating embeddings using {embedding_service}")
# Step 6: Generate embeddings using OpenAI
logger.info(f"Generating embeddings using {service.get_model_id()}")
embeddings = await service.embed_texts(chunk_texts)
# Step 7: Attach embeddings to chunk records

View file

@ -6,37 +6,51 @@ class TestIsPrivateIpCandidate:
def test_private_ip_192_168(self):
"""192.168.x.x addresses are detected as private."""
candidate = "candidate:123 1 udp 2122260223 192.168.50.24 63603 typ host generation 0"
candidate = (
"candidate:123 1 udp 2122260223 192.168.50.24 63603 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is True
def test_private_ip_10_x(self):
"""10.x.x.x addresses are detected as private."""
candidate = "candidate:456 1 udp 2122260223 10.0.0.1 12345 typ host generation 0"
candidate = (
"candidate:456 1 udp 2122260223 10.0.0.1 12345 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is True
def test_private_ip_172_16(self):
"""172.16.x.x addresses are detected as private."""
candidate = "candidate:789 1 udp 2122260223 172.16.0.1 54321 typ host generation 0"
candidate = (
"candidate:789 1 udp 2122260223 172.16.0.1 54321 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is True
def test_private_ip_172_31(self):
"""172.31.x.x addresses are detected as private."""
candidate = "candidate:101 1 udp 2122260223 172.31.255.255 12345 typ host generation 0"
candidate = (
"candidate:101 1 udp 2122260223 172.31.255.255 12345 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is True
def test_cgnat_ip(self):
"""CGNAT addresses (100.64.0.0/10) are detected as private."""
candidate = "candidate:202 1 udp 2122260223 100.64.0.1 12345 typ host generation 0"
candidate = (
"candidate:202 1 udp 2122260223 100.64.0.1 12345 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is True
def test_cgnat_ip_upper_bound(self):
"""Upper bound of CGNAT range is detected."""
candidate = "candidate:303 1 udp 2122260223 100.127.255.255 12345 typ host generation 0"
candidate = (
"candidate:303 1 udp 2122260223 100.127.255.255 12345 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is True
def test_public_ip(self):
"""Public IP addresses return False."""
candidate = "candidate:404 1 udp 2122260223 142.250.190.46 12345 typ host generation 0"
candidate = (
"candidate:404 1 udp 2122260223 142.250.190.46 12345 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is False
def test_public_ip_8_8_8_8(self):
@ -46,17 +60,23 @@ class TestIsPrivateIpCandidate:
def test_non_cgnat_100_range(self):
"""100.x.x.x outside CGNAT range is public."""
candidate = "candidate:606 1 udp 2122260223 100.128.0.1 12345 typ host generation 0"
candidate = (
"candidate:606 1 udp 2122260223 100.128.0.1 12345 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is False
def test_172_15_is_public(self):
"""172.15.x.x is outside private range and should be public."""
candidate = "candidate:707 1 udp 2122260223 172.15.255.255 12345 typ srflx generation 0"
candidate = (
"candidate:707 1 udp 2122260223 172.15.255.255 12345 typ srflx generation 0"
)
assert is_private_ip_candidate(candidate) is False
def test_172_32_is_public(self):
"""172.32.x.x is outside private range and should be public."""
candidate = "candidate:808 1 udp 2122260223 172.32.0.1 12345 typ srflx generation 0"
candidate = (
"candidate:808 1 udp 2122260223 172.32.0.1 12345 typ srflx generation 0"
)
assert is_private_ip_candidate(candidate) is False
def test_srflx_candidate_type(self):
@ -66,17 +86,23 @@ class TestIsPrivateIpCandidate:
def test_relay_candidate_type(self):
"""Relay candidates are parsed correctly."""
candidate = "candidate:111 1 udp 41885439 1.1.1.1 50000 typ relay raddr 0.0.0.0 rport 0"
candidate = (
"candidate:111 1 udp 41885439 1.1.1.1 50000 typ relay raddr 0.0.0.0 rport 0"
)
assert is_private_ip_candidate(candidate) is False
def test_loopback_ip(self):
"""Loopback addresses are detected as private."""
candidate = "candidate:222 1 udp 2122260223 127.0.0.1 12345 typ host generation 0"
candidate = (
"candidate:222 1 udp 2122260223 127.0.0.1 12345 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is True
def test_link_local_ip(self):
"""Link-local addresses (169.254.x.x) are detected as private."""
candidate = "candidate:333 1 udp 2122260223 169.254.1.1 12345 typ host generation 0"
candidate = (
"candidate:333 1 udp 2122260223 169.254.1.1 12345 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is True
def test_ipv6_link_local(self):
@ -100,7 +126,9 @@ class TestIsPrivateIpCandidate:
def test_malformed_candidate_invalid_ip(self):
"""Candidate with invalid IP returns False."""
candidate = "candidate:777 1 udp 2122260223 not.an.ip 12345 typ host generation 0"
candidate = (
"candidate:777 1 udp 2122260223 not.an.ip 12345 typ host generation 0"
)
assert is_private_ip_candidate(candidate) is False
def test_malformed_candidate_short(self):
@ -110,5 +138,7 @@ class TestIsPrivateIpCandidate:
def test_tcp_candidate(self):
"""TCP candidates are parsed correctly."""
candidate = "candidate:999 1 tcp 1518280447 192.168.1.100 9 typ host tcptype active"
candidate = (
"candidate:999 1 tcp 1518280447 192.168.1.100 9 typ host tcptype active"
)
assert is_private_ip_candidate(candidate) is True