Merge remote-tracking branch 'upstream/dev' into fix/documents

This commit is contained in:
Anish Sarkar 2026-02-04 03:04:52 +05:30
commit 103baa8b7a
41 changed files with 2054 additions and 475 deletions

View file

@ -3,6 +3,12 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense
#Celery Config
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
# Optional: isolate queues when sharing Redis with other apps
CELERY_TASK_DEFAULT_QUEUE=surfsense
# Redis for app-level features (heartbeats, podcast markers)
# Defaults to CELERY_BROKER_URL when not set
REDIS_APP_URL=redis://localhost:6379/0
#Electric(for migrations only)
ELECTRIC_DB_USER=electric

View file

@ -7,11 +7,14 @@ Create Date: 2026-02-02
Changes:
1. Add created_by_id column (UUID, nullable, foreign key to user.id)
2. Create index on created_by_id for performance
3. Backfill existing documents with search space owner's user_id
3. Backfill existing documents with search space owner's user_id (with progress indicator)
"""
import sys
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
@ -20,11 +23,15 @@ down_revision: str | None = "85"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
# Batch size for backfill operation
BATCH_SIZE = 5000
def upgrade() -> None:
"""Add created_by_id column to documents and backfill with search space owner."""
# 1. Add created_by_id column (nullable for backward compatibility)
print("Step 1/4: Adding created_by_id column...")
op.execute(
"""
DO $$
@ -39,17 +46,21 @@ def upgrade() -> None:
END$$;
"""
)
print(" Done: created_by_id column added.")
# 2. Create index on created_by_id for efficient queries
print("Step 2/4: Creating index on created_by_id...")
op.execute(
"""
CREATE INDEX IF NOT EXISTS ix_documents_created_by_id
ON documents (created_by_id);
"""
)
print(" Done: Index created.")
# 3. Add foreign key constraint with ON DELETE SET NULL
# First check if constraint already exists
print("Step 3/4: Adding foreign key constraint...")
op.execute(
"""
DO $$
@ -67,18 +78,69 @@ def upgrade() -> None:
END$$;
"""
)
print(" Done: Foreign key constraint added.")
# 4. Backfill existing documents with search space owner's user_id
# This ensures all existing documents are associated with the search space owner
op.execute(
"""
UPDATE documents
SET created_by_id = searchspaces.user_id
FROM searchspaces
WHERE documents.search_space_id = searchspaces.id
AND documents.created_by_id IS NULL;
"""
# Process in batches with progress indicator
print("Step 4/4: Backfilling created_by_id for existing documents...")
connection = op.get_bind()
# Get total count of documents that need backfilling
result = connection.execute(
sa.text("""
SELECT COUNT(*) FROM documents WHERE created_by_id IS NULL
""")
)
total_count = result.scalar()
if total_count == 0:
print(" No documents need backfilling. Skipping.")
return
print(f" Total documents to backfill: {total_count:,}")
processed = 0
batch_num = 0
while processed < total_count:
batch_num += 1
# Update a batch of documents using a subquery to limit the update
# We use ctid (tuple identifier) for efficient batching in PostgreSQL
result = connection.execute(
sa.text("""
UPDATE documents
SET created_by_id = searchspaces.user_id
FROM searchspaces
WHERE documents.search_space_id = searchspaces.id
AND documents.created_by_id IS NULL
AND documents.id IN (
SELECT d.id FROM documents d
WHERE d.created_by_id IS NULL
LIMIT :batch_size
)
"""),
{"batch_size": BATCH_SIZE},
)
rows_updated = result.rowcount
if rows_updated == 0:
# No more rows to update
break
processed += rows_updated
progress_pct = min(100.0, (processed / total_count) * 100)
# Print progress with carriage return for in-place update
sys.stdout.write(
f"\r Progress: {processed:,}/{total_count:,} documents ({progress_pct:.1f}%) - Batch {batch_num}"
)
sys.stdout.flush()
# Final newline after progress
print()
print(f" Done: Backfilled {processed:,} documents.")
def downgrade() -> None:

View file

@ -0,0 +1,58 @@
"""Make podcast_transcript nullable
Revision ID: 88
Revises: 87
Create Date: 2026-02-02
The podcast workflow now creates a podcast record with PENDING status first,
then fills in the transcript after generation completes. This requires
podcast_transcript to be nullable.
"""
from collections.abc import Sequence
from alembic import op
revision: str = "88"
down_revision: str | None = "87"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
# Make podcast_transcript nullable and remove the server default
op.execute(
"""
ALTER TABLE podcasts
ALTER COLUMN podcast_transcript DROP NOT NULL;
"""
)
op.execute(
"""
ALTER TABLE podcasts
ALTER COLUMN podcast_transcript DROP DEFAULT;
"""
)
def downgrade() -> None:
# Set empty JSON for any NULL values before adding NOT NULL constraint
op.execute(
"""
UPDATE podcasts
SET podcast_transcript = '{}'::jsonb
WHERE podcast_transcript IS NULL;
"""
)
op.execute(
"""
ALTER TABLE podcasts
ALTER COLUMN podcast_transcript SET DEFAULT '{}';
"""
)
op.execute(
"""
ALTER TABLE podcasts
ALTER COLUMN podcast_transcript SET NOT NULL;
"""
)

View file

@ -0,0 +1,46 @@
"""Make podcast file_location nullable
Revision ID: 89
Revises: 88
Create Date: 2026-02-03
The podcast workflow creates a podcast record with PENDING status first,
then fills in the file_location after audio generation completes. This requires
file_location to be nullable.
"""
from collections.abc import Sequence
from alembic import op
revision: str = "89"
down_revision: str | None = "88"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
# Make file_location nullable
op.execute(
"""
ALTER TABLE podcasts
ALTER COLUMN file_location DROP NOT NULL;
"""
)
def downgrade() -> None:
# Set empty string for any NULL values before adding NOT NULL constraint
op.execute(
"""
UPDATE podcasts
SET file_location = ''
WHERE file_location IS NULL;
"""
)
op.execute(
"""
ALTER TABLE podcasts
ALTER COLUMN file_location SET NOT NULL;
"""
)

View file

@ -0,0 +1,66 @@
"""Add public_sharing permissions to existing roles
Revision ID: 90
Revises: 89
Create Date: 2026-02-02
"""
from sqlalchemy import text
from alembic import op
revision = "90"
down_revision = "89"
branch_labels = None
depends_on = None
def upgrade():
connection = op.get_bind()
connection.execute(
text(
"""
UPDATE search_space_roles
SET permissions = array_append(permissions, 'public_sharing:view')
WHERE name IN ('Editor', 'Viewer')
AND NOT ('public_sharing:view' = ANY(permissions))
"""
)
)
connection.execute(
text(
"""
UPDATE search_space_roles
SET permissions = array_append(permissions, 'public_sharing:create')
WHERE name = 'Editor'
AND NOT ('public_sharing:create' = ANY(permissions))
"""
)
)
def downgrade():
connection = op.get_bind()
connection.execute(
text(
"""
UPDATE search_space_roles
SET permissions = array_remove(permissions, 'public_sharing:view')
WHERE name IN ('Editor', 'Viewer')
"""
)
)
connection.execute(
text(
"""
UPDATE search_space_roles
SET permissions = array_remove(permissions, 'public_sharing:create')
WHERE name = 'Editor'
"""
)
)

View file

@ -21,8 +21,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Podcast, PodcastStatus
# Redis connection for tracking active podcast tasks
# Uses the same Redis instance as Celery
REDIS_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
# Defaults to the Celery broker when REDIS_APP_URL is not set
REDIS_URL = os.getenv(
"REDIS_APP_URL",
os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
)
_redis_client: redis.Redis | None = None

View file

@ -26,6 +26,7 @@ def init_worker(**kwargs):
# Get Celery configuration from environment
CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
CELERY_TASK_DEFAULT_QUEUE = os.getenv("CELERY_TASK_DEFAULT_QUEUE", "surfsense")
# Get schedule checker interval from environment
# Format: "<number><unit>" where unit is 'm' (minutes) or 'h' (hours)
@ -92,6 +93,9 @@ celery_app.conf.update(
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_default_queue=CELERY_TASK_DEFAULT_QUEUE,
task_default_exchange=CELERY_TASK_DEFAULT_QUEUE,
task_default_routing_key=CELERY_TASK_DEFAULT_QUEUE,
# Task execution settings
task_track_started=True,
task_time_limit=28800, # 8 hour hard limit

View file

@ -122,8 +122,52 @@ global_llm_configs:
use_default_system_instructions: false
citations_enabled: true
# Example: Groq - Fast inference
# Example: Azure OpenAI GPT-4o
# IMPORTANT: For Azure deployments, always include 'base_model' in litellm_params
# to enable accurate token counting, cost tracking, and max token limits
- id: -5
name: "Global Azure GPT-4o"
description: "Azure OpenAI GPT-4o deployment"
provider: "AZURE"
# model_name format for Azure: azure/<your-deployment-name>
model_name: "azure/gpt-4o-deployment"
api_key: "your-azure-api-key-here"
api_base: "https://your-resource.openai.azure.com"
api_version: "2024-02-15-preview" # Azure API version
rpm: 1000
tpm: 150000
litellm_params:
temperature: 0.7
max_tokens: 4000
# REQUIRED for Azure: Specify the underlying OpenAI model
# This fixes "Could not identify azure model" warnings
# Common base_model values: gpt-4, gpt-4-turbo, gpt-4o, gpt-4o-mini, gpt-3.5-turbo
base_model: "gpt-4o"
system_instructions: ""
use_default_system_instructions: true
citations_enabled: true
# Example: Azure OpenAI GPT-4 Turbo
- id: -6
name: "Global Azure GPT-4 Turbo"
description: "Azure OpenAI GPT-4 Turbo deployment"
provider: "AZURE"
model_name: "azure/gpt-4-turbo-deployment"
api_key: "your-azure-api-key-here"
api_base: "https://your-resource.openai.azure.com"
api_version: "2024-02-15-preview"
rpm: 500
tpm: 100000
litellm_params:
temperature: 0.7
max_tokens: 4000
base_model: "gpt-4-turbo" # Maps to gpt-4-turbo-preview
system_instructions: ""
use_default_system_instructions: true
citations_enabled: true
# Example: Groq - Fast inference
- id: -7
name: "Global Groq Llama 3"
description: "Ultra-fast Llama 3 70B via Groq"
provider: "GROQ"
@ -150,3 +194,11 @@ global_llm_configs:
# - All standard LiteLLM providers are supported
# - rpm/tpm: Optional rate limits for load balancing (requests/tokens per minute)
# These help the router distribute load evenly and avoid rate limit errors
#
# AZURE-SPECIFIC NOTES:
# - Always add 'base_model' in litellm_params for Azure deployments
# - This fixes "Could not identify azure model 'X'" warnings
# - base_model should match the underlying OpenAI model (e.g., gpt-4o, gpt-4-turbo, gpt-3.5-turbo)
# - model_name format: "azure/<your-deployment-name>"
# - api_version: Use a recent Azure API version (e.g., "2024-02-15-preview")
# - See: https://docs.litellm.ai/docs/proxy/cost_tracking#spend-tracking-for-azure-openai-models

View file

@ -257,6 +257,11 @@ class Permission(str, Enum):
SETTINGS_UPDATE = "settings:update"
SETTINGS_DELETE = "settings:delete" # Delete the entire search space
# Public Sharing
PUBLIC_SHARING_VIEW = "public_sharing:view"
PUBLIC_SHARING_CREATE = "public_sharing:create"
PUBLIC_SHARING_DELETE = "public_sharing:delete"
# Full access wildcard
FULL_ACCESS = "*"
@ -299,6 +304,9 @@ DEFAULT_ROLE_PERMISSIONS = {
Permission.ROLES_READ.value,
# Settings (view only, no update or delete)
Permission.SETTINGS_VIEW.value,
# Public Sharing (can create and view, no delete)
Permission.PUBLIC_SHARING_VIEW.value,
Permission.PUBLIC_SHARING_CREATE.value,
],
"Viewer": [
# Documents (read only)
@ -322,6 +330,8 @@ DEFAULT_ROLE_PERMISSIONS = {
Permission.ROLES_READ.value,
# Settings (view only)
Permission.SETTINGS_VIEW.value,
# Public Sharing (view only)
Permission.PUBLIC_SHARING_VIEW.value,
],
}

View file

@ -45,9 +45,9 @@ from app.schemas.new_chat import (
NewChatThreadUpdate,
NewChatThreadVisibilityUpdate,
NewChatThreadWithMessages,
PublicChatSnapshotCreateResponse,
PublicChatSnapshotListResponse,
RegenerateRequest,
SnapshotCreateResponse,
SnapshotListResponse,
ThreadHistoryLoadResponse,
ThreadListItem,
ThreadListResponse,
@ -736,10 +736,11 @@ async def update_thread_visibility(
# =============================================================================
@router.post("/threads/{thread_id}/snapshots", response_model=SnapshotCreateResponse)
@router.post(
"/threads/{thread_id}/snapshots", response_model=PublicChatSnapshotCreateResponse
)
async def create_thread_snapshot(
thread_id: int,
request: Request,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
@ -747,23 +748,21 @@ async def create_thread_snapshot(
Create a public snapshot of the thread.
Returns existing snapshot URL if content unchanged (deduplication).
Only the thread owner can create snapshots.
"""
from app.services.public_chat_service import create_snapshot
base_url = str(request.base_url).rstrip("/")
return await create_snapshot(
session=session,
thread_id=thread_id,
user=user,
base_url=base_url,
)
@router.get("/threads/{thread_id}/snapshots", response_model=SnapshotListResponse)
@router.get(
"/threads/{thread_id}/snapshots", response_model=PublicChatSnapshotListResponse
)
async def list_thread_snapshots(
thread_id: int,
request: Request,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
@ -774,13 +773,11 @@ async def list_thread_snapshots(
"""
from app.services.public_chat_service import list_snapshots_for_thread
base_url = str(request.base_url).rstrip("/")
return SnapshotListResponse(
return PublicChatSnapshotListResponse(
snapshots=await list_snapshots_for_thread(
session=session,
thread_id=thread_id,
user=user,
base_url=base_url,
)
)

View file

@ -91,7 +91,10 @@ def get_heartbeat_redis_client() -> redis.Redis:
"""Get or create Redis client for heartbeat tracking."""
global _heartbeat_redis_client
if _heartbeat_redis_client is None:
redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
redis_url = os.getenv(
"REDIS_APP_URL",
os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
)
_heartbeat_redis_client = redis.from_url(redis_url, decode_responses=True)
return _heartbeat_redis_client

View file

@ -501,3 +501,25 @@ async def update_llm_preferences(
raise HTTPException(
status_code=500, detail=f"Failed to update LLM preferences: {e!s}"
) from e
@router.get("/searchspaces/{search_space_id}/snapshots")
async def list_search_space_snapshots(
search_space_id: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""
List all public chat snapshots for a search space.
Requires PUBLIC_SHARING_VIEW permission.
"""
from app.schemas.new_chat import PublicChatSnapshotsBySpaceResponse
from app.services.public_chat_service import list_snapshots_for_search_space
snapshots = await list_snapshots_for_search_space(
session=session,
search_space_id=search_space_id,
user=user,
)
return PublicChatSnapshotsBySpaceResponse(snapshots=snapshots)

View file

@ -211,17 +211,17 @@ class RegenerateRequest(BaseModel):
# =============================================================================
class SnapshotCreateResponse(BaseModel):
"""Response after creating a public snapshot."""
class PublicChatSnapshotCreateResponse(BaseModel):
"""Response after creating a public chat snapshot."""
snapshot_id: int
share_token: str
public_url: str
is_new: bool # False if existing snapshot returned (same content)
is_new: bool
class SnapshotInfo(BaseModel):
"""Info about a single snapshot."""
class PublicChatSnapshotInfo(BaseModel):
"""Info about a single public chat snapshot."""
id: int
share_token: str
@ -230,10 +230,28 @@ class SnapshotInfo(BaseModel):
message_count: int
class SnapshotListResponse(BaseModel):
"""List of snapshots for a thread."""
class PublicChatSnapshotListResponse(BaseModel):
"""List of public chat snapshots for a thread."""
snapshots: list[SnapshotInfo]
snapshots: list[PublicChatSnapshotInfo]
class PublicChatSnapshotDetail(BaseModel):
"""Public chat snapshot with thread context."""
id: int
share_token: str
public_url: str
created_at: datetime
message_count: int
thread_id: int
thread_title: str
class PublicChatSnapshotsBySpaceResponse(BaseModel):
"""List of public chat snapshots for a search space."""
snapshots: list[PublicChatSnapshotDetail]
# =============================================================================

View file

@ -25,12 +25,14 @@ from app.db import (
ChatVisibility,
NewChatMessage,
NewChatThread,
Permission,
Podcast,
PodcastStatus,
PublicChatSnapshot,
SearchSpaceMembership,
User,
)
from app.utils.rbac import check_permission
UI_TOOLS = {
"display_image",
@ -159,7 +161,6 @@ async def create_snapshot(
session: AsyncSession,
thread_id: int,
user: User,
base_url: str,
) -> dict:
"""
Create a public snapshot of a chat thread.
@ -167,6 +168,9 @@ async def create_snapshot(
Returns existing snapshot if content unchanged (same hash).
Returns new snapshot with unique URL if content changed.
"""
from app.config import config
frontend_url = (config.NEXT_FRONTEND_URL or "").rstrip("/")
result = await session.execute(
select(NewChatThread)
.options(selectinload(NewChatThread.messages))
@ -177,11 +181,13 @@ async def create_snapshot(
if not thread:
raise HTTPException(status_code=404, detail="Thread not found")
if thread.created_by_id != user.id:
raise HTTPException(
status_code=403,
detail="Only the creator of this chat can create public snapshots",
)
await check_permission(
session,
user,
thread.search_space_id,
Permission.PUBLIC_SHARING_CREATE.value,
"You don't have permission to create public share links",
)
# Build snapshot data
user_cache: dict[UUID, dict] = {}
@ -246,7 +252,7 @@ async def create_snapshot(
return {
"snapshot_id": existing.id,
"share_token": existing.share_token,
"public_url": f"{base_url}/public/{existing.share_token}",
"public_url": f"{frontend_url}/public/{existing.share_token}",
"is_new": False,
}
@ -279,7 +285,7 @@ async def create_snapshot(
return {
"snapshot_id": snapshot.id,
"share_token": snapshot.share_token,
"public_url": f"{base_url}/public/{snapshot.share_token}",
"public_url": f"{frontend_url}/public/{snapshot.share_token}",
"is_new": True,
}
@ -348,10 +354,10 @@ async def list_snapshots_for_thread(
session: AsyncSession,
thread_id: int,
user: User,
base_url: str,
) -> list[dict]:
"""List all public snapshots for a thread."""
# Verify ownership
from app.config import config
result = await session.execute(
select(NewChatThread).filter(NewChatThread.id == thread_id)
)
@ -366,7 +372,6 @@ async def list_snapshots_for_thread(
detail="Only the creator can view snapshots",
)
# Get snapshots
result = await session.execute(
select(PublicChatSnapshot)
.filter(PublicChatSnapshot.thread_id == thread_id)
@ -374,11 +379,13 @@ async def list_snapshots_for_thread(
)
snapshots = result.scalars().all()
frontend_url = (config.NEXT_FRONTEND_URL or "").rstrip("/")
return [
{
"id": s.id,
"share_token": s.share_token,
"public_url": f"{base_url}/public/{s.share_token}",
"public_url": f"{frontend_url}/public/{s.share_token}",
"created_at": s.created_at.isoformat() if s.created_at else None,
"message_count": len(s.message_ids) if s.message_ids else 0,
}
@ -386,6 +393,54 @@ async def list_snapshots_for_thread(
]
async def list_snapshots_for_search_space(
session: AsyncSession,
search_space_id: int,
user: User,
) -> list[dict]:
"""List all public snapshots for a search space."""
from app.config import config
await check_permission(
session,
user,
search_space_id,
Permission.PUBLIC_SHARING_VIEW.value,
"You don't have permission to view public share links",
)
result = await session.execute(
select(PublicChatSnapshot)
.join(NewChatThread, PublicChatSnapshot.thread_id == NewChatThread.id)
.filter(NewChatThread.search_space_id == search_space_id)
.order_by(PublicChatSnapshot.created_at.desc())
)
snapshots = result.scalars().all()
snapshot_thread_ids = [s.thread_id for s in snapshots]
thread_result = await session.execute(
select(NewChatThread.id, NewChatThread.title).filter(
NewChatThread.id.in_(snapshot_thread_ids)
)
)
thread_titles = {row[0]: row[1] for row in thread_result.fetchall()}
frontend_url = (config.NEXT_FRONTEND_URL or "").rstrip("/")
return [
{
"id": s.id,
"share_token": s.share_token,
"public_url": f"{frontend_url}/public/{s.share_token}",
"created_at": s.created_at.isoformat() if s.created_at else None,
"message_count": len(s.message_ids) if s.message_ids else 0,
"thread_id": s.thread_id,
"thread_title": thread_titles.get(s.thread_id, "Untitled"),
}
for s in snapshots
]
# =============================================================================
# Snapshot Deletion
# =============================================================================
@ -412,11 +467,13 @@ async def delete_snapshot(
if not snapshot:
raise HTTPException(status_code=404, detail="Snapshot not found")
if snapshot.thread.created_by_id != user.id:
raise HTTPException(
status_code=403,
detail="Only the creator can delete snapshots",
)
await check_permission(
session,
user,
snapshot.thread.search_space_id,
Permission.PUBLIC_SHARING_DELETE.value,
"You don't have permission to delete public share links",
)
await session.delete(snapshot)
await session.commit()

View file

@ -323,6 +323,28 @@ def process_file_upload_task(
user_id: ID of the user
"""
import asyncio
import os
import traceback
logger.info(
f"[process_file_upload] Task started - file: {filename}, "
f"search_space_id: {search_space_id}, user_id: {user_id}"
)
logger.info(f"[process_file_upload] File path: {file_path}")
# Check if file exists and is accessible
if not os.path.exists(file_path):
logger.error(
f"[process_file_upload] File does not exist: {file_path}. "
"The temp file may have been cleaned up before the task ran."
)
return
try:
file_size = os.path.getsize(file_path)
logger.info(f"[process_file_upload] File size: {file_size} bytes")
except Exception as e:
logger.warning(f"[process_file_upload] Could not get file size: {e}")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@ -331,6 +353,15 @@ def process_file_upload_task(
loop.run_until_complete(
_process_file_upload(file_path, filename, search_space_id, user_id)
)
logger.info(
f"[process_file_upload] Task completed successfully for: {filename}"
)
except Exception as e:
logger.error(
f"[process_file_upload] Task failed for {filename}: {e}\n"
f"Traceback:\n{traceback.format_exc()}"
)
raise
finally:
loop.close()
@ -343,16 +374,22 @@ async def _process_file_upload(
from app.tasks.document_processors.file_processors import process_file_in_background
logger.info(f"[_process_file_upload] Starting async processing for: {filename}")
async with get_celery_session_maker()() as session:
logger.info(f"[_process_file_upload] Database session created for: {filename}")
task_logger = TaskLoggingService(session, search_space_id)
# Get file size for notification metadata
try:
file_size = os.path.getsize(file_path)
except Exception:
logger.info(f"[_process_file_upload] File size: {file_size} bytes")
except Exception as e:
logger.warning(f"[_process_file_upload] Could not get file size: {e}")
file_size = None
# Create notification for document processing
logger.info(f"[_process_file_upload] Creating notification for: {filename}")
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
@ -363,6 +400,9 @@ async def _process_file_upload(
file_size=file_size,
)
)
logger.info(
f"[_process_file_upload] Notification created with ID: {notification.id if notification else 'None'}"
)
log_entry = await task_logger.log_task_start(
task_name="process_file_upload",

View file

@ -51,7 +51,10 @@ def _clear_generating_podcast(search_space_id: int) -> None:
import redis
try:
redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
redis_url = os.getenv(
"REDIS_APP_URL",
os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
)
client = redis.from_url(redis_url, decode_responses=True)
key = f"podcast:generating:{search_space_id}"
client.delete(key)

View file

@ -36,7 +36,10 @@ def get_redis_client() -> redis.Redis:
"""Get or create Redis client for heartbeat checking."""
global _redis_client
if _redis_client is None:
redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
redis_url = os.getenv(
"REDIS_APP_URL",
os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
)
_redis_client = redis.from_url(redis_url, decode_responses=True)
return _redis_client

View file

@ -32,8 +32,6 @@ dependencies = [
"en-core-web-sm@https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl",
"static-ffmpeg>=2.13",
"tavily-python>=0.3.2",
"unstructured-client>=0.30.0",
"unstructured[all-docs]>=0.16.25",
"uvicorn[standard]>=0.34.0",
"validators>=0.34.0",
"youtube-transcript-api>=1.0.3",
@ -45,7 +43,6 @@ dependencies = [
"firecrawl-py>=4.9.0",
"boto3>=1.35.0",
"langchain-community>=0.3.31",
"langchain-unstructured>=1.0.0",
"litellm>=1.80.10",
"langchain-litellm>=0.3.5",
"fake-useragent>=2.2.0",
@ -62,6 +59,9 @@ dependencies = [
"deepagents>=0.3.8",
"langchain>=1.2.6",
"langgraph>=1.0.5",
"unstructured[all-docs]>=0.18.31",
"unstructured-client>=0.42.3",
"langchain-unstructured>=1.0.1",
]
[dependency-groups]

View file

@ -39,7 +39,7 @@ backend_pid=$!
sleep 5
echo "Starting Celery Worker..."
celery -A app.celery_app worker --loglevel=info &
celery -A app.celery_app worker --loglevel=info --autoscale=128,4 &
celery_worker_pid=$!
# Wait a bit for worker to initialize

File diff suppressed because it is too large Load diff