diff --git a/surfsense_backend/alembic/versions/81_add_public_chat_features.py b/surfsense_backend/alembic/versions/81_add_public_chat_features.py new file mode 100644 index 000000000..8d7e95df7 --- /dev/null +++ b/surfsense_backend/alembic/versions/81_add_public_chat_features.py @@ -0,0 +1,114 @@ +"""Add public chat sharing and cloning features to new_chat_threads + +Revision ID: 81 +Revises: 80 +Create Date: 2026-01-23 + +Adds columns for: +1. Public sharing via tokenized URLs (public_share_token, public_share_enabled) +2. Clone tracking for audit (cloned_from_thread_id, cloned_at) +3. History bootstrap flag for cloned chats (needs_history_bootstrap) +4. Clone pending flag for two-phase clone (clone_pending) +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "81" +down_revision: str | None = "80" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Add public sharing and cloning columns to new_chat_threads.""" + + op.execute( + """ + ALTER TABLE new_chat_threads + ADD COLUMN IF NOT EXISTS public_share_token VARCHAR(64); + """ + ) + + op.execute( + """ + ALTER TABLE new_chat_threads + ADD COLUMN IF NOT EXISTS public_share_enabled BOOLEAN NOT NULL DEFAULT FALSE; + """ + ) + + op.execute( + """ + CREATE UNIQUE INDEX IF NOT EXISTS ix_new_chat_threads_public_share_token + ON new_chat_threads(public_share_token) + WHERE public_share_token IS NOT NULL; + """ + ) + + op.execute( + """ + CREATE INDEX IF NOT EXISTS ix_new_chat_threads_public_share_enabled + ON new_chat_threads(public_share_enabled) + WHERE public_share_enabled = TRUE; + """ + ) + + op.execute( + """ + ALTER TABLE new_chat_threads + ADD COLUMN IF NOT EXISTS cloned_from_thread_id INTEGER + REFERENCES new_chat_threads(id) ON DELETE SET NULL; + """ + ) + + op.execute( + """ + ALTER TABLE new_chat_threads + ADD COLUMN IF NOT EXISTS cloned_at TIMESTAMP WITH TIME ZONE; + """ + ) + + op.execute( + """ + ALTER TABLE new_chat_threads + ADD COLUMN IF NOT EXISTS needs_history_bootstrap BOOLEAN NOT NULL DEFAULT FALSE; + """ + ) + + op.execute( + """ + ALTER TABLE new_chat_threads + ADD COLUMN IF NOT EXISTS clone_pending BOOLEAN NOT NULL DEFAULT FALSE; + """ + ) + + op.execute( + """ + CREATE INDEX IF NOT EXISTS ix_new_chat_threads_cloned_from_thread_id + ON new_chat_threads(cloned_from_thread_id) + WHERE cloned_from_thread_id IS NOT NULL; + """ + ) + + +def downgrade() -> None: + """Remove public sharing and cloning columns from new_chat_threads.""" + + op.execute("DROP INDEX IF EXISTS ix_new_chat_threads_cloned_from_thread_id") + op.execute("ALTER TABLE new_chat_threads DROP COLUMN IF EXISTS clone_pending") + op.execute( + "ALTER TABLE new_chat_threads DROP COLUMN IF EXISTS needs_history_bootstrap" + ) + op.execute("ALTER TABLE new_chat_threads DROP COLUMN IF EXISTS cloned_at") + op.execute( + "ALTER TABLE new_chat_threads DROP COLUMN IF EXISTS cloned_from_thread_id" + ) + + op.execute("DROP INDEX IF EXISTS ix_new_chat_threads_public_share_enabled") + op.execute("DROP INDEX IF EXISTS ix_new_chat_threads_public_share_token") + op.execute( + "ALTER TABLE new_chat_threads DROP COLUMN IF EXISTS public_share_enabled" + ) + op.execute("ALTER TABLE new_chat_threads DROP COLUMN IF EXISTS public_share_token") diff --git a/surfsense_backend/alembic/versions/82_add_podcast_status_and_thread.py b/surfsense_backend/alembic/versions/82_add_podcast_status_and_thread.py new file mode 100644 index 
000000000..fd4eed89f --- /dev/null +++ b/surfsense_backend/alembic/versions/82_add_podcast_status_and_thread.py @@ -0,0 +1,62 @@ +"""Add status and thread_id to podcasts + +Revision ID: 82 +Revises: 81 +Create Date: 2026-01-27 + +Adds status enum and thread_id FK to podcasts. +""" + +from collections.abc import Sequence + +from alembic import op + +revision: str = "82" +down_revision: str | None = "81" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.execute( + """ + CREATE TYPE podcast_status AS ENUM ('pending', 'generating', 'ready', 'failed'); + """ + ) + + op.execute( + """ + ALTER TABLE podcasts + ADD COLUMN IF NOT EXISTS status podcast_status NOT NULL DEFAULT 'ready'; + """ + ) + + op.execute( + """ + ALTER TABLE podcasts + ADD COLUMN IF NOT EXISTS thread_id INTEGER + REFERENCES new_chat_threads(id) ON DELETE SET NULL; + """ + ) + + op.execute( + """ + CREATE INDEX IF NOT EXISTS ix_podcasts_thread_id + ON podcasts(thread_id); + """ + ) + + op.execute( + """ + CREATE INDEX IF NOT EXISTS ix_podcasts_status + ON podcasts(status); + """ + ) + + +def downgrade() -> None: + op.execute("DROP INDEX IF EXISTS ix_podcasts_status") + op.execute("DROP INDEX IF EXISTS ix_podcasts_thread_id") + op.execute("ALTER TABLE podcasts DROP COLUMN IF EXISTS thread_id") + op.execute("ALTER TABLE podcasts DROP COLUMN IF EXISTS status") + op.execute("DROP TYPE IF EXISTS podcast_status") diff --git a/surfsense_backend/alembic/versions/83_add_reddit_follow_incentive_task.py b/surfsense_backend/alembic/versions/83_add_reddit_follow_incentive_task.py new file mode 100644 index 000000000..52ab77446 --- /dev/null +++ b/surfsense_backend/alembic/versions/83_add_reddit_follow_incentive_task.py @@ -0,0 +1,33 @@ +"""Add REDDIT_FOLLOW to incentive task type enum + +Revision ID: 83 +Revises: 82 + +Changes: +1. Add REDDIT_FOLLOW value to incentivetasktype enum +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "83" +down_revision: str | None = "82" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Add REDDIT_FOLLOW to incentivetasktype enum.""" + op.execute("ALTER TYPE incentivetasktype ADD VALUE IF NOT EXISTS 'REDDIT_FOLLOW'") + + +def downgrade() -> None: + """Remove REDDIT_FOLLOW from incentivetasktype enum. + + Note: PostgreSQL doesn't support removing values from enums directly. + This would require recreating the enum type, which is complex and risky. + For safety, we leave the enum value in place during downgrade. 
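+
+    If removal were ever required, the usual workaround (shown only as an
+    illustrative sketch; the table and column names below are placeholders)
+    is to build a replacement enum and swap affected columns over to it:
+
+        CREATE TYPE incentivetasktype_new AS ENUM ('GITHUB_STAR');
+        ALTER TABLE incentive_tasks
+            ALTER COLUMN task_type TYPE incentivetasktype_new
+            USING task_type::text::incentivetasktype_new;
+        DROP TYPE incentivetasktype;
+        ALTER TYPE incentivetasktype_new RENAME TO incentivetasktype;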
+ """ + pass diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py index 53e1b14bd..fda22aec3 100644 --- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py +++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py @@ -120,6 +120,7 @@ async def create_surfsense_deep_agent( connector_service: ConnectorService, checkpointer: Checkpointer, user_id: str | None = None, + thread_id: int | None = None, agent_config: AgentConfig | None = None, enabled_tools: list[str] | None = None, disabled_tools: list[str] | None = None, @@ -232,6 +233,7 @@ async def create_surfsense_deep_agent( "connector_service": connector_service, "firecrawl_api_key": firecrawl_api_key, "user_id": user_id, # Required for memory tools + "thread_id": thread_id, # For podcast tool # Dynamic connector/document type discovery for knowledge base tool "available_connectors": available_connectors, "available_document_types": available_document_types, diff --git a/surfsense_backend/app/agents/new_chat/tools/podcast.py b/surfsense_backend/app/agents/new_chat/tools/podcast.py index ff567bf73..1048ed881 100644 --- a/surfsense_backend/app/agents/new_chat/tools/podcast.py +++ b/surfsense_backend/app/agents/new_chat/tools/podcast.py @@ -18,6 +18,8 @@ import redis from langchain_core.tools import tool from sqlalchemy.ext.asyncio import AsyncSession +from app.db import Podcast, PodcastStatus + # Redis connection for tracking active podcast tasks # Uses the same Redis instance as Celery REDIS_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") @@ -32,50 +34,46 @@ def get_redis_client() -> redis.Redis: return _redis_client -def get_active_podcast_key(search_space_id: int) -> str: - """Generate Redis key for tracking active podcast task.""" - return f"podcast:active:{search_space_id}" +def _redis_key(search_space_id: int) -> str: + return f"podcast:generating:{search_space_id}" -def get_active_podcast_task(search_space_id: int) -> str | None: - """Check if there's an active podcast task for this search space.""" +def get_generating_podcast_id(search_space_id: int) -> int | None: + """Get the podcast ID currently being generated for this search space.""" try: client = get_redis_client() - return client.get(get_active_podcast_key(search_space_id)) + value = client.get(_redis_key(search_space_id)) + return int(value) if value else None except Exception: - # If Redis is unavailable, allow the request (fail open) return None -def set_active_podcast_task(search_space_id: int, task_id: str) -> None: - """Mark a podcast task as active for this search space.""" +def set_generating_podcast(search_space_id: int, podcast_id: int) -> None: + """Mark a podcast as currently generating for this search space.""" try: client = get_redis_client() - # Set with 30-minute expiry as safety net (podcast should complete before this) - client.setex(get_active_podcast_key(search_space_id), 1800, task_id) + client.setex(_redis_key(search_space_id), 1800, str(podcast_id)) except Exception as e: - print(f"[generate_podcast] Warning: Could not set active task in Redis: {e}") - - -def clear_active_podcast_task(search_space_id: int) -> None: - """Clear the active podcast task for this search space.""" - try: - client = get_redis_client() - client.delete(get_active_podcast_key(search_space_id)) - except Exception as e: - print(f"[generate_podcast] Warning: Could not clear active task in Redis: {e}") + print( + f"[generate_podcast] Warning: Could not set generating podcast in Redis: 
{e}" + ) def create_generate_podcast_tool( search_space_id: int, db_session: AsyncSession, + thread_id: int | None = None, ): """ Factory function to create the generate_podcast tool with injected dependencies. + Pre-creates podcast record with pending status so podcast_id is available + immediately for frontend polling. + Args: search_space_id: The user's search space ID - db_session: Database session (not used - Celery creates its own) + db_session: Database session for creating the podcast record + thread_id: The chat thread ID for associating the podcast Returns: A configured tool function for generating podcasts @@ -98,76 +96,71 @@ def create_generate_podcast_tool( - "Make a podcast about..." - "Turn this into a podcast" - The tool will start generating a podcast in the background. - The podcast will be available once generation completes. - - IMPORTANT: Only one podcast can be generated at a time. If a podcast - is already being generated, this tool will return a message asking - the user to wait. - Args: source_content: The text content to convert into a podcast. - This can be a summary, research findings, or any text - the user wants transformed into an audio podcast. podcast_title: Title for the podcast (default: "SurfSense Podcast") user_prompt: Optional instructions for podcast style, tone, or format. - For example: "Make it casual and fun" or "Focus on the key insights" Returns: A dictionary containing: - - status: "processing" (task submitted), "already_generating", or "error" - - task_id: The Celery task ID for polling status (if processing) + - status: PodcastStatus value (pending, generating, or failed) + - podcast_id: The podcast ID for polling (when status is pending or generating) - title: The podcast title - - message: Status message for the user + - message: Status message (or "error" field if status is failed) """ try: - # Check if a podcast is already being generated for this search space - active_task_id = get_active_podcast_task(search_space_id) - if active_task_id: + generating_podcast_id = get_generating_podcast_id(search_space_id) + if generating_podcast_id: print( - f"[generate_podcast] Blocked duplicate request. Active task: {active_task_id}" + f"[generate_podcast] Blocked duplicate request. Generating podcast: {generating_podcast_id}" ) return { - "status": "already_generating", - "task_id": active_task_id, + "status": PodcastStatus.GENERATING.value, + "podcast_id": generating_podcast_id, "title": podcast_title, - "message": "A podcast is already being generated. Please wait for it to complete before requesting another one.", + "message": "A podcast is already being generated. 
Please wait for it to complete.", } - # Import Celery task here to avoid circular imports + podcast = Podcast( + title=podcast_title, + status=PodcastStatus.PENDING, + search_space_id=search_space_id, + thread_id=thread_id, + ) + db_session.add(podcast) + await db_session.commit() + await db_session.refresh(podcast) + from app.tasks.celery_tasks.podcast_tasks import ( generate_content_podcast_task, ) - # Submit Celery task for background processing task = generate_content_podcast_task.delay( + podcast_id=podcast.id, source_content=source_content, search_space_id=search_space_id, - podcast_title=podcast_title, user_prompt=user_prompt, ) - # Mark this task as active - set_active_podcast_task(search_space_id, task.id) + set_generating_podcast(search_space_id, podcast.id) - print(f"[generate_podcast] Submitted Celery task: {task.id}") + print(f"[generate_podcast] Created podcast {podcast.id}, task: {task.id}") - # Return immediately with task_id for polling return { - "status": "processing", - "task_id": task.id, + "status": PodcastStatus.PENDING.value, + "podcast_id": podcast.id, "title": podcast_title, "message": "Podcast generation started. This may take a few minutes.", } except Exception as e: error_message = str(e) - print(f"[generate_podcast] Error submitting task: {error_message}") + print(f"[generate_podcast] Error: {error_message}") return { - "status": "error", + "status": PodcastStatus.FAILED.value, "error": error_message, "title": podcast_title, - "task_id": None, + "podcast_id": None, } return generate_podcast diff --git a/surfsense_backend/app/agents/new_chat/tools/registry.py b/surfsense_backend/app/agents/new_chat/tools/registry.py index 968e51445..c65445419 100644 --- a/surfsense_backend/app/agents/new_chat/tools/registry.py +++ b/surfsense_backend/app/agents/new_chat/tools/registry.py @@ -107,8 +107,9 @@ BUILTIN_TOOLS: list[ToolDefinition] = [ factory=lambda deps: create_generate_podcast_tool( search_space_id=deps["search_space_id"], db_session=deps["db_session"], + thread_id=deps["thread_id"], ), - requires=["search_space_id", "db_session"], + requires=["search_space_id", "db_session", "thread_id"], ), # Link preview tool - fetches Open Graph metadata for URLs ToolDefinition( diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index e3b988676..5b8c4b993 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -4,6 +4,8 @@ Composio Google Drive Connector Module. Provides Google Drive specific methods for data retrieval and indexing via Composio. """ +import hashlib +import json import logging import os import tempfile @@ -464,6 +466,55 @@ async def check_document_by_unique_identifier( return existing_doc_result.scalars().first() +async def check_document_by_content_hash( + session: AsyncSession, content_hash: str +) -> Document | None: + """Check if a document with the given content hash already exists. + + This is used to prevent duplicate content from being indexed, regardless + of which connector originally indexed it. 
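+
+    Args:
+        session: Active async database session
+        content_hash: Content hash to look up
+
+    Returns:
+        The existing Document with a matching content_hash, or None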
+ """ + from sqlalchemy.future import select + + existing_doc_result = await session.execute( + select(Document).where(Document.content_hash == content_hash) + ) + return existing_doc_result.scalars().first() + + +async def check_document_by_google_drive_file_id( + session: AsyncSession, file_id: str, search_space_id: int +) -> Document | None: + """Check if a document with this Google Drive file ID exists (from any connector). + + This checks both metadata key formats: + - 'google_drive_file_id' (normal Google Drive connector) + - 'file_id' (Composio Google Drive connector) + + This allows detecting duplicates BEFORE downloading/ETL, saving expensive API calls. + """ + from sqlalchemy import String, cast, or_ + from sqlalchemy.future import select + + # When casting JSON to String, the result includes quotes: "value" instead of value + # So we need to compare with the quoted version + quoted_file_id = f'"{file_id}"' + + existing_doc_result = await session.execute( + select(Document).where( + Document.search_space_id == search_space_id, + or_( + # Normal Google Drive connector format + cast(Document.document_metadata["google_drive_file_id"], String) + == quoted_file_id, + # Composio Google Drive connector format + cast(Document.document_metadata["file_id"], String) == quoted_file_id, + ), + ) + ) + return existing_doc_result.scalars().first() + + async def update_connector_last_indexed( session: AsyncSession, connector, @@ -477,6 +528,33 @@ async def update_connector_last_indexed( logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") +def generate_indexing_settings_hash( + selected_folders: list[dict], + selected_files: list[dict], + indexing_options: dict, +) -> str: + """Generate a hash of indexing settings to detect configuration changes. + + This hash is used to determine if indexing settings have changed since + the last index, which would require a full re-scan instead of delta sync. + + Args: + selected_folders: List of {id, name} for folders to index + selected_files: List of {id, name} for individual files to index + indexing_options: Dict with max_files_per_folder, include_subfolders, etc. + + Returns: + MD5 hash string of the settings + """ + settings = { + "folders": sorted([f.get("id", "") for f in selected_folders]), + "files": sorted([f.get("id", "") for f in selected_files]), + "include_subfolders": indexing_options.get("include_subfolders", True), + "max_files_per_folder": indexing_options.get("max_files_per_folder", 100), + } + return hashlib.md5(json.dumps(settings, sort_keys=True).encode()).hexdigest() + + async def index_composio_google_drive( session: AsyncSession, connector, @@ -487,12 +565,16 @@ async def index_composio_google_drive( log_entry, update_last_indexed: bool = True, max_items: int = 1000, -) -> tuple[int, str]: +) -> tuple[int, int, str | None]: """Index Google Drive files via Composio with delta sync support. + Returns: + Tuple of (documents_indexed, documents_skipped, error_message or None) + Delta Sync Flow: 1. First sync: Full scan + get initial page token 2. 
Subsequent syncs: Use LIST_CHANGES to process only changed files + (unless settings changed or incremental_sync is disabled) Supports folder/file selection via connector config: - selected_folders: List of {id, name} for folders to index @@ -508,12 +590,42 @@ async def index_composio_google_drive( selected_files = connector_config.get("selected_files", []) indexing_options = connector_config.get("indexing_options", {}) - # Check for stored page token for delta sync - stored_page_token = connector_config.get("drive_page_token") - use_delta_sync = stored_page_token and connector.last_indexed_at - max_files_per_folder = indexing_options.get("max_files_per_folder", 100) include_subfolders = indexing_options.get("include_subfolders", True) + incremental_sync = indexing_options.get("incremental_sync", True) + + # Generate current settings hash to detect configuration changes + current_settings_hash = generate_indexing_settings_hash( + selected_folders, selected_files, indexing_options + ) + last_settings_hash = connector_config.get("last_indexed_settings_hash") + + # Detect if settings changed since last index + settings_changed = ( + last_settings_hash is not None + and current_settings_hash != last_settings_hash + ) + + if settings_changed: + logger.info( + f"Indexing settings changed for connector {connector_id}. " + f"Will perform full re-scan to apply new configuration." + ) + + # Check for stored page token for delta sync + stored_page_token = connector_config.get("drive_page_token") + + # Determine whether to use delta sync: + # - Must have a stored page token + # - Must have been indexed before (last_indexed_at exists) + # - User must have incremental_sync enabled + # - Settings must not have changed (folder/subfolder config) + use_delta_sync = ( + incremental_sync + and stored_page_token + and connector.last_indexed_at + and not settings_changed + ) # Route to delta sync or full scan if use_delta_sync: @@ -588,6 +700,14 @@ async def index_composio_google_drive( elif token_error: logger.warning(f"Failed to get new page token: {token_error}") + # Save current settings hash for future change detection + # This allows detecting when folder/subfolder settings change + if not connector.config: + connector.config = {} + connector.config["last_indexed_settings_hash"] = current_settings_hash + flag_modified(connector, "config") + logger.info(f"Saved indexing settings hash for connector {connector_id}") + # CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status await update_connector_last_indexed(session, connector, update_last_indexed) @@ -628,11 +748,11 @@ async def index_composio_google_drive( }, ) - return documents_indexed, error_message + return documents_indexed, documents_skipped, error_message except Exception as e: logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True) - return 0, f"Failed to index Google Drive via Composio: {e!s}" + return 0, 0, f"Failed to index Google Drive via Composio: {e!s}" async def _index_composio_drive_delta_sync( @@ -953,13 +1073,28 @@ async def _process_single_drive_file( """ processing_errors = [] + # ========== EARLY DUPLICATE CHECK BY FILE ID ========== + # Check if this Google Drive file was already indexed by ANY connector + # This happens BEFORE download/ETL to save expensive API calls + existing_by_file_id = await check_document_by_google_drive_file_id( + session, file_id, search_space_id + ) + if existing_by_file_id: + logger.info( + f"Skipping file {file_name} (file_id={file_id}): already 
indexed " + f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' " + f"(saved download & ETL cost)" + ) + return 0, 1, processing_errors # Skip - NO download, NO ETL! + # ====================================================== + # Generate unique identifier hash document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) unique_identifier_hash = generate_unique_identifier_hash( document_type, f"drive_{file_id}", search_space_id ) - # Check if document exists + # Check if document exists by unique identifier (same connector, same file) existing_document = await check_document_by_unique_identifier( session, unique_identifier_hash ) @@ -1000,7 +1135,7 @@ async def _process_single_drive_file( if existing_document: if existing_document.content_hash == content_hash: - return 0, 1, processing_errors # Skipped + return 0, 1, processing_errors # Skipped - unchanged # Update existing document user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -1039,7 +1174,19 @@ async def _process_single_drive_file( existing_document.chunks = chunks existing_document.updated_at = get_current_timestamp() - return 1, 0, processing_errors # Indexed + return 1, 0, processing_errors # Indexed - updated + + # Check if content_hash already exists (from any connector) + # This prevents duplicate content and avoids IntegrityError on unique constraint + existing_by_content_hash = await check_document_by_content_hash( + session, content_hash + ) + if existing_by_content_hash: + logger.info( + f"Skipping file {file_name} (file_id={file_id}): identical content " + f"already indexed as '{existing_by_content_hash.title}'" + ) + return 0, 1, processing_errors # Skipped - duplicate content # Create new document user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -1085,7 +1232,7 @@ async def _process_single_drive_file( ) session.add(document) - return 1, 0, processing_errors # Indexed + return 1, 0, processing_errors # Indexed - new async def _fetch_folder_files_recursively( diff --git a/surfsense_backend/app/connectors/notion_history.py b/surfsense_backend/app/connectors/notion_history.py index e38218a6e..ff8478905 100644 --- a/surfsense_backend/app/connectors/notion_history.py +++ b/surfsense_backend/app/connectors/notion_history.py @@ -1,6 +1,10 @@ +import asyncio import logging +from collections.abc import Awaitable, Callable +from typing import Any, TypeVar from notion_client import AsyncClient +from notion_client.errors import APIResponseError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select @@ -12,6 +16,43 @@ from app.utils.oauth_security import TokenEncryption logger = logging.getLogger(__name__) +# Type variable for generic return type +T = TypeVar("T") + +# ============================================================================ +# Retry Configuration (per Notion API docs) +# https://developers.notion.com/reference/request-limits +# https://developers.notion.com/reference/status-codes +# ============================================================================ +MAX_RETRIES = 5 +BASE_RETRY_DELAY = 1.0 # seconds +MAX_RETRY_DELAY = 60.0 # seconds (Notion's max request timeout) + +# Type alias for retry callback function +# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None +# retry_reason: 'rate_limit', 'server_error', 'timeout' +# This callback can be used to update notifications during retries +RetryCallbackType = Callable[[str, int, int, float], 
Awaitable[None]] + +# HTTP status codes that should trigger a retry +# 429: rate_limited - Use Retry-After header +# 500: internal_server_error - Unexpected error +# 502: bad_gateway - Failed upstream connection +# 503: service_unavailable - Notion unavailable or timeout +# 504: gateway_timeout - Notion timed out +RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504}) + +# Known unsupported block types that Notion API doesn't expose +# These will be skipped gracefully instead of failing the entire sync +UNSUPPORTED_BLOCK_TYPE_ERRORS = [ + "transcription is not supported", + "ai_block is not supported", + "is not supported via the API", +] + +# Known unsupported block types to check before API calls +UNSUPPORTED_BLOCK_TYPES = ["transcription", "ai_block"] + class NotionHistoryConnector: def __init__( @@ -32,6 +73,28 @@ class NotionHistoryConnector: self._connector_id = connector_id self._credentials = credentials self._notion_client: AsyncClient | None = None + # Track pages with skipped unsupported content (for user notifications) + self._pages_with_skipped_content: list[str] = [] + # Optional callback to notify about retry progress (for user notifications) + self._on_retry_callback: RetryCallbackType | None = None + # Track if using legacy integration token (for upgrade notification) + self._using_legacy_token: bool = False + + def set_retry_callback(self, callback: RetryCallbackType | None) -> None: + """ + Set a callback function to be called when API calls are retried. + + This allows the indexer to receive notifications about rate limits + and other transient errors, which can be used to update user-facing + notifications. + + Args: + callback: Async function with signature: + callback(retry_reason, attempt, max_attempts, wait_seconds) -> None + retry_reason: 'rate_limit', 'server_error', or 'timeout' + Set to None to disable callbacks. + """ + self._on_retry_callback = callback async def _get_valid_token(self) -> str: """ @@ -58,6 +121,18 @@ class NotionHistoryConnector: config_data = connector.config.copy() + # Check for legacy integration token format first + # (for connectors created before OAuth was implemented) + legacy_token = config_data.get("NOTION_INTEGRATION_TOKEN") + raw_access_token = config_data.get("access_token") + + # Validate that we have some form of token + if not raw_access_token and not legacy_token: + raise ValueError( + "Notion integration not properly connected. " + "Please remove and re-add the Notion connector." + ) + # Decrypt credentials if they are encrypted token_encrypted = config_data.get("_token_encrypted", False) if token_encrypted and config.SECRET_KEY: @@ -82,13 +157,40 @@ class NotionHistoryConnector: f"Failed to decrypt Notion credentials for connector {self._connector_id}: {e!s}" ) raise ValueError( - f"Failed to decrypt Notion credentials: {e!s}" + "Notion credentials could not be decrypted. " + "Please remove and re-add the Notion connector." ) from e + # Handle legacy format: convert NOTION_INTEGRATION_TOKEN to access_token + if not config_data.get("access_token") and legacy_token: + config_data["access_token"] = legacy_token + self._using_legacy_token = True + logger.info( + f"Using legacy NOTION_INTEGRATION_TOKEN for connector {self._connector_id}" + ) + + # Final validation: ensure we have a valid access_token after all processing + final_token = config_data.get("access_token") + if not final_token or ( + isinstance(final_token, str) and not final_token.strip() + ): + raise ValueError( + "Notion access token is invalid or empty. 
" + "Please remove and re-add the Notion connector." + ) + try: self._credentials = NotionAuthCredentialsBase.from_dict(config_data) + except KeyError as e: + raise ValueError( + f"Notion credentials are incomplete (missing {e}). " + "Please reconnect your Notion account." + ) from e except Exception as e: - raise ValueError(f"Invalid Notion credentials: {e!s}") from e + raise ValueError( + f"Notion credentials format error: {e!s}. " + "Please reconnect your Notion account." + ) from e # Check if token is expired and refreshable if self._credentials.is_expired and self._credentials.is_refreshable: @@ -157,12 +259,161 @@ class NotionHistoryConnector: self._notion_client = AsyncClient(auth=token) return self._notion_client + async def _api_call_with_retry( + self, + api_func: Callable[..., Awaitable[T]], + *args: Any, + on_retry: RetryCallbackType | None = None, + **kwargs: Any, + ) -> T: + """ + Execute Notion API call with retry logic and exponential backoff. + + Handles retryable errors per Notion API documentation: + - 429 rate_limited: Uses Retry-After header value + - 500 internal_server_error: Retries with exponential backoff + - 502 bad_gateway: Retries with exponential backoff + - 503 service_unavailable: Retries with exponential backoff + - 504 gateway_timeout: Retries with exponential backoff + + Args: + api_func: The async Notion API function to call + *args: Positional arguments to pass to the API function + on_retry: Optional callback to notify about retry progress. + Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) + retry_reason is one of: 'rate_limit', 'server_error', 'timeout' + **kwargs: Keyword arguments to pass to the API function + + Returns: + The result from the API call + + Raises: + APIResponseError: If all retries are exhausted or error is not retryable + """ + last_exception: APIResponseError | None = None + retry_delay = BASE_RETRY_DELAY + + for attempt in range(MAX_RETRIES): + try: + return await api_func(*args, **kwargs) + + except APIResponseError as e: + last_exception = e + + # Check if this error is retryable + if e.status not in RETRYABLE_STATUS_CODES: + # Not retryable (e.g., 400, 401, 403, 404) - raise immediately + raise + + # Check if we've exhausted retries + if attempt == MAX_RETRIES - 1: + logger.error( + f"Notion API call failed after {MAX_RETRIES} retries. " + f"Last error: {e.status} {e.code}" + ) + raise + + # Determine retry reason and wait time based on status code + if e.status == 429: + # Rate limited - use Retry-After header if available + retry_reason = "rate_limit" + retry_after = e.headers.get("Retry-After") if e.headers else None + if retry_after: + try: + wait_time = float(retry_after) + except (ValueError, TypeError): + wait_time = retry_delay + else: + wait_time = retry_delay + logger.warning( + f"Notion API rate limited (429). " + f"Waiting {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}" + ) + elif e.status == 504: + # Gateway timeout + retry_reason = "timeout" + wait_time = min(retry_delay, MAX_RETRY_DELAY) + logger.warning( + f"Notion API timeout ({e.status}). " + f"Retrying in {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}" + ) + else: + # Server error (500/502/503) - use exponential backoff + retry_reason = "server_error" + wait_time = min(retry_delay, MAX_RETRY_DELAY) + logger.warning( + f"Notion API error {e.status} ({e.code}). " + f"Retrying in {wait_time}s. 
Attempt {attempt + 1}/{MAX_RETRIES}" + ) + + # Notify about retry via callback (for user notifications) + # Call before sleeping so user sees the message while we wait + if on_retry: + try: + await on_retry( + retry_reason, + attempt + 1, # 1-based for display + MAX_RETRIES, + wait_time, + ) + except Exception as callback_error: + # Don't let callback errors break the retry logic + logger.warning(f"Retry callback failed: {callback_error}") + + # Wait before retrying + await asyncio.sleep(wait_time) + + # Exponential backoff for next attempt + retry_delay = min(retry_delay * 2, MAX_RETRY_DELAY) + + # This should not be reached, but just in case + if last_exception: + raise last_exception + raise RuntimeError("Unexpected state in retry logic") + async def close(self): """Close the async client connection.""" if self._notion_client: await self._notion_client.aclose() self._notion_client = None + def get_pages_with_skipped_content(self) -> list[str]: + """ + Get list of page titles that had unsupported content skipped. + + Returns: + List of page titles with skipped content + """ + return self._pages_with_skipped_content + + def get_skipped_content_count(self) -> int: + """ + Get count of pages that had unsupported content skipped. + + Returns: + Number of pages with skipped content + """ + return len(self._pages_with_skipped_content) + + def is_using_legacy_token(self) -> bool: + """ + Check if connector is using legacy integration token format. + + Returns: + True if using legacy NOTION_INTEGRATION_TOKEN, False if using OAuth + """ + return self._using_legacy_token + + def _record_skipped_content(self, page_title: str): + """ + Record that a page had unsupported content skipped. + + Args: + page_title: Title of the page with skipped content + """ + if page_title not in self._pages_with_skipped_content: + self._pages_with_skipped_content.append(page_title) + async def __aenter__(self): """Async context manager entry.""" return self @@ -186,7 +437,7 @@ class NotionHistoryConnector: # Build the filter for the search # Note: Notion API requires specific filter structure - search_params = {} + search_params: dict[str, Any] = {} # Filter for pages only (not databases) search_params["filter"] = {"value": "page", "property": "object"} @@ -214,29 +465,53 @@ class NotionHistoryConnector: cursor = None while has_more: - if cursor: - search_params["start_cursor"] = cursor + try: + if cursor: + search_params["start_cursor"] = cursor - search_results = await notion.search(**search_params) + # Use retry wrapper for search API call + search_results = await self._api_call_with_retry( + notion.search, on_retry=self._on_retry_callback, **search_params + ) - pages.extend(search_results["results"]) - has_more = search_results.get("has_more", False) + pages.extend(search_results["results"]) + has_more = search_results.get("has_more", False) - if has_more: - cursor = search_results.get("next_cursor") + if has_more: + cursor = search_results.get("next_cursor") + + except APIResponseError as e: + error_message = str(e) + # Handle invalid cursor - stop pagination gracefully + if "start_cursor provided is invalid" in error_message: + logger.warning( + f"Invalid pagination cursor encountered. " + f"Continuing with {len(pages)} pages already fetched." 
+ ) + has_more = False + continue + # Re-raise other errors + raise all_page_data = [] for page in pages: page_id = page["id"] + page_title = self.get_page_title(page) - # Get detailed page information - page_content = await self.get_page_content(page_id) + # Get detailed page information (pass title for skip tracking) + page_content, had_skipped_content = await self.get_page_content( + page_id, page_title + ) + + # Record if this page had skipped content + if had_skipped_content: + self._record_skipped_content(page_title) all_page_data.append( { "page_id": page_id, - "title": self.get_page_title(page), + "title": page_title, "content": page_content, } ) @@ -265,46 +540,91 @@ class NotionHistoryConnector: # If no title found, return the page ID as fallback return f"Untitled page ({page['id']})" - async def get_page_content(self, page_id): + async def get_page_content( + self, page_id: str, page_title: str | None = None + ) -> tuple[list, bool]: """ Fetches the content (blocks) of a specific page. Args: page_id (str): The ID of the page to fetch + page_title (str, optional): Title of the page (for logging) Returns: - list: List of processed blocks from the page + tuple: (List of processed blocks, bool indicating if content was skipped) """ notion = await self._get_client() blocks = [] has_more = True cursor = None + skipped_blocks_count = 0 + had_skipped_content = False # Paginate through all blocks while has_more: - if cursor: - response = await notion.blocks.children.list( - block_id=page_id, start_cursor=cursor - ) - else: - response = await notion.blocks.children.list(block_id=page_id) + try: + # Use retry wrapper for blocks.children.list API call + if cursor: + response = await self._api_call_with_retry( + notion.blocks.children.list, + on_retry=self._on_retry_callback, + block_id=page_id, + start_cursor=cursor, + ) + else: + response = await self._api_call_with_retry( + notion.blocks.children.list, + on_retry=self._on_retry_callback, + block_id=page_id, + ) - blocks.extend(response["results"]) - has_more = response["has_more"] + blocks.extend(response["results"]) + has_more = response["has_more"] - if has_more: - cursor = response["next_cursor"] + if has_more: + cursor = response["next_cursor"] + + except APIResponseError as e: + error_message = str(e) + # Check if this is an unsupported block type error + if any(err in error_message for err in UNSUPPORTED_BLOCK_TYPE_ERRORS): + logger.warning( + f"Skipping page blocks due to unsupported block type in page {page_id}: {error_message}" + ) + skipped_blocks_count += 1 + had_skipped_content = True + # If we haven't fetched any blocks yet, return empty + # If we have some blocks, continue with what we have + has_more = False + continue + elif "Could not find block" in error_message: + logger.warning( + f"Block not found in page {page_id}, continuing with available blocks: {error_message}" + ) + has_more = False + continue + # Re-raise other API errors (after retry exhaustion) + raise + + if skipped_blocks_count > 0: + logger.info( + f"Page {page_id}: Skipped {skipped_blocks_count} unsupported block sections, " + f"successfully processed {len(blocks)} blocks" + ) # Process nested blocks recursively processed_blocks = [] for block in blocks: - processed_block = await self.process_block(block) - processed_blocks.append(processed_block) + processed_block, block_had_skips = await self.process_block(block) + if processed_block: # Only add if block was processed successfully + processed_blocks.append(processed_block) + if block_had_skips: + 
had_skipped_content = True - return processed_blocks + return processed_blocks, had_skipped_content - async def process_block(self, block): + async def process_block(self, block) -> tuple[dict | None, bool]: """ Processes a block and recursively fetches any child blocks. @@ -312,12 +632,28 @@ class NotionHistoryConnector: block (dict): The block to process Returns: - dict: Processed block with content and children + tuple: (Processed block dict or None, bool indicating if content was skipped) """ notion = await self._get_client() block_id = block["id"] block_type = block["type"] + had_skipped_content = False + + # Check if this is a known unsupported block type before processing + if block_type in UNSUPPORTED_BLOCK_TYPES: + logger.debug( + f"Skipping unsupported block type: {block_type} (block_id: {block_id})" + ) + return ( + { + "id": block_id, + "type": block_type, + "content": f"[{block_type} block - not supported by Notion API]", + "children": [], + }, + True, # Content was skipped + ) # Extract block content based on its type content = self.extract_block_content(block) @@ -327,17 +663,48 @@ class NotionHistoryConnector: child_blocks = [] if has_children: - # Fetch and process child blocks - children_response = await notion.blocks.children.list(block_id=block_id) - for child_block in children_response["results"]: - child_blocks.append(await self.process_block(child_block)) + try: + # Use retry wrapper for blocks.children.list API call + children_response = await self._api_call_with_retry( + notion.blocks.children.list, + on_retry=self._on_retry_callback, + block_id=block_id, + ) + for child_block in children_response["results"]: + processed_child, child_had_skips = await self.process_block( + child_block + ) + if processed_child: + child_blocks.append(processed_child) + if child_had_skips: + had_skipped_content = True + except APIResponseError as e: + error_message = str(e) + # Check if this is an unsupported block type error + if any(err in error_message for err in UNSUPPORTED_BLOCK_TYPE_ERRORS): + logger.warning( + f"Skipping children of block {block_id} due to unsupported block type: {error_message}" + ) + had_skipped_content = True + # Continue without children instead of failing + elif "Could not find block" in error_message: + logger.warning( + f"Block {block_id} children not accessible, skipping: {error_message}" + ) + # Continue without children + else: + # Re-raise other API errors (after retry exhaustion) + raise - return { - "id": block_id, - "type": block_type, - "content": content, - "children": child_blocks, - } + return ( + { + "id": block_id, + "type": block_type, + "content": content, + "children": child_blocks, + }, + had_skipped_content, + ) def extract_block_content(self, block): """ diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 8a9507e1b..876bc1d3c 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -93,6 +93,13 @@ class SearchSourceConnectorType(str, Enum): COMPOSIO_GOOGLE_CALENDAR_CONNECTOR = "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR" +class PodcastStatus(str, Enum): + PENDING = "pending" + GENERATING = "generating" + READY = "ready" + FAILED = "failed" + + class LiteLLMProvider(str, Enum): """ Enum for LLM providers supported by LiteLLM. 
@@ -156,6 +163,7 @@ class IncentiveTaskType(str, Enum): """ GITHUB_STAR = "GITHUB_STAR" + REDDIT_FOLLOW = "REDDIT_FOLLOW" # Future tasks can be added here: # GITHUB_ISSUE = "GITHUB_ISSUE" # SOCIAL_SHARE = "SOCIAL_SHARE" @@ -171,6 +179,12 @@ INCENTIVE_TASKS_CONFIG = { "pages_reward": 100, "action_url": "https://github.com/MODSetter/SurfSense", }, + IncentiveTaskType.REDDIT_FOLLOW: { + "title": "Join our Subreddit", + "description": "Join the SurfSense community on Reddit", + "pages_reward": 100, + "action_url": "https://www.reddit.com/r/SurfSense/", + }, # Future tasks can be configured here: # IncentiveTaskType.GITHUB_ISSUE: { # "title": "Create an issue", @@ -397,6 +411,47 @@ class NewChatThread(BaseModel, TimestampMixin): index=True, ) + # Public sharing - cryptographic token for public URL access + public_share_token = Column( + String(64), + nullable=True, + unique=True, + index=True, + ) + # Whether public sharing is currently enabled for this thread + public_share_enabled = Column( + Boolean, + nullable=False, + default=False, + server_default="false", + ) + + # Clone tracking - for audit and history bootstrap + cloned_from_thread_id = Column( + Integer, + ForeignKey("new_chat_threads.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + cloned_at = Column( + TIMESTAMP(timezone=True), + nullable=True, + ) + # Flag to bootstrap LangGraph checkpointer with DB messages on first message + needs_history_bootstrap = Column( + Boolean, + nullable=False, + default=False, + server_default="false", + ) + # Flag indicating content clone is pending (two-phase clone) + clone_pending = Column( + Boolean, + nullable=False, + default=False, + server_default="false", + ) + # Relationships search_space = relationship("SearchSpace", back_populates="new_chat_threads") created_by = relationship("User", back_populates="new_chat_threads") @@ -709,14 +764,34 @@ class Podcast(BaseModel, TimestampMixin): __tablename__ = "podcasts" title = Column(String(500), nullable=False) - podcast_transcript = Column(JSONB, nullable=True) # List of transcript entries - file_location = Column(Text, nullable=True) # Path to the audio file + podcast_transcript = Column(JSONB, nullable=True) + file_location = Column(Text, nullable=True) + status = Column( + SQLAlchemyEnum( + PodcastStatus, + name="podcast_status", + create_type=False, + values_callable=lambda x: [e.value for e in x], + ), + nullable=False, + default=PodcastStatus.READY, + server_default="ready", + index=True, + ) search_space_id = Column( Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False ) search_space = relationship("SearchSpace", back_populates="podcasts") + thread_id = Column( + Integer, + ForeignKey("new_chat_threads.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + thread = relationship("NewChatThread") + class SearchSpace(BaseModel, TimestampMixin): __tablename__ = "searchspaces" diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py index 753105c46..746c18c6d 100644 --- a/surfsense_backend/app/routes/__init__.py +++ b/surfsense_backend/app/routes/__init__.py @@ -31,6 +31,7 @@ from .notes_routes import router as notes_router from .notifications_routes import router as notifications_router from .notion_add_connector_route import router as notion_add_connector_router from .podcasts_routes import router as podcasts_router +from .public_chat_routes import router as public_chat_router from .rbac_routes import router as rbac_router from .search_source_connectors_routes 
import router as search_source_connectors_router from .search_spaces_routes import router as search_spaces_router @@ -68,4 +69,5 @@ router.include_router(circleback_webhook_router) # Circleback meeting webhooks router.include_router(surfsense_docs_router) # Surfsense documentation for citations router.include_router(notifications_router) # Notifications with Electric SQL sync router.include_router(composio_router) # Composio OAuth and toolkit management +router.include_router(public_chat_router) # Public chat sharing and cloning router.include_router(incentive_tasks_router) # Incentive tasks for earning free pages diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index 7631ec7eb..38352d348 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -37,6 +37,7 @@ from app.db import ( get_async_session, ) from app.schemas.new_chat import ( + CompleteCloneResponse, NewChatMessageAppend, NewChatMessageRead, NewChatRequest, @@ -45,11 +46,14 @@ from app.schemas.new_chat import ( NewChatThreadUpdate, NewChatThreadVisibilityUpdate, NewChatThreadWithMessages, + PublicShareToggleRequest, + PublicShareToggleResponse, RegenerateRequest, ThreadHistoryLoadResponse, ThreadListItem, ThreadListResponse, ) +from app.services.public_chat_service import toggle_public_share from app.tasks.chat.stream_new_chat import stream_new_chat from app.users import current_active_user from app.utils.rbac import check_permission @@ -215,6 +219,7 @@ async def list_threads( visibility=thread.visibility, created_by_id=thread.created_by_id, is_own_thread=is_own_thread, + public_share_enabled=thread.public_share_enabled, created_at=thread.created_at, updated_at=thread.updated_at, ) @@ -316,6 +321,7 @@ async def search_threads( thread.created_by_id == user.id or (thread.created_by_id is None and is_search_space_owner) ), + public_share_enabled=thread.public_share_enabled, created_at=thread.created_at, updated_at=thread.updated_at, ) @@ -664,6 +670,66 @@ async def delete_thread( ) from None +@router.post( + "/threads/{thread_id}/complete-clone", response_model=CompleteCloneResponse +) +async def complete_clone( + thread_id: int, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """ + Complete the cloning process for a thread. + + Copies messages and podcasts from the source thread. + Sets clone_pending=False and needs_history_bootstrap=True when done. + + Requires authentication and ownership of the thread. 
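+
+    Illustrative request/response (path prefix, IDs, and counts are
+    placeholders, not taken from the implementation):
+
+        POST /threads/123/complete-clone
+        -> {"status": "success", "message_count": 12}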
+ """ + from app.services.public_chat_service import complete_clone_content + + try: + result = await session.execute( + select(NewChatThread).filter(NewChatThread.id == thread_id) + ) + thread = result.scalars().first() + + if not thread: + raise HTTPException(status_code=404, detail="Thread not found") + + if thread.created_by_id != user.id: + raise HTTPException(status_code=403, detail="Not authorized") + + if not thread.clone_pending: + raise HTTPException(status_code=400, detail="Clone already completed") + + if not thread.cloned_from_thread_id: + raise HTTPException( + status_code=400, detail="No source thread to clone from" + ) + + message_count = await complete_clone_content( + session=session, + target_thread=thread, + source_thread_id=thread.cloned_from_thread_id, + target_search_space_id=thread.search_space_id, + ) + + return CompleteCloneResponse( + status="success", + message_count=message_count, + ) + + except HTTPException: + raise + except Exception as e: + await session.rollback() + raise HTTPException( + status_code=500, + detail=f"An unexpected error occurred while completing clone: {e!s}", + ) from None + + @router.patch("/threads/{thread_id}/visibility", response_model=NewChatThreadRead) async def update_thread_visibility( thread_id: int, @@ -729,6 +795,32 @@ async def update_thread_visibility( ) from None +@router.patch( + "/threads/{thread_id}/public-share", response_model=PublicShareToggleResponse +) +async def update_thread_public_share( + thread_id: int, + request: Request, + toggle_request: PublicShareToggleRequest, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """ + Enable or disable public sharing for a thread. + + Only the creator of the thread can manage public sharing. + When enabled, returns a public URL that anyone can use to view the chat. + """ + base_url = str(request.base_url).rstrip("/") + return await toggle_public_share( + session=session, + thread_id=thread_id, + enabled=toggle_request.enabled, + user=user, + base_url=base_url, + ) + + # ============================================================================= # Message Endpoints # ============================================================================= @@ -996,6 +1088,7 @@ async def handle_new_chat( attachments=request.attachments, mentioned_document_ids=request.mentioned_document_ids, mentioned_surfsense_doc_ids=request.mentioned_surfsense_doc_ids, + needs_history_bootstrap=thread.needs_history_bootstrap, ), media_type="text/event-stream", headers={ @@ -1223,6 +1316,7 @@ async def regenerate_response( mentioned_document_ids=request.mentioned_document_ids, mentioned_surfsense_doc_ids=request.mentioned_surfsense_doc_ids, checkpoint_id=target_checkpoint_id, + needs_history_bootstrap=thread.needs_history_bootstrap, ): yield chunk # If we get here, streaming completed successfully diff --git a/surfsense_backend/app/routes/notifications_routes.py b/surfsense_backend/app/routes/notifications_routes.py index 6bc945643..84591f001 100644 --- a/surfsense_backend/app/routes/notifications_routes.py +++ b/surfsense_backend/app/routes/notifications_routes.py @@ -6,6 +6,7 @@ For older items (beyond the sync window), use the list endpoint. 
""" from datetime import UTC, datetime, timedelta +from typing import Literal from fastapi import APIRouter, Depends, HTTPException, Query, status from pydantic import BaseModel @@ -20,6 +21,9 @@ router = APIRouter(prefix="/notifications", tags=["notifications"]) # Must match frontend SYNC_WINDOW_DAYS in use-inbox.ts SYNC_WINDOW_DAYS = 14 +# Valid notification types - must match frontend InboxItemTypeEnum +NotificationType = Literal["connector_indexing", "document_processing", "new_mention"] + class NotificationResponse(BaseModel): """Response model for a single notification.""" @@ -73,6 +77,9 @@ class UnreadCountResponse(BaseModel): @router.get("/unread-count", response_model=UnreadCountResponse) async def get_unread_count( search_space_id: int | None = Query(None, description="Filter by search space ID"), + type_filter: NotificationType | None = Query( + None, alias="type", description="Filter by notification type" + ), user: User = Depends(current_active_user), session: AsyncSession = Depends(get_async_session), ) -> UnreadCountResponse: @@ -103,6 +110,10 @@ async def get_unread_count( | (Notification.search_space_id.is_(None)) ) + # Filter by notification type if provided + if type_filter: + base_filter.append(Notification.type == type_filter) + # Total unread count (all time) total_query = select(func.count(Notification.id)).where(*base_filter) total_result = await session.execute(total_query) @@ -125,7 +136,7 @@ async def get_unread_count( @router.get("", response_model=NotificationListResponse) async def list_notifications( search_space_id: int | None = Query(None, description="Filter by search space ID"), - type_filter: str | None = Query( + type_filter: NotificationType | None = Query( None, alias="type", description="Filter by notification type" ), before_date: str | None = Query( diff --git a/surfsense_backend/app/routes/podcasts_routes.py b/surfsense_backend/app/routes/podcasts_routes.py index ef362edb5..fa8326096 100644 --- a/surfsense_backend/app/routes/podcasts_routes.py +++ b/surfsense_backend/app/routes/podcasts_routes.py @@ -1,21 +1,19 @@ """ -Podcast routes for task status polling and audio retrieval. +Podcast routes for CRUD operations and audio streaming. These routes support the podcast generation feature in new-chat. -Note: The old Chat-based podcast generation has been removed. +Frontend polls GET /podcasts/{podcast_id} to check status field. """ import os from pathlib import Path -from celery.result import AsyncResult from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import StreamingResponse from sqlalchemy import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession -from app.celery_app import celery_app from app.db import ( Permission, Podcast, @@ -25,7 +23,7 @@ from app.db import ( get_async_session, ) from app.schemas import PodcastRead -from app.users import current_active_user +from app.users import current_active_user, current_optional_user from app.utils.rbac import check_permission router = APIRouter() @@ -84,12 +82,17 @@ async def read_podcasts( async def read_podcast( podcast_id: int, session: AsyncSession = Depends(get_async_session), - user: User = Depends(current_active_user), + user: User | None = Depends(current_optional_user), ): """ Get a specific podcast by ID. - Requires PODCASTS_READ permission for the search space. 
+ + Access is allowed if: + - User is authenticated with PODCASTS_READ permission, OR + - Podcast belongs to a publicly shared thread """ + from app.services.public_chat_service import is_podcast_publicly_accessible + try: result = await session.execute(select(Podcast).filter(Podcast.id == podcast_id)) podcast = result.scalars().first() @@ -100,16 +103,20 @@ async def read_podcast( detail="Podcast not found", ) - # Check permission for the search space - await check_permission( - session, - user, - podcast.search_space_id, - Permission.PODCASTS_READ.value, - "You don't have permission to read podcasts in this search space", - ) + is_public = await is_podcast_publicly_accessible(session, podcast_id) - return podcast + if not is_public: + if not user: + raise HTTPException(status_code=401, detail="Authentication required") + await check_permission( + session, + user, + podcast.search_space_id, + Permission.PODCASTS_READ.value, + "You don't have permission to read podcasts in this search space", + ) + + return PodcastRead.from_orm_with_entries(podcast) except HTTPException as he: raise he except SQLAlchemyError: @@ -161,46 +168,49 @@ async def delete_podcast( async def stream_podcast( podcast_id: int, session: AsyncSession = Depends(get_async_session), - user: User = Depends(current_active_user), + user: User | None = Depends(current_optional_user), ): """ Stream a podcast audio file. - Requires PODCASTS_READ permission for the search space. + + Access is allowed if: + - User is authenticated with PODCASTS_READ permission, OR + - Podcast belongs to a publicly shared thread Note: Both /stream and /audio endpoints are supported for compatibility. """ + from app.services.public_chat_service import is_podcast_publicly_accessible + try: result = await session.execute(select(Podcast).filter(Podcast.id == podcast_id)) podcast = result.scalars().first() if not podcast: - raise HTTPException( - status_code=404, - detail="Podcast not found", + raise HTTPException(status_code=404, detail="Podcast not found") + + is_public = await is_podcast_publicly_accessible(session, podcast_id) + + if not is_public: + if not user: + raise HTTPException(status_code=401, detail="Authentication required") + + await check_permission( + session, + user, + podcast.search_space_id, + Permission.PODCASTS_READ.value, + "You don't have permission to access podcasts in this search space", ) - # Check permission for the search space - await check_permission( - session, - user, - podcast.search_space_id, - Permission.PODCASTS_READ.value, - "You don't have permission to access podcasts in this search space", - ) - - # Get the file path file_path = podcast.file_location - # Check if the file exists if not file_path or not os.path.isfile(file_path): raise HTTPException(status_code=404, detail="Podcast audio file not found") - # Define a generator function to stream the file def iterfile(): with open(file_path, mode="rb") as file_like: yield from file_like - # Return a streaming response with appropriate headers return StreamingResponse( iterfile(), media_type="audio/mpeg", @@ -216,62 +226,3 @@ async def stream_podcast( raise HTTPException( status_code=500, detail=f"Error streaming podcast: {e!s}" ) from e - - -@router.get("/podcasts/task/{task_id}/status") -async def get_podcast_task_status( - task_id: str, - user: User = Depends(current_active_user), -): - """ - Get the status of a podcast generation task. - Used by new-chat frontend to poll for completion. 
- - Returns: - - status: "processing" | "success" | "error" - - podcast_id: (only if status == "success") - - title: (only if status == "success") - - error: (only if status == "error") - """ - try: - result = AsyncResult(task_id, app=celery_app) - - if result.ready(): - # Task completed - if result.successful(): - task_result = result.result - if isinstance(task_result, dict): - if task_result.get("status") == "success": - return { - "status": "success", - "podcast_id": task_result.get("podcast_id"), - "title": task_result.get("title"), - "transcript_entries": task_result.get("transcript_entries"), - } - else: - return { - "status": "error", - "error": task_result.get("error", "Unknown error"), - } - else: - return { - "status": "error", - "error": "Unexpected task result format", - } - else: - # Task failed - return { - "status": "error", - "error": str(result.result) if result.result else "Task failed", - } - else: - # Task still processing - return { - "status": "processing", - "state": result.state, - } - - except Exception as e: - raise HTTPException( - status_code=500, detail=f"Error checking task status: {e!s}" - ) from e diff --git a/surfsense_backend/app/routes/public_chat_routes.py b/surfsense_backend/app/routes/public_chat_routes.py new file mode 100644 index 000000000..4676f2ad0 --- /dev/null +++ b/surfsense_backend/app/routes/public_chat_routes.py @@ -0,0 +1,84 @@ +""" +Routes for public chat access (unauthenticated and mixed-auth endpoints). +""" + +from datetime import UTC, datetime + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import ChatVisibility, NewChatThread, User, get_async_session +from app.schemas.new_chat import ( + CloneInitResponse, + PublicChatResponse, +) +from app.services.public_chat_service import ( + get_public_chat, + get_thread_by_share_token, + get_user_default_search_space, +) +from app.users import current_active_user + +router = APIRouter(prefix="/public", tags=["public"]) + + +@router.get("/{share_token}", response_model=PublicChatResponse) +async def read_public_chat( + share_token: str, + session: AsyncSession = Depends(get_async_session), +): + """ + Get a public chat by share token. + + No authentication required. + Returns sanitized content (citations stripped). + """ + return await get_public_chat(session, share_token) + + +@router.post("/{share_token}/clone", response_model=CloneInitResponse) +async def clone_public_chat_endpoint( + share_token: str, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """ + Initialize cloning a public chat to the user's account. + + Creates an empty thread with clone_pending=True. + Frontend should redirect to the new thread and call /complete-clone. + + Requires authentication. 
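For orientation, the flow these public endpoints enable looks roughly like this from the client side, sketched with httpx; the base URL and credentials are placeholders, and the /complete-clone call is only referenced by the docstring above (it is defined elsewhere in the PR):

import httpx

BASE = "http://localhost:8000"   # assumed deployment URL
token = "Zx3..."                 # share token taken from the public link

# 1. Anyone can fetch the sanitized transcript; no auth header required.
public_chat = httpx.get(f"{BASE}/public/{token}").json()

# 2. A signed-in user initializes a clone; an empty thread is created with
#    clone_pending=True and its id comes back in CloneInitResponse.
clone = httpx.post(
    f"{BASE}/public/{token}/clone",
    headers={"Authorization": "Bearer <jwt>"},  # placeholder credentials
).json()

# 3. The frontend redirects to clone["thread_id"] and triggers the second
#    phase (copying messages and podcasts) via the complete-clone endpoint.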
+ """ + source_thread = await get_thread_by_share_token(session, share_token) + + if not source_thread: + raise HTTPException( + status_code=404, detail="Chat not found or no longer public" + ) + + target_search_space_id = await get_user_default_search_space(session, user.id) + + if target_search_space_id is None: + raise HTTPException(status_code=400, detail="No search space found for user") + + new_thread = NewChatThread( + title=source_thread.title, + archived=False, + visibility=ChatVisibility.PRIVATE, + search_space_id=target_search_space_id, + created_by_id=user.id, + public_share_enabled=False, + cloned_from_thread_id=source_thread.id, + cloned_at=datetime.now(UTC), + clone_pending=True, + ) + session.add(new_thread) + await session.commit() + await session.refresh(new_thread) + + return CloneInitResponse( + thread_id=new_thread.id, + search_space_id=target_search_space_id, + share_token=share_token, + ) diff --git a/surfsense_backend/app/routes/rbac_routes.py b/surfsense_backend/app/routes/rbac_routes.py index 5070a2724..7d2cc5c77 100644 --- a/surfsense_backend/app/routes/rbac_routes.py +++ b/surfsense_backend/app/routes/rbac_routes.py @@ -123,7 +123,9 @@ async def list_all_permissions( for perm in Permission: # Extract category from permission value (e.g., "documents:read" -> "documents") category = perm.value.split(":")[0] if ":" in perm.value else "general" - description = PERMISSION_DESCRIPTIONS.get(perm.value, f"Permission for {perm.value}") + description = PERMISSION_DESCRIPTIONS.get( + perm.value, f"Permission for {perm.value}" + ) permissions.append( PermissionInfo( diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 191c6f954..a27c2125c 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -187,6 +187,7 @@ async def create_search_source_connector( user_id=str(user.id), connector_type=db_connector.connector_type, frequency_minutes=db_connector.indexing_frequency_minutes, + connector_config=db_connector.config, ) if not success: logger.warning( @@ -646,6 +647,7 @@ async def index_connector_content( # Handle different connector types response_message = "" + indexing_started = True # Use UTC for consistency with last_indexed_at storage today_str = datetime.now(UTC).strftime("%Y-%m-%d") @@ -921,14 +923,31 @@ async def index_connector_content( elif connector.connector_type == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: from app.tasks.celery_tasks.connector_tasks import index_crawled_urls_task + from app.utils.webcrawler_utils import parse_webcrawler_urls - logger.info( - f"Triggering web pages indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" - ) - index_crawled_urls_task.delay( - connector_id, search_space_id, str(user.id), indexing_from, indexing_to - ) - response_message = "Web page indexing started in the background." + # Check if URLs are configured before triggering indexing + connector_config = connector.config or {} + urls = parse_webcrawler_urls(connector_config.get("INITIAL_URLS")) + + if not urls: + # URLs are optional - skip indexing gracefully + logger.info( + f"Webcrawler connector {connector_id} has no URLs configured, skipping indexing" + ) + response_message = "No URLs configured for this connector. Add URLs in the connector settings to enable indexing." 
+ indexing_started = False + else: + logger.info( + f"Triggering web pages indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + index_crawled_urls_task.delay( + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, + ) + response_message = "Web page indexing started in the background." elif connector.connector_type == SearchSourceConnectorType.OBSIDIAN_CONNECTOR: from app.config import config as app_config @@ -1025,6 +1044,7 @@ async def index_connector_content( return { "message": response_message, + "indexing_started": indexing_started, "connector_id": connector_id, "search_space_id": search_space_id, "indexing_from": indexing_from, @@ -1129,6 +1149,7 @@ async def _run_indexing_with_notifications( end_date: str, indexing_function, update_timestamp_func=None, + supports_retry_callback: bool = False, ): """ Generic helper to run indexing with real-time notifications. @@ -1142,10 +1163,14 @@ async def _run_indexing_with_notifications( end_date: End date for indexing indexing_function: Async function that performs the indexing update_timestamp_func: Optional function to update connector timestamp + supports_retry_callback: Whether the indexing function supports on_retry_callback """ from uuid import UUID notification = None + # Track indexed count for retry notifications + current_indexed_count = 0 + try: # Get connector info for notification connector_result = await session.execute( @@ -1179,16 +1204,54 @@ async def _run_indexing_with_notifications( stage="fetching", ) + # Create retry callback for connectors that support it + async def on_retry_callback( + retry_reason: str, attempt: int, max_attempts: int, wait_seconds: float + ) -> None: + """Callback to update notification during API retries (rate limits, etc.)""" + nonlocal notification + if notification: + try: + await session.refresh(notification) + await NotificationService.connector_indexing.notify_retry_progress( + session=session, + notification=notification, + indexed_count=current_indexed_count, + retry_reason=retry_reason, + attempt=attempt, + max_attempts=max_attempts, + wait_seconds=wait_seconds, + ) + await session.commit() + except Exception as e: + # Don't let notification errors break the indexing + logger.warning(f"Failed to update retry notification: {e}") + + # Build kwargs for indexing function + indexing_kwargs = { + "session": session, + "connector_id": connector_id, + "search_space_id": search_space_id, + "user_id": user_id, + "start_date": start_date, + "end_date": end_date, + "update_last_indexed": False, + } + + # Add retry callback for connectors that support it + if supports_retry_callback: + indexing_kwargs["on_retry_callback"] = on_retry_callback + # Run the indexing function - documents_processed, error_or_warning = await indexing_function( - session=session, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - update_last_indexed=False, - ) + # Some indexers return (indexed, error), others return (indexed, skipped, error) + result = await indexing_function(**indexing_kwargs) + + # Handle both 2-tuple and 3-tuple returns for backwards compatibility + if len(result) == 3: + documents_processed, documents_skipped, error_or_warning = result + else: + documents_processed, error_or_warning = result + documents_skipped = None # Update connector timestamp if function provided and indexing was successful if documents_processed > 0 and 
update_timestamp_func: @@ -1216,6 +1279,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=documents_processed, error_message=error_or_warning, # Show errors even if some documents were indexed + skipped_count=documents_skipped, ) await ( session.commit() @@ -1242,6 +1306,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=documents_processed, error_message=error_or_warning, # Show errors even if some documents were indexed + skipped_count=documents_skipped, ) await ( session.commit() @@ -1260,8 +1325,15 @@ async def _run_indexing_with_notifications( "no " in error_or_warning_lower and "found" in error_or_warning_lower ) + # Informational warnings - sync succeeded but some content couldn't be synced + # These are NOT errors, just notifications about API limitations or recommendations + is_info_warning = ( + "couldn't be synced" in error_or_warning_lower + or "using legacy token" in error_or_warning_lower + or "(api limitation)" in error_or_warning_lower + ) - if is_duplicate_warning or is_empty_result: + if is_duplicate_warning or is_empty_result or is_info_warning: # These are success cases - sync worked, just found nothing new logger.info(f"Indexing completed successfully: {error_or_warning}") # Still update timestamp so ElectricSQL syncs and clears "Syncing" UI @@ -1283,6 +1355,7 @@ async def _run_indexing_with_notifications( indexed_count=0, error_message=notification_message, # Pass as warning, not error is_warning=True, # Flag to indicate this is a warning, not an error + skipped_count=documents_skipped, ) await ( session.commit() @@ -1298,6 +1371,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=error_or_warning, + skipped_count=documents_skipped, ) await ( session.commit() @@ -1319,6 +1393,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=None, # No error - sync succeeded + skipped_count=documents_skipped, ) await ( session.commit() @@ -1336,6 +1411,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=str(e), + skipped_count=None, # Unknown on exception ) except Exception as notif_error: logger.error(f"Failed to update notification: {notif_error!s}") @@ -1362,6 +1438,7 @@ async def run_notion_indexing_with_new_session( end_date=end_date, indexing_function=index_notion_pages, update_timestamp_func=_update_connector_timestamp_by_id, + supports_retry_callback=True, # Notion connector supports retry notifications ) @@ -1393,6 +1470,7 @@ async def run_notion_indexing( end_date=end_date, indexing_function=index_notion_pages, update_timestamp_func=_update_connector_timestamp_by_id, + supports_retry_callback=True, # Notion connector supports retry notifications ) diff --git a/surfsense_backend/app/schemas/new_chat.py b/surfsense_backend/app/schemas/new_chat.py index 7a29fc678..ab6be9c9f 100644 --- a/surfsense_backend/app/schemas/new_chat.py +++ b/surfsense_backend/app/schemas/new_chat.py @@ -95,6 +95,9 @@ class NewChatThreadRead(NewChatThreadBase, IDModel): search_space_id: int visibility: ChatVisibility created_by_id: UUID | None = None + public_share_enabled: bool = False + public_share_token: str | None = None + clone_pending: bool = False created_at: datetime updated_at: datetime @@ -133,7 +136,8 @@ class ThreadListItem(BaseModel): archived: bool visibility: ChatVisibility created_by_id: UUID | None = None - is_own_thread: bool = False # True if 
the current user created this thread + is_own_thread: bool = False + public_share_enabled: bool = False created_at: datetime = Field(alias="createdAt") updated_at: datetime = Field(alias="updatedAt") @@ -204,3 +208,60 @@ class RegenerateRequest(BaseModel): attachments: list[ChatAttachment] | None = None mentioned_document_ids: list[int] | None = None mentioned_surfsense_doc_ids: list[int] | None = None + + +# ============================================================================= +# Public Sharing Schemas +# ============================================================================= + + +class PublicShareToggleRequest(BaseModel): + """Request to enable/disable public sharing for a thread.""" + + enabled: bool + + +class PublicShareToggleResponse(BaseModel): + """Response after toggling public sharing.""" + + enabled: bool + public_url: str | None = None + share_token: str | None = None + + +# ============================================================================= +# Public Chat View Schemas (for unauthenticated access) +# ============================================================================= + + +class PublicAuthor(BaseModel): + display_name: str | None = None + avatar_url: str | None = None + + +class PublicChatMessage(BaseModel): + role: NewChatMessageRole + content: Any + author: PublicAuthor | None = None + created_at: datetime + + +class PublicChatThread(BaseModel): + title: str + created_at: datetime + + +class PublicChatResponse(BaseModel): + thread: PublicChatThread + messages: list[PublicChatMessage] + + +class CloneInitResponse(BaseModel): + thread_id: int + search_space_id: int + share_token: str + + +class CompleteCloneResponse(BaseModel): + status: str + message_count: int diff --git a/surfsense_backend/app/schemas/podcasts.py b/surfsense_backend/app/schemas/podcasts.py index 72c915d88..60f9d7dc0 100644 --- a/surfsense_backend/app/schemas/podcasts.py +++ b/surfsense_backend/app/schemas/podcasts.py @@ -1,11 +1,19 @@ """Podcast schemas for API responses.""" from datetime import datetime +from enum import Enum from typing import Any from pydantic import BaseModel +class PodcastStatusEnum(str, Enum): + PENDING = "pending" + GENERATING = "generating" + READY = "ready" + FAILED = "failed" + + class PodcastBase(BaseModel): """Base podcast schema.""" @@ -33,7 +41,26 @@ class PodcastRead(PodcastBase): """Schema for reading a podcast.""" id: int + status: PodcastStatusEnum = PodcastStatusEnum.READY created_at: datetime + transcript_entries: int | None = None class Config: from_attributes = True + + @classmethod + def from_orm_with_entries(cls, obj): + """Create PodcastRead with transcript_entries computed.""" + data = { + "id": obj.id, + "title": obj.title, + "podcast_transcript": obj.podcast_transcript, + "file_location": obj.file_location, + "search_space_id": obj.search_space_id, + "status": obj.status, + "created_at": obj.created_at, + "transcript_entries": len(obj.podcast_transcript) + if obj.podcast_transcript + else None, + } + return cls(**data) diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index 04f39d8ef..e0385b91c 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -329,6 +329,90 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): metadata_updates=metadata_updates, ) + async def notify_retry_progress( + self, + session: AsyncSession, + notification: Notification, + indexed_count: 
int, + retry_reason: str, + attempt: int, + max_attempts: int, + wait_seconds: float | None = None, + service_name: str | None = None, + ) -> Notification: + """ + Update notification when a connector is retrying due to rate limits or errors. + + This method provides user-friendly feedback when external service limitations + (rate limits, temporary outages) cause delays. Users see that the delay is + not our fault and the sync is still progressing. + + This method can be used by ANY connector (Notion, Slack, Airtable, etc.) + when they hit rate limits or transient errors. + + Args: + session: Database session + notification: Notification to update + indexed_count: Number of items indexed so far + retry_reason: Reason for retry ('rate_limit', 'server_error', 'timeout') + attempt: Current retry attempt number (1-based) + max_attempts: Maximum number of retry attempts + wait_seconds: Seconds to wait before retry (optional, for display) + service_name: Name of the external service (e.g., 'Notion', 'Slack') + If not provided, extracts from notification metadata + + Returns: + Updated notification + """ + # Get service name from notification if not provided + if not service_name: + service_name = notification.notification_metadata.get( + "connector_name", "Service" + ) + # Extract just the service name if it's "Notion - My Workspace" + if " - " in service_name: + service_name = service_name.split(" - ")[0] + + # User-friendly messages for different retry reasons + # These make it clear the delay is due to the external service, not SurfSense + retry_messages = { + "rate_limit": f"{service_name} rate limit reached", + "server_error": f"{service_name} is slow to respond", + "timeout": f"{service_name} took too long", + "temporary_error": f"{service_name} temporarily unavailable", + } + + base_message = retry_messages.get(retry_reason, f"Waiting for {service_name}") + + # Add wait time and progress info + if wait_seconds and wait_seconds > 5: + # Only show wait time if it's significant + message = f"{base_message}. Retrying in {int(wait_seconds)}s..." + else: + message = f"{base_message}. Retrying..." + + # Add progress count if we have any + if indexed_count > 0: + item_text = "item" if indexed_count == 1 else "items" + message = f"{message} ({indexed_count} {item_text} synced so far)" + + metadata_updates = { + "indexed_count": indexed_count, + "sync_stage": "waiting_retry", + "retry_attempt": attempt, + "retry_max_attempts": max_attempts, + "retry_reason": retry_reason, + "retry_wait_seconds": wait_seconds, + } + + return await self.update_notification( + session=session, + notification=notification, + message=message, + status="in_progress", + metadata_updates=metadata_updates, + ) + async def notify_indexing_completed( self, session: AsyncSession, @@ -336,6 +420,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): indexed_count: int, error_message: str | None = None, is_warning: bool = False, + skipped_count: int | None = None, ) -> Notification: """ Update notification when connector indexing completes. 
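As a worked illustration of the retry text assembled above, here is a small mirror of the formatting, for reference only (this is not the code path the service uses):

def preview_retry_message(service: str, reason: str, wait: float | None, count: int) -> str:
    """Re-derives the user-facing retry message for illustration."""
    base = {
        "rate_limit": f"{service} rate limit reached",
        "server_error": f"{service} is slow to respond",
        "timeout": f"{service} took too long",
        "temporary_error": f"{service} temporarily unavailable",
    }.get(reason, f"Waiting for {service}")
    msg = f"{base}. Retrying in {int(wait)}s..." if wait and wait > 5 else f"{base}. Retrying..."
    if count > 0:
        msg += f" ({count} {'item' if count == 1 else 'items'} synced so far)"
    return msg

# e.g. a Notion rate limit hit after 12 pages, with a 30 second back-off:
assert preview_retry_message("Notion", "rate_limit", 30.0, 12) == (
    "Notion rate limit reached. Retrying in 30s... (12 items synced so far)"
)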
@@ -346,6 +431,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): indexed_count: Total number of items indexed error_message: Error message if indexing failed, or warning message (optional) is_warning: If True, treat error_message as a warning (success case) rather than an error + skipped_count: Number of items skipped (e.g., duplicates) - optional Returns: Updated notification @@ -354,6 +440,14 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): "connector_name", "Connector" ) + # Build the skipped text if there are skipped items + skipped_text = "" + if skipped_count and skipped_count > 0: + skipped_item_text = "item" if skipped_count == 1 else "items" + skipped_text = ( + f" ({skipped_count} {skipped_item_text} skipped - already indexed)" + ) + # If there's an error message but items were indexed, treat it as a warning (partial success) # If is_warning is True, treat it as success even with 0 items (e.g., duplicates found) # Otherwise, treat it as a failure @@ -362,12 +456,12 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): # Partial success with warnings (e.g., duplicate content from other connectors) title = f"Ready: {connector_name}" item_text = "item" if indexed_count == 1 else "items" - message = f"Now searchable! {indexed_count} {item_text} synced. Note: {error_message}" + message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}. Note: {error_message}" status = "completed" elif is_warning: # Warning case (e.g., duplicates found) - treat as success title = f"Ready: {connector_name}" - message = f"Sync completed. {error_message}" + message = f"Sync completed{skipped_text}. {error_message}" status = "completed" else: # Complete failure @@ -377,14 +471,21 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): else: title = f"Ready: {connector_name}" if indexed_count == 0: - message = "Already up to date! No new items to sync." + if skipped_count and skipped_count > 0: + skipped_item_text = "item" if skipped_count == 1 else "items" + message = f"Already up to date! {skipped_count} {skipped_item_text} skipped (already indexed)." + else: + message = "Already up to date! No new items to sync." else: item_text = "item" if indexed_count == 1 else "items" - message = f"Now searchable! {indexed_count} {item_text} synced." + message = ( + f"Now searchable! {indexed_count} {item_text} synced{skipped_text}." + ) status = "completed" metadata_updates = { "indexed_count": indexed_count, + "skipped_count": skipped_count or 0, "sync_stage": "completed" if (not error_message or is_warning or indexed_count > 0) else "failed", diff --git a/surfsense_backend/app/services/public_chat_service.py b/surfsense_backend/app/services/public_chat_service.py new file mode 100644 index 000000000..a5b8c9ffe --- /dev/null +++ b/surfsense_backend/app/services/public_chat_service.py @@ -0,0 +1,379 @@ +""" +Service layer for public chat sharing and cloning. +""" + +import re +import secrets +from uuid import UUID + +from fastapi import HTTPException +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from app.db import NewChatThread, User + +UI_TOOLS = { + "display_image", + "link_preview", + "generate_podcast", + "scrape_webpage", + "multi_link_preview", +} + + +def strip_citations(text: str) -> str: + """ + Remove [citation:X] and [citation:doc-X] patterns from text. + Preserves newlines to maintain markdown formatting. 
+ """ + # Remove citation patterns + text = re.sub(r"[\[【]\u200B?citation:(doc-)?\d+\u200B?[\]】]", "", text) + # Collapse multiple spaces/tabs (but NOT newlines) into single space + text = re.sub(r"[^\S\n]+", " ", text) + # Normalize excessive blank lines (3+ newlines → 2) + text = re.sub(r"\n{3,}", "\n\n", text) + # Clean up spaces around newlines + text = re.sub(r" *\n *", "\n", text) + return text.strip() + + +def sanitize_content_for_public(content: list | str | None) -> list: + """ + Filter message content for public view. + Strips citations and filters to UI-relevant tools. + """ + if content is None: + return [] + + if isinstance(content, str): + clean_text = strip_citations(content) + return [{"type": "text", "text": clean_text}] if clean_text else [] + + if not isinstance(content, list): + return [] + + sanitized = [] + for part in content: + if not isinstance(part, dict): + continue + + part_type = part.get("type") + + if part_type == "text": + clean_text = strip_citations(part.get("text", "")) + if clean_text: + sanitized.append({"type": "text", "text": clean_text}) + + elif part_type == "tool-call": + tool_name = part.get("toolName") + if tool_name not in UI_TOOLS: + continue + sanitized.append(part) + + return sanitized + + +async def get_author_display( + session: AsyncSession, + author_id: UUID | None, + user_cache: dict[UUID, dict], +) -> dict | None: + """Transform author UUID to display info.""" + if author_id is None: + return None + + if author_id not in user_cache: + result = await session.execute(select(User).filter(User.id == author_id)) + user = result.scalars().first() + if user: + user_cache[author_id] = { + "display_name": user.display_name or "User", + "avatar_url": user.avatar_url, + } + else: + user_cache[author_id] = { + "display_name": "Unknown User", + "avatar_url": None, + } + + return user_cache[author_id] + + +async def toggle_public_share( + session: AsyncSession, + thread_id: int, + enabled: bool, + user: User, + base_url: str, +) -> dict: + """ + Enable or disable public sharing for a thread. + + Only the thread owner can toggle public sharing. + When enabling, generates a new token if one doesn't exist. + When disabling, keeps the token for potential re-enable. + """ + result = await session.execute( + select(NewChatThread).filter(NewChatThread.id == thread_id) + ) + thread = result.scalars().first() + + if not thread: + raise HTTPException(status_code=404, detail="Thread not found") + + if thread.created_by_id != user.id: + raise HTTPException( + status_code=403, + detail="Only the creator of this chat can manage public sharing", + ) + + if enabled and not thread.public_share_token: + thread.public_share_token = secrets.token_urlsafe(48) + + thread.public_share_enabled = enabled + + await session.commit() + await session.refresh(thread) + + if enabled: + return { + "enabled": True, + "public_url": f"{base_url}/public/{thread.public_share_token}", + "share_token": thread.public_share_token, + } + + return { + "enabled": False, + "public_url": None, + "share_token": None, + } + + +async def get_public_chat( + session: AsyncSession, + share_token: str, +) -> dict: + """ + Get a public chat by share token. + + Returns sanitized content suitable for public viewing. 
+ """ + result = await session.execute( + select(NewChatThread) + .options(selectinload(NewChatThread.messages)) + .filter( + NewChatThread.public_share_token == share_token, + NewChatThread.public_share_enabled.is_(True), + ) + ) + thread = result.scalars().first() + + if not thread: + raise HTTPException(status_code=404, detail="Not found") + + user_cache: dict[UUID, dict] = {} + + messages = [] + for msg in sorted(thread.messages, key=lambda m: m.created_at): + author = await get_author_display(session, msg.author_id, user_cache) + sanitized_content = sanitize_content_for_public(msg.content) + + messages.append( + { + "role": msg.role, + "content": sanitized_content, + "author": author, + "created_at": msg.created_at, + } + ) + + return { + "thread": { + "title": thread.title, + "created_at": thread.created_at, + }, + "messages": messages, + } + + +async def get_thread_by_share_token( + session: AsyncSession, + share_token: str, +) -> NewChatThread | None: + """Get a thread by its public share token if sharing is enabled.""" + result = await session.execute( + select(NewChatThread) + .options(selectinload(NewChatThread.messages)) + .filter( + NewChatThread.public_share_token == share_token, + NewChatThread.public_share_enabled.is_(True), + ) + ) + return result.scalars().first() + + +async def get_user_default_search_space( + session: AsyncSession, + user_id: UUID, +) -> int | None: + """ + Get user's default search space for cloning. + + Returns the first search space where user is owner, or None if not found. + """ + from app.db import SearchSpaceMembership + + result = await session.execute( + select(SearchSpaceMembership) + .filter( + SearchSpaceMembership.user_id == user_id, + SearchSpaceMembership.is_owner.is_(True), + ) + .limit(1) + ) + membership = result.scalars().first() + + if membership: + return membership.search_space_id + + return None + + +async def complete_clone_content( + session: AsyncSession, + target_thread: NewChatThread, + source_thread_id: int, + target_search_space_id: int, +) -> int: + """ + Copy messages and podcasts from source thread to target thread. + + Sets clone_pending=False and needs_history_bootstrap=True when done. + Returns the number of messages copied. 
+ """ + from app.db import NewChatMessage + + result = await session.execute( + select(NewChatThread) + .options(selectinload(NewChatThread.messages)) + .filter(NewChatThread.id == source_thread_id) + ) + source_thread = result.scalars().first() + + if not source_thread: + raise ValueError("Source thread not found") + + podcast_id_map: dict[int, int] = {} + message_count = 0 + + for msg in sorted(source_thread.messages, key=lambda m: m.created_at): + new_content = sanitize_content_for_public(msg.content) + + if isinstance(new_content, list): + for part in new_content: + if ( + isinstance(part, dict) + and part.get("type") == "tool-call" + and part.get("toolName") == "generate_podcast" + ): + result_data = part.get("result", {}) + old_podcast_id = result_data.get("podcast_id") + if old_podcast_id and old_podcast_id not in podcast_id_map: + new_podcast_id = await _clone_podcast( + session, + old_podcast_id, + target_search_space_id, + target_thread.id, + ) + if new_podcast_id: + podcast_id_map[old_podcast_id] = new_podcast_id + + if old_podcast_id and old_podcast_id in podcast_id_map: + result_data["podcast_id"] = podcast_id_map[old_podcast_id] + elif old_podcast_id: + # Podcast couldn't be cloned (not ready), remove reference + result_data.pop("podcast_id", None) + + new_message = NewChatMessage( + thread_id=target_thread.id, + role=msg.role, + content=new_content, + author_id=msg.author_id, + created_at=msg.created_at, + ) + session.add(new_message) + message_count += 1 + + target_thread.clone_pending = False + target_thread.needs_history_bootstrap = True + + await session.commit() + + return message_count + + +async def _clone_podcast( + session: AsyncSession, + podcast_id: int, + target_search_space_id: int, + target_thread_id: int, +) -> int | None: + """Clone a podcast record and its audio file. Only clones ready podcasts.""" + import shutil + import uuid + from pathlib import Path + + from app.db import Podcast, PodcastStatus + + result = await session.execute(select(Podcast).filter(Podcast.id == podcast_id)) + original = result.scalars().first() + if not original or original.status != PodcastStatus.READY: + return None + + new_file_path = None + if original.file_location: + original_path = Path(original.file_location) + if original_path.exists(): + new_filename = f"{uuid.uuid4()}_podcast.mp3" + new_dir = Path("podcasts") + new_dir.mkdir(parents=True, exist_ok=True) + new_file_path = str(new_dir / new_filename) + shutil.copy2(original.file_location, new_file_path) + + new_podcast = Podcast( + title=original.title, + podcast_transcript=original.podcast_transcript, + file_location=new_file_path, + status=PodcastStatus.READY, + search_space_id=target_search_space_id, + thread_id=target_thread_id, + ) + session.add(new_podcast) + await session.flush() + + return new_podcast.id + + +async def is_podcast_publicly_accessible( + session: AsyncSession, + podcast_id: int, +) -> bool: + """ + Check if a podcast belongs to a publicly shared thread. + + Uses the thread_id foreign key for efficient lookup. 
+ """ + from app.db import Podcast + + result = await session.execute( + select(Podcast) + .options(selectinload(Podcast.thread)) + .filter(Podcast.id == podcast_id) + ) + podcast = result.scalars().first() + + if not podcast or not podcast.thread: + return False + + return podcast.thread.public_share_enabled diff --git a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py index 34b9b827c..2ce8716e0 100644 --- a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py @@ -4,15 +4,15 @@ import asyncio import logging import sys +from sqlalchemy import select from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from sqlalchemy.pool import NullPool -# Import for content-based podcast (new-chat) from app.agents.podcaster.graph import graph as podcaster_graph from app.agents.podcaster.state import State as PodcasterState from app.celery_app import celery_app from app.config import config -from app.db import Podcast +from app.db import Podcast, PodcastStatus logger = logging.getLogger(__name__) @@ -44,8 +44,8 @@ def get_celery_session_maker(): # ============================================================================= -def _clear_active_podcast_redis_key(search_space_id: int) -> None: - """Clear the active podcast task key from Redis when task completes.""" +def _clear_generating_podcast(search_space_id: int) -> None: + """Clear the generating podcast marker from Redis when task completes.""" import os import redis @@ -53,34 +53,26 @@ def _clear_active_podcast_redis_key(search_space_id: int) -> None: try: redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") client = redis.from_url(redis_url, decode_responses=True) - key = f"podcast:active:{search_space_id}" + key = f"podcast:generating:{search_space_id}" client.delete(key) - logger.info(f"Cleared active podcast key for search_space_id={search_space_id}") + logger.info( + f"Cleared generating podcast key for search_space_id={search_space_id}" + ) except Exception as e: - logger.warning(f"Could not clear active podcast key: {e}") + logger.warning(f"Could not clear generating podcast key: {e}") @celery_app.task(name="generate_content_podcast", bind=True) def generate_content_podcast_task( self, + podcast_id: int, source_content: str, search_space_id: int, - podcast_title: str = "SurfSense Podcast", user_prompt: str | None = None, ) -> dict: """ - Celery task to generate podcast from source content (for new-chat). - - This task generates a podcast directly from provided content. - - Args: - source_content: The text content to convert into a podcast - search_space_id: ID of the search space - podcast_title: Title for the podcast - user_prompt: Optional instructions for podcast style/tone - - Returns: - dict with podcast_id on success, or error info on failure + Celery task to generate podcast from source content. + Updates existing podcast record created by the tool. 
""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -88,9 +80,9 @@ def generate_content_podcast_task( try: result = loop.run_until_complete( _generate_content_podcast( + podcast_id, source_content, search_space_id, - podcast_title, user_prompt, ) ) @@ -98,46 +90,67 @@ def generate_content_podcast_task( return result except Exception as e: logger.error(f"Error generating content podcast: {e!s}") - return {"status": "error", "error": str(e)} + loop.run_until_complete(_mark_podcast_failed(podcast_id)) + return {"status": "failed", "podcast_id": podcast_id} finally: - # Always clear the active podcast key when task completes (success or failure) - _clear_active_podcast_redis_key(search_space_id) + _clear_generating_podcast(search_space_id) asyncio.set_event_loop(None) loop.close() -async def _generate_content_podcast( - source_content: str, - search_space_id: int, - podcast_title: str = "SurfSense Podcast", - user_prompt: str | None = None, -) -> dict: - """Generate content-based podcast with new session.""" +async def _mark_podcast_failed(podcast_id: int) -> None: + """Mark a podcast as failed in the database.""" async with get_celery_session_maker()() as session: try: - # Configure the podcaster graph + result = await session.execute( + select(Podcast).filter(Podcast.id == podcast_id) + ) + podcast = result.scalars().first() + if podcast: + podcast.status = PodcastStatus.FAILED + await session.commit() + except Exception as e: + logger.error(f"Failed to mark podcast as failed: {e}") + + +async def _generate_content_podcast( + podcast_id: int, + source_content: str, + search_space_id: int, + user_prompt: str | None = None, +) -> dict: + """Generate content-based podcast and update existing record.""" + async with get_celery_session_maker()() as session: + result = await session.execute(select(Podcast).filter(Podcast.id == podcast_id)) + podcast = result.scalars().first() + + if not podcast: + raise ValueError(f"Podcast {podcast_id} not found") + + try: + podcast.status = PodcastStatus.GENERATING + await session.commit() + graph_config = { "configurable": { - "podcast_title": podcast_title, + "podcast_title": podcast.title, "search_space_id": search_space_id, "user_prompt": user_prompt, } } - # Initialize the podcaster state with the source content initial_state = PodcasterState( source_content=source_content, db_session=session, ) - # Run the podcaster graph - result = await podcaster_graph.ainvoke(initial_state, config=graph_config) + graph_result = await podcaster_graph.ainvoke( + initial_state, config=graph_config + ) - # Extract results - podcast_transcript = result.get("podcast_transcript", []) - file_path = result.get("final_podcast_file_path", "") + podcast_transcript = graph_result.get("podcast_transcript", []) + file_path = graph_result.get("final_podcast_file_path", "") - # Convert transcript to serializable format serializable_transcript = [] for entry in podcast_transcript: if hasattr(entry, "speaker_id"): @@ -152,27 +165,22 @@ async def _generate_content_podcast( } ) - # Save podcast to database - podcast = Podcast( - title=podcast_title, - podcast_transcript=serializable_transcript, - file_location=file_path, - search_space_id=search_space_id, - ) - session.add(podcast) + podcast.podcast_transcript = serializable_transcript + podcast.file_location = file_path + podcast.status = PodcastStatus.READY await session.commit() - await session.refresh(podcast) - logger.info(f"Successfully generated content podcast: {podcast.id}") + logger.info(f"Successfully generated 
podcast: {podcast.id}") return { - "status": "success", + "status": "ready", "podcast_id": podcast.id, - "title": podcast_title, + "title": podcast.title, "transcript_entries": len(serializable_transcript), } except Exception as e: logger.error(f"Error in _generate_content_podcast: {e!s}") - await session.rollback() + podcast.status = PodcastStatus.FAILED + await session.commit() raise diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py index bf80cbe78..b33e25170 100644 --- a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py @@ -156,6 +156,41 @@ async def _check_and_trigger_schedules(): ) await session.commit() continue + + # Special handling for Webcrawler - skip if no URLs configured + elif ( + connector.connector_type + == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR + ): + from app.utils.webcrawler_utils import parse_webcrawler_urls + + connector_config = connector.config or {} + urls = parse_webcrawler_urls( + connector_config.get("INITIAL_URLS") + ) + + if urls: + task.delay( + connector.id, + connector.search_space_id, + str(connector.user_id), + None, # start_date + None, # end_date + ) + else: + # No URLs configured - skip indexing but still update next_scheduled_at + logger.info( + f"Webcrawler connector {connector.id} has no URLs configured, " + "skipping periodic indexing (will check again at next scheduled time)" + ) + from datetime import timedelta + + connector.next_scheduled_at = now + timedelta( + minutes=connector.indexing_frequency_minutes + ) + await session.commit() + continue + else: task.delay( connector.id, diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 9a4f050a1..12d7cbd4e 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -34,6 +34,7 @@ from app.services.chat_session_state_service import ( ) from app.services.connector_service import ConnectorService from app.services.new_streaming_service import VercelStreamingService +from app.utils.content_utils import bootstrap_history_from_db def format_attachments_as_context(attachments: list[ChatAttachment]) -> str: @@ -205,13 +206,13 @@ async def stream_new_chat( mentioned_document_ids: list[int] | None = None, mentioned_surfsense_doc_ids: list[int] | None = None, checkpoint_id: str | None = None, + needs_history_bootstrap: bool = False, ) -> AsyncGenerator[str, None]: """ Stream chat responses from the new SurfSense deep agent. This uses the Vercel AI SDK Data Stream Protocol (SSE format) for streaming. The chat_id is used as LangGraph's thread_id for memory/checkpointing. - Message history can be passed from the frontend for context. 
Args: user_query: The user's query @@ -221,6 +222,7 @@ async def stream_new_chat( user_id: The current user's UUID string (for memory tools and session state) llm_config_id: The LLM configuration ID (default: -1 for first global config) attachments: Optional attachments with extracted content + needs_history_bootstrap: If True, load message history from DB (for cloned chats) mentioned_document_ids: Optional list of document IDs mentioned with @ in the chat mentioned_surfsense_doc_ids: Optional list of SurfSense doc IDs mentioned with @ in the chat checkpoint_id: Optional checkpoint ID to rewind/fork from (for edit/reload operations) @@ -300,13 +302,29 @@ async def stream_new_chat( connector_service=connector_service, checkpointer=checkpointer, user_id=user_id, # Pass user ID for memory tools + thread_id=chat_id, # Pass chat ID for podcast association agent_config=agent_config, # Pass prompt configuration firecrawl_api_key=firecrawl_api_key, # Pass Firecrawl API key if configured ) - # Build input with message history from frontend + # Build input with message history langchain_messages = [] + # Bootstrap history for cloned chats (no LangGraph checkpoint exists yet) + if needs_history_bootstrap: + langchain_messages = await bootstrap_history_from_db(session, chat_id) + + # Clear the flag so we don't bootstrap again on next message + from app.db import NewChatThread + + thread_result = await session.execute( + select(NewChatThread).filter(NewChatThread.id == chat_id) + ) + thread = thread_result.scalars().first() + if thread: + thread.needs_history_bootstrap = False + await session.commit() + # Fetch mentioned documents if any (with chunks for proper citations) mentioned_documents: list[Document] = [] if mentioned_document_ids: diff --git a/surfsense_backend/app/tasks/composio_indexer.py b/surfsense_backend/app/tasks/composio_indexer.py index f97652114..ffc4a1f27 100644 --- a/surfsense_backend/app/tasks/composio_indexer.py +++ b/surfsense_backend/app/tasks/composio_indexer.py @@ -86,7 +86,7 @@ async def index_composio_connector( end_date: str | None = None, update_last_indexed: bool = True, max_items: int = 1000, -) -> tuple[int, str]: +) -> tuple[int, int, str | None]: """ Index content from a Composio connector. 
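Tying the Composio return-type change here to the tuple handling and notification text earlier in this diff, the shapes in play are roughly:

# Legacy indexers (e.g. index_notion_pages) still return a 2-tuple:
#     (5, None)      -> 5 indexed; skipped count unknown (None)
# Migrated indexers such as this Composio one return a 3-tuple:
#     (5, 2, None)   -> 5 indexed, 2 skipped as already-indexed duplicates
#     (0, 0, "Database error: connection reset")  -> hard failure, surfaced
#                                                    in the failure notification
#
# notify_indexing_completed then renders, for example:
#     indexed=5, skipped=2, no error ->
#         "Now searchable! 5 items synced (2 items skipped - already indexed)."
#     indexed=0, skipped=3, no error ->
#         "Already up to date! 3 items skipped (already indexed)."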
@@ -104,7 +104,7 @@ async def index_composio_connector( max_items: Maximum number of items to fetch Returns: - Tuple of (number_of_indexed_items, error_message or None) + Tuple of (number_of_indexed_items, number_of_skipped_items, error_message or None) """ task_logger = TaskLoggingService(session, search_space_id) @@ -132,14 +132,14 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "InvalidConnectorType"} ) - return 0, error_msg + return 0, 0, error_msg if not connector: error_msg = f"Composio connector with ID {connector_id} not found" await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "ConnectorNotFound"} ) - return 0, error_msg + return 0, 0, error_msg # Get toolkit ID from config toolkit_id = connector.config.get("toolkit_id") @@ -150,7 +150,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "MissingToolkitId"} ) - return 0, error_msg + return 0, 0, error_msg # Check if toolkit is indexable if toolkit_id not in INDEXABLE_TOOLKITS: @@ -158,7 +158,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "ToolkitNotIndexable"} ) - return 0, error_msg + return 0, 0, error_msg # Get indexer function from registry try: @@ -167,7 +167,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, str(e), {"error_type": "NoIndexerImplemented"} ) - return 0, str(e) + return 0, 0, str(e) # Build kwargs for the indexer function kwargs = { @@ -199,7 +199,7 @@ async def index_composio_connector( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -209,4 +209,4 @@ async def index_composio_connector( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Composio connector: {e!s}", exc_info=True) - return 0, f"Failed to index Composio connector: {e!s}" + return 0, 0, f"Failed to index Composio connector: {e!s}" diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 4d5a33b79..6bb62d716 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -20,6 +20,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -317,6 +318,24 @@ async def index_airtable_records( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = ( + await check_duplicate_document_by_hash( + session, content_hash + ) + ) + + if duplicate_by_content: + logger.info( + f"Airtable record {record_id} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate document summary user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index a1067255d..e183ab333 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -22,6 +22,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -308,6 +309,22 @@ async def index_bookstack_pages( logger.info(f"Successfully updated BookStack page {page_name}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"BookStack page {page_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index e459584f8..887c3e2e5 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -22,6 +22,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -302,6 +303,22 @@ async def index_clickup_tasks( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"ClickUp task {task_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index ddbefafb9..5673839bb 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -23,6 +23,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -306,6 +307,22 @@ async def index_confluence_pages( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Confluence page {page_title} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 8f0c76e53..9e401b335 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -21,6 +21,7 @@ from app.utils.document_converters import ( from .base import ( build_document_metadata_markdown, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -454,6 +455,24 @@ async def index_discord_messages( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = ( + await check_duplicate_document_by_hash( + session, content_hash + ) + ) + + if duplicate_by_content: + logger.info( + f"Discord message {msg_id} in {guild_name}#{channel_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Process chunks chunks = await create_document_chunks( diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 4a8df4918..fb6989bb9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -24,6 +24,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -319,6 +320,21 @@ async def _process_repository_digest( # Delete existing document to replace with new one await session.delete(existing_document) await session.flush() + else: + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Repository {repo_full_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + return 0 # Generate summary using LLM (ONE call per repository!) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 08d2904d6..e599abd22 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -24,7 +24,9 @@ from app.utils.document_converters import ( ) from .base import ( + calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -163,10 +165,22 @@ async def index_google_gmail_messages( credentials, session, user_id, connector_id ) + # Calculate date range using last_indexed_at if dates not provided + # This ensures Gmail uses the same date logic as other connectors + # (uses last_indexed_at → now, or 365 days back for first-time indexing) + calculated_start_date, calculated_end_date = calculate_date_range( + connector, start_date, end_date, default_days_back=365 + ) + # Fetch recent Google gmail messages - logger.info(f"Fetching recent emails for connector {connector_id}") + logger.info( + f"Fetching emails for connector {connector_id} " + f"from {calculated_start_date} to {calculated_end_date}" + ) messages, error = await gmail_connector.get_recent_messages( - max_results=max_messages, start_date=start_date, end_date=end_date + max_results=max_messages, + start_date=calculated_start_date, + end_date=calculated_end_date, ) if error: @@ -316,6 +330,22 @@ async def index_google_gmail_messages( logger.info(f"Successfully updated Gmail message {subject}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Gmail message {subject} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: 
{duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index 4851a6466..d6095d20e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -23,6 +23,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -284,6 +285,22 @@ async def index_jira_issues( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Jira issue {issue_identifier} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index 7d8e0c30e..d00a39160 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -22,6 +22,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -315,6 +316,22 @@ async def index_linear_issues( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Linear issue {issue_identifier} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index ead259a44..59890dbe4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -21,6 +21,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -363,6 +364,22 @@ async def index_luma_events( logger.info(f"Successfully updated Luma event {event_name}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Luma event {event_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 2d36351fa..a65bf84a7 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -2,6 +2,7 @@ Notion connector indexer. """ +from collections.abc import Awaitable, Callable from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -22,12 +23,17 @@ from .base import ( build_document_metadata_string, calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, update_connector_last_indexed, ) +# Type alias for retry callback +# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None +RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]] + async def index_notion_pages( session: AsyncSession, @@ -37,6 +43,7 @@ async def index_notion_pages( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_retry_callback: RetryCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Notion pages from all accessible pages. @@ -49,6 +56,9 @@ async def index_notion_pages( start_date: Start date for indexing (YYYY-MM-DD format) end_date: End date for indexing (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_retry_callback: Optional callback for retry progress notifications. 
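index_notion_pages gains an optional async on_retry_callback so the task layer can relay Notion rate-limit waits to the user. A hypothetical callback matching the documented signature (the notify function and the way it reports progress are illustrative only):

```python
# Hypothetical callback matching the documented signature. How the message is
# surfaced (print here) is illustrative only, not part of this diff.
async def notify_user_progress(
    retry_reason: str, attempt: int, max_attempts: int, wait_seconds: float
) -> None:
    reasons = {
        "rate_limit": "Notion is rate limiting requests",
        "server_error": "Notion returned a server error",
        "timeout": "Notion request timed out",
    }
    print(
        f"{reasons.get(retry_reason, retry_reason)}; "
        f"retrying {attempt}/{max_attempts} in {wait_seconds:.0f}s"
    )


# Passed through when kicking off indexing (other arguments omitted):
# await index_notion_pages(session, ..., on_retry_callback=notify_user_progress)
```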
+ Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) + retry_reason is one of: 'rate_limit', 'server_error', 'timeout' Returns: Tuple containing (number of documents indexed, error message or None) @@ -138,6 +148,10 @@ async def index_notion_pages( session=session, connector_id=connector_id ) + # Set retry callback if provided (for user notifications during rate limits) + if on_retry_callback: + notion_client.set_retry_callback(on_retry_callback) + logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}") await task_logger.log_task_progress( @@ -156,6 +170,20 @@ async def index_notion_pages( start_date=start_date_iso, end_date=end_date_iso ) logger.info(f"Found {len(pages)} Notion pages") + + # Get count of pages that had unsupported content skipped + pages_with_skipped_content = notion_client.get_skipped_content_count() + if pages_with_skipped_content > 0: + logger.info( + f"{pages_with_skipped_content} pages had Notion AI content skipped (not available via API)" + ) + + # Check if using legacy integration token and log warning + if notion_client.is_using_legacy_token(): + logger.warning( + f"Connector {connector_id} is using legacy integration token. " + "Recommend reconnecting with OAuth." + ) except Exception as e: await task_logger.log_task_failure( log_entry, @@ -170,12 +198,13 @@ async def index_notion_pages( if not pages: await task_logger.log_task_success( log_entry, - f"No Notion pages found for connector {connector_id}", + f"No Notion pages found for connector {connector_id}. " + "Ensure pages are shared with the Notion integration.", {"pages_found": 0}, ) logger.info("No Notion pages found to index") await notion_client.close() - return 0, "No Notion pages found" + return 0, None # Success with 0 pages, not an error # Track the number of documents indexed documents_indexed = 0 @@ -360,6 +389,22 @@ async def index_notion_pages( continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Notion page {page_title} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Get user's long context LLM user_llm = await get_user_long_context_llm( @@ -437,13 +482,23 @@ async def index_notion_pages( logger.info(f"Final commit: Total {documents_indexed} documents processed") await session.commit() - # Prepare result message + # Get final count of pages with skipped Notion AI content + pages_with_skipped_ai_content = notion_client.get_skipped_content_count() + + # Prepare result message with user-friendly notification about skipped content result_message = None if skipped_pages: result_message = f"Processed {total_processed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}" else: result_message = f"Processed {total_processed} pages." + # Add user-friendly message about skipped Notion AI content + if pages_with_skipped_ai_content > 0: + result_message += ( + " Audio transcriptions and AI summaries from Notion aren't accessible " + "via their API - all other content was saved." 
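set_retry_callback, get_skipped_content_count, and is_using_legacy_token are methods on the Notion client, whose implementation is outside this diff. A loose sketch of how the client might dispatch the callback while waiting out a 429 (names and structure are assumptions):

```python
# Hypothetical sketch of the client side of the retry callback. The real
# Notion client is not part of this diff; attribute and method names are
# assumptions.
import asyncio


class NotionClientSketch:
    def __init__(self) -> None:
        self._retry_callback = None

    def set_retry_callback(self, callback) -> None:
        self._retry_callback = callback

    async def _wait_before_retry(
        self, attempt: int, max_attempts: int, wait_seconds: float
    ) -> None:
        # Tell the caller (and ultimately the user) why indexing is paused.
        if self._retry_callback:
            await self._retry_callback("rate_limit", attempt, max_attempts, wait_seconds)
        await asyncio.sleep(wait_seconds)
```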
+ ) + # Log success await task_logger.log_task_success( log_entry, @@ -453,6 +508,7 @@ async def index_notion_pages( "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, "skipped_pages_count": len(skipped_pages), + "pages_with_skipped_ai_content": pages_with_skipped_ai_content, "result_message": result_message, }, ) @@ -464,10 +520,28 @@ async def index_notion_pages( # Clean up the async client await notion_client.close() + # Build user-friendly notification messages + # This will be shown in the notification to inform users + notification_parts = [] + + if pages_with_skipped_ai_content > 0: + notification_parts.append( + "Some Notion AI content couldn't be synced (API limitation)" + ) + + if notion_client.is_using_legacy_token(): + notification_parts.append( + "Using legacy token. Reconnect with OAuth for better reliability." + ) + + user_notification_message = ( + " ".join(notification_parts) if notification_parts else None + ) + return ( total_processed, - None, - ) # Return None on success (result_message is for logging only) + user_notification_message, + ) except SQLAlchemyError as db_error: await session.rollback() diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py index 4c4dab4c2..a603d3fba 100644 --- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py @@ -28,6 +28,7 @@ from app.utils.document_converters import ( from .base import ( build_document_metadata_string, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -426,6 +427,22 @@ async def index_obsidian_vault( indexed_count += 1 else: + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Obsidian note {title} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
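With this change the indexer's second return value is no longer reserved for errors: an empty page list now yields (0, None), and a non-None string may be a user-facing notice about legacy tokens or skipped Notion AI content. A small, self-contained illustration of how a caller might fold that message into its summary (the helper below is hypothetical):

```python
# Hypothetical handling of the indexer's (count, message) return value. With
# this change a non-None message can be a user-facing notice rather than an
# error; zero pages now yields (0, None).
def summarize_notion_run(result: tuple[int, str | None]) -> str:
    indexed_count, message = result
    if message is None:
        return f"Indexed {indexed_count} Notion pages."
    # The indexer assembles this string from notification_parts above.
    return f"Indexed {indexed_count} Notion pages. {message}"


print(summarize_notion_run((12, "Some Notion AI content couldn't be synced (API limitation)")))
```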
+ ) + skipped_count += 1 + continue + # Create new document logger.info(f"Indexing new note: {title}") diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index f6ed4f567..f244c97f8 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -22,6 +22,7 @@ from .base import ( build_document_metadata_markdown, calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -325,6 +326,22 @@ async def index_slack_messages( logger.info(f"Successfully updated Slack message {msg_ts}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Slack message {msg_ts} in channel {channel_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Process chunks chunks = await create_document_chunks(combined_document_string) diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index b879ddfcb..66b709ddc 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -21,6 +21,7 @@ from .base import ( build_document_metadata_markdown, calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -354,6 +355,27 @@ async def index_teams_messages( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = ( + await check_duplicate_document_by_hash( + session, content_hash + ) + ) + + if duplicate_by_content: + logger.info( + "Teams message %s in channel %s already indexed by another connector " + "(existing document ID: %s, type: %s). 
Skipping.", + message_id, + channel_name, + duplicate_by_content.id, + duplicate_by_content.document_type, + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Process chunks chunks = await create_document_chunks( diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py index fb1aae5f2..0c63fd2f0 100644 --- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -18,9 +18,11 @@ from app.utils.document_converters import ( generate_document_summary, generate_unique_identifier_hash, ) +from app.utils.webcrawler_utils import parse_webcrawler_urls from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -96,13 +98,7 @@ async def index_crawled_urls( api_key = connector.config.get("FIRECRAWL_API_KEY") # Get URLs from connector config - initial_urls = connector.config.get("INITIAL_URLS", "") - if isinstance(initial_urls, str): - urls = [url.strip() for url in initial_urls.split("\n") if url.strip()] - elif isinstance(initial_urls, list): - urls = [url.strip() for url in initial_urls if url.strip()] - else: - urls = [] + urls = parse_webcrawler_urls(connector.config.get("INITIAL_URLS")) logger.info( f"Starting crawled web page indexing for connector {connector_id} with {len(urls)} URLs" @@ -281,6 +277,22 @@ async def index_crawled_urls( logger.info(f"Successfully updated URL {url}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"URL {url} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 0a22c20c2..6c4be0cb8 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -37,18 +37,32 @@ from .base import ( from .markdown_processor import add_received_markdown_file_document # Constants for LlamaCloud retry configuration -LLAMACLOUD_MAX_RETRIES = 3 -LLAMACLOUD_BASE_DELAY = 5 # Base delay in seconds for exponential backoff +LLAMACLOUD_MAX_RETRIES = 5 # Increased from 3 for large file resilience +LLAMACLOUD_BASE_DELAY = 10 # Base delay in seconds for exponential backoff +LLAMACLOUD_MAX_DELAY = 120 # Maximum delay between retries (2 minutes) LLAMACLOUD_RETRYABLE_EXCEPTIONS = ( ssl.SSLError, httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout, httpx.WriteTimeout, + httpx.RemoteProtocolError, + httpx.LocalProtocolError, ConnectionError, + ConnectionResetError, TimeoutError, + OSError, # Catches various network-level errors ) +# Timeout calculation constants +UPLOAD_BYTES_PER_SECOND_SLOW = ( + 100 * 1024 +) # 100 KB/s (conservative for slow connections) +MIN_UPLOAD_TIMEOUT = 120 # Minimum 2 minutes for any file +MAX_UPLOAD_TIMEOUT = 1800 # Maximum 30 minutes for very large files +BASE_JOB_TIMEOUT = 600 # 10 minutes base for job processing +PER_PAGE_JOB_TIMEOUT = 60 # 1 minute per page for processing + def get_google_drive_unique_identifier( connector: dict | None, @@ -204,6 +218,48 @@ async def find_existing_document_with_migration( return existing_document +def calculate_upload_timeout(file_size_bytes: int) -> float: + """ + Calculate appropriate upload timeout based on file size. + + Assumes a conservative slow connection speed to handle worst-case scenarios. + + Args: + file_size_bytes: Size of the file in bytes + + Returns: + Timeout in seconds + """ + # Calculate time needed at slow connection speed + # Add 50% buffer for network variability and SSL overhead + estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5 + + # Clamp to reasonable bounds + return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT)) + + +def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float: + """ + Calculate job processing timeout based on page count and file size. + + Args: + estimated_pages: Estimated number of pages + file_size_bytes: Size of the file in bytes + + Returns: + Timeout in seconds + """ + # Base timeout + time per page + page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT) + + # Also consider file size (large images take longer to process) + # ~1 minute per 10MB of file size + size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60 + + # Use the larger of the two estimates + return max(page_based_timeout, size_based_timeout) + + async def parse_with_llamacloud_retry( file_path: str, estimated_pages: int, @@ -213,6 +269,9 @@ async def parse_with_llamacloud_retry( """ Parse a file with LlamaCloud with retry logic for transient SSL/connection errors. + Uses dynamic timeout calculations based on file size and page count to handle + very large files reliably. 
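To make the new timeout scaling concrete, here is the arithmetic the two helpers above perform for a hypothetical 50 MB, 20-page upload (numbers only, no network calls):

```python
# Worked example of calculate_upload_timeout / calculate_job_timeout for a
# hypothetical 50 MB, 20-page file, using the constants defined above.
UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024
MIN_UPLOAD_TIMEOUT, MAX_UPLOAD_TIMEOUT = 120, 1800
BASE_JOB_TIMEOUT, PER_PAGE_JOB_TIMEOUT = 600, 60

file_size_bytes = 50 * 1024 * 1024
estimated_pages = 20

upload = max(
    MIN_UPLOAD_TIMEOUT,
    min((file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5, MAX_UPLOAD_TIMEOUT),
)
job = max(
    BASE_JOB_TIMEOUT + estimated_pages * PER_PAGE_JOB_TIMEOUT,
    BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60,
)

print(f"upload timeout: {upload:.0f}s")  # 768s (~13 minutes) for 50 MB at 100 KB/s
print(f"job timeout: {job:.0f}s")        # 1800s - the page-based estimate dominates here
```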
+ Args: file_path: Path to the file to parse estimated_pages: Estimated number of pages for timeout calculation @@ -225,25 +284,37 @@ async def parse_with_llamacloud_retry( Raises: Exception: If all retries fail """ + import os + import random + from llama_cloud_services import LlamaParse from llama_cloud_services.parse.utils import ResultType - # Calculate timeouts based on estimated pages - # Base timeout of 300 seconds + 30 seconds per page for large documents - base_timeout = 300 - per_page_timeout = 30 - job_timeout = base_timeout + (estimated_pages * per_page_timeout) + # Get file size for timeout calculations + file_size_bytes = os.path.getsize(file_path) + file_size_mb = file_size_bytes / (1024 * 1024) - # Create custom httpx client with larger timeouts for file uploads - # The SSL error often occurs during large file uploads, so we need generous timeouts + # Calculate dynamic timeouts based on file size and page count + upload_timeout = calculate_upload_timeout(file_size_bytes) + job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes) + + # HTTP client timeouts - scaled based on file size + # Write timeout is critical for large file uploads custom_timeout = httpx.Timeout( - connect=60.0, # 60 seconds to establish connection - read=300.0, # 5 minutes to read response - write=300.0, # 5 minutes to write/upload (important for large files) - pool=60.0, # 60 seconds to acquire connection from pool + connect=120.0, # 2 minutes to establish connection (handles slow DNS, etc.) + read=upload_timeout, # Dynamic based on file size + write=upload_timeout, # Dynamic based on file size (upload time) + pool=120.0, # 2 minutes to acquire connection from pool + ) + + logging.info( + f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, " + f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, " + f"job_timeout={job_timeout:.0f}s" ) last_exception = None + attempt_errors = [] for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1): try: @@ -257,46 +328,66 @@ async def parse_with_llamacloud_retry( language="en", result_type=ResultType.MD, # Timeout settings for large files - max_timeout=max(2000, job_timeout), # Overall max timeout + max_timeout=int(max(2000, job_timeout + upload_timeout)), job_timeout_in_seconds=job_timeout, - job_timeout_extra_time_per_page_in_seconds=per_page_timeout, + job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT, # Use our custom client with larger timeouts custom_client=custom_client, ) # Parse the file asynchronously result = await parser.aparse(file_path) + + # Success - log if we had previous failures + if attempt > 1: + logging.info( + f"LlamaCloud upload succeeded on attempt {attempt} after " + f"{len(attempt_errors)} failures" + ) + return result except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e: last_exception = e error_type = type(e).__name__ + error_msg = str(e)[:200] + attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}") if attempt < LLAMACLOUD_MAX_RETRIES: - # Calculate exponential backoff delay - delay = LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)) + # Calculate exponential backoff with jitter + # Base delay doubles each attempt, capped at max delay + base_delay = min( + LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), LLAMACLOUD_MAX_DELAY + ) + # Add random jitter (±25%) to prevent thundering herd + jitter = base_delay * 0.25 * (2 * random.random() - 1) + delay = base_delay + jitter if task_logger and log_entry: await task_logger.log_task_progress( log_entry, - f"LlamaCloud upload failed (attempt 
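Under the new constants the nominal backoff schedule between the five attempts is 10 s, 20 s, 40 s, 80 s, each nudged by up to ±25% jitter; the 120 s cap only matters if the retry count is raised further. A tiny standalone reproduction of that schedule:

```python
# Nominal backoff schedule produced by the retry loop above (same formula,
# jitter shown separately for clarity).
import random

LLAMACLOUD_MAX_RETRIES = 5
LLAMACLOUD_BASE_DELAY = 10
LLAMACLOUD_MAX_DELAY = 120

for attempt in range(1, LLAMACLOUD_MAX_RETRIES):  # a delay follows attempts 1-4 only
    base_delay = min(LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), LLAMACLOUD_MAX_DELAY)
    jitter = base_delay * 0.25 * (2 * random.random() - 1)  # +/-25%
    print(f"after attempt {attempt}: wait ~{base_delay}s (with jitter: {base_delay + jitter:.1f}s)")
# after attempt 1: ~10s, 2: ~20s, 3: ~40s, 4: ~80s; the 120s cap never binds at 5 retries
```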
{attempt}/{LLAMACLOUD_MAX_RETRIES}), retrying in {delay}s", + f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), retrying in {delay:.0f}s", { "error_type": error_type, - "error_message": str(e)[:200], + "error_message": error_msg, "attempt": attempt, "retry_delay": delay, + "file_size_mb": round(file_size_mb, 1), + "upload_timeout": upload_timeout, }, ) else: logging.warning( - f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): {error_type}. " - f"Retrying in {delay}s..." + f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): " + f"{error_type}. File: {file_size_mb:.1f}MB. Retrying in {delay:.0f}s..." ) await asyncio.sleep(delay) else: logging.error( - f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} attempts: {error_type} - {e}" + f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} attempts. " + f"File size: {file_size_mb:.1f}MB, Pages: {estimated_pages}. " + f"Errors: {'; '.join(attempt_errors)}" ) except Exception: @@ -304,7 +395,10 @@ async def parse_with_llamacloud_retry( raise # All retries exhausted - raise last_exception or RuntimeError("LlamaCloud parsing failed after all retries") + raise last_exception or RuntimeError( + f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} retries. " + f"File size: {file_size_mb:.1f}MB" + ) async def add_received_file_document_using_unstructured( diff --git a/surfsense_backend/app/users.py b/surfsense_backend/app/users.py index e86eb752b..4be2fe525 100644 --- a/surfsense_backend/app/users.py +++ b/surfsense_backend/app/users.py @@ -229,3 +229,4 @@ auth_backend = AuthenticationBackend( fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [auth_backend]) current_active_user = fastapi_users.current_user(active=True) +current_optional_user = fastapi_users.current_user(active=True, optional=True) diff --git a/surfsense_backend/app/utils/content_utils.py b/surfsense_backend/app/utils/content_utils.py new file mode 100644 index 000000000..d2342b79e --- /dev/null +++ b/surfsense_backend/app/utils/content_utils.py @@ -0,0 +1,75 @@ +""" +Utilities for working with message content. + +Message content in new_chat_messages can be stored in various formats: +- String: Simple text content +- List: Array of content parts [{"type": "text", "text": "..."}, {"type": "tool-call", ...}] +- Dict: Single content object + +These utilities help extract and transform content for different use cases. +""" + +from langchain_core.messages import AIMessage, HumanMessage +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + + +def extract_text_content(content: str | dict | list) -> str: + """Extract plain text content from various message formats.""" + if isinstance(content, str): + return content + if isinstance(content, dict): + # Handle dict with 'text' key + if "text" in content: + return content["text"] + return str(content) + if isinstance(content, list): + # Handle list of parts (e.g., [{"type": "text", "text": "..."}]) + texts = [] + for part in content: + if isinstance(part, dict) and part.get("type") == "text": + texts.append(part.get("text", "")) + elif isinstance(part, str): + texts.append(part) + return "\n".join(texts) if texts else "" + return "" + + +async def bootstrap_history_from_db( + session: AsyncSession, + thread_id: int, +) -> list[HumanMessage | AIMessage]: + """ + Load message history from database and convert to LangChain format. 
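extract_text_content normalises the three content shapes the chat tables store. A quick sanity check against each shape, with expected outputs following directly from the function above:

```python
# Usage check for extract_text_content (added in app/utils/content_utils.py above).
from app.utils.content_utils import extract_text_content

assert extract_text_content("plain text") == "plain text"
assert extract_text_content({"text": "from a dict"}) == "from a dict"
assert extract_text_content(
    [
        {"type": "text", "text": "first part"},
        {"type": "tool-call", "toolName": "generate_podcast"},  # non-text parts are ignored
        {"type": "text", "text": "second part"},
    ]
) == "first part\nsecond part"
```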
+ + Used for cloned chats where the LangGraph checkpointer has no state, + but we have messages in the database that should be used as context. + + Args: + session: Database session + thread_id: The chat thread ID + + Returns: + List of LangChain messages (HumanMessage/AIMessage) + """ + from app.db import NewChatMessage + + result = await session.execute( + select(NewChatMessage) + .filter(NewChatMessage.thread_id == thread_id) + .order_by(NewChatMessage.created_at) + ) + db_messages = result.scalars().all() + + langchain_messages: list[HumanMessage | AIMessage] = [] + + for msg in db_messages: + text_content = extract_text_content(msg.content) + if not text_content: + continue + if msg.role == "user": + langchain_messages.append(HumanMessage(content=text_content)) + elif msg.role == "assistant": + langchain_messages.append(AIMessage(content=text_content)) + + return langchain_messages diff --git a/surfsense_backend/app/utils/periodic_scheduler.py b/surfsense_backend/app/utils/periodic_scheduler.py index 219641933..aa8c07ce4 100644 --- a/surfsense_backend/app/utils/periodic_scheduler.py +++ b/surfsense_backend/app/utils/periodic_scheduler.py @@ -43,6 +43,7 @@ def create_periodic_schedule( user_id: str, connector_type: SearchSourceConnectorType, frequency_minutes: int, + connector_config: dict | None = None, ) -> bool: """ Trigger the first indexing run immediately when periodic indexing is enabled. @@ -57,11 +58,26 @@ def create_periodic_schedule( user_id: User ID connector_type: Type of connector frequency_minutes: Frequency in minutes (used for logging) + connector_config: Optional connector config dict for validation Returns: True if successful, False otherwise """ try: + # Special handling for connectors that require config validation + if connector_type == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: + from app.utils.webcrawler_utils import parse_webcrawler_urls + + config = connector_config or {} + urls = parse_webcrawler_urls(config.get("INITIAL_URLS")) + + if not urls: + logger.info( + f"Webcrawler connector {connector_id} has no URLs configured, " + "skipping first indexing run (will run when URLs are added)" + ) + return True # Return success - schedule is created, just no first run + logger.info( f"Periodic indexing enabled for connector {connector_id} " f"(frequency: {frequency_minutes} minutes). Triggering first run..." diff --git a/surfsense_backend/app/utils/webcrawler_utils.py b/surfsense_backend/app/utils/webcrawler_utils.py new file mode 100644 index 000000000..31d2ebe50 --- /dev/null +++ b/surfsense_backend/app/utils/webcrawler_utils.py @@ -0,0 +1,28 @@ +""" +Utility functions for webcrawler connector. +""" + + +def parse_webcrawler_urls(initial_urls: str | list | None) -> list[str]: + """ + Parse URLs from webcrawler INITIAL_URLS value. + + Handles both string (newline-separated) and list formats. 
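bootstrap_history_from_db is meant for cloned threads whose LangGraph checkpointer is empty. A hypothetical wiring sketch for the chat entrypoint, where everything except bootstrap_history_from_db itself is an assumption:

```python
# Hypothetical wiring for cloned threads: if the checkpointer has no state,
# seed the LangChain history from the database. The thread flag and the
# surrounding function are assumptions; only bootstrap_history_from_db is
# taken from this diff.
from app.utils.content_utils import bootstrap_history_from_db


async def load_initial_messages(session, thread, checkpoint_messages):
    if checkpoint_messages:
        return checkpoint_messages  # normal thread: checkpointer already has history
    if getattr(thread, "needs_history_bootstrap", False):
        return await bootstrap_history_from_db(session, thread.id)
    return []
```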
+ + Args: + initial_urls: The INITIAL_URLS value (string, list, or None) + + Returns: + List of parsed, stripped, non-empty URLs + """ + if initial_urls is None: + return [] + + if isinstance(initial_urls, str): + return [url.strip() for url in initial_urls.split("\n") if url.strip()] + elif isinstance(initial_urls, list): + return [ + url.strip() for url in initial_urls if isinstance(url, str) and url.strip() + ] + else: + return [] diff --git a/surfsense_web/app/(home)/login/page.tsx b/surfsense_web/app/(home)/login/page.tsx index 0dc9c445f..8b3be3805 100644 --- a/surfsense_web/app/(home)/login/page.tsx +++ b/surfsense_web/app/(home)/login/page.tsx @@ -27,6 +27,13 @@ function LoginContent() { const error = searchParams.get("error"); const message = searchParams.get("message"); const logout = searchParams.get("logout"); + const returnUrl = searchParams.get("returnUrl"); + + // Save returnUrl to localStorage so it persists through OAuth flows (e.g., Google) + // This is read by TokenHandler after successful authentication + if (returnUrl) { + localStorage.setItem("surfsense_redirect_path", decodeURIComponent(returnUrl)); + } // Show registration success message if (registered === "true") { @@ -93,7 +100,7 @@ function LoginContent() { }, [searchParams, t, tCommon]); // Use global loading screen for auth type determination - spinner animation won't reset - useGlobalLoadingEffect(isLoading, tCommon("loading"), "login"); + useGlobalLoadingEffect(isLoading); // Show nothing while loading - the GlobalLoadingProvider handles the loading UI if (isLoading) { diff --git a/surfsense_web/app/auth/callback/loading.tsx b/surfsense_web/app/auth/callback/loading.tsx index 0c94e1ee0..f12b3847d 100644 --- a/surfsense_web/app/auth/callback/loading.tsx +++ b/surfsense_web/app/auth/callback/loading.tsx @@ -1,13 +1,10 @@ "use client"; -import { useTranslations } from "next-intl"; import { useGlobalLoadingEffect } from "@/hooks/use-global-loading"; export default function AuthCallbackLoading() { - const t = useTranslations("auth"); - // Use global loading - spinner animation won't reset when page transitions - useGlobalLoadingEffect(true, t("processing_authentication"), "default"); + useGlobalLoadingEffect(true); // Return null - the GlobalLoadingProvider handles the loading UI return null; diff --git a/surfsense_web/app/dashboard/[search_space_id]/client-layout.tsx b/surfsense_web/app/dashboard/[search_space_id]/client-layout.tsx index e6730d8d1..8418d4719 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/client-layout.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/client-layout.tsx @@ -154,11 +154,7 @@ export function DashboardClientLayout({ isAutoConfiguring; // Use global loading screen - spinner animation won't reset - useGlobalLoadingEffect( - shouldShowLoading, - isAutoConfiguring ? 
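The parse_webcrawler_urls helper introduced above replaces the inline parsing in the webcrawler indexer and backs the new periodic-scheduler guard. Its behaviour for the string, list, and None cases can be checked directly:

```python
# Usage check for parse_webcrawler_urls (added in app/utils/webcrawler_utils.py above).
from app.utils.webcrawler_utils import parse_webcrawler_urls

assert parse_webcrawler_urls("https://a.example\n\n  https://b.example  \n") == [
    "https://a.example",
    "https://b.example",
]
assert parse_webcrawler_urls(["https://a.example", "", 42, "  https://b.example "]) == [
    "https://a.example",
    "https://b.example",
]
assert parse_webcrawler_urls(None) == []
```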
t("setting_up_ai") : t("checking_llm_prefs"), - "default" - ); + useGlobalLoadingEffect(shouldShowLoading); if (shouldShowLoading) { return null; diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index 4509a44a7..803bd6661 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -13,7 +13,11 @@ import { useTranslations } from "next-intl"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { toast } from "sonner"; import { z } from "zod"; -import { currentThreadAtom } from "@/atoms/chat/current-thread.atom"; +import { + clearTargetCommentIdAtom, + currentThreadAtom, + setTargetCommentIdAtom, +} from "@/atoms/chat/current-thread.atom"; import { type MentionedDocumentInfo, mentionedDocumentIdsAtom, @@ -38,9 +42,11 @@ import { RecallMemoryToolUI, SaveMemoryToolUI } from "@/components/tool-ui/user- import { Spinner } from "@/components/ui/spinner"; import { useChatSessionStateSync } from "@/hooks/use-chat-session-state"; import { useMessagesElectric } from "@/hooks/use-messages-electric"; +import { publicChatApiService } from "@/lib/apis/public-chat-api.service"; // import { WriteTodosToolUI } from "@/components/tool-ui/write-todos"; import { getBearerToken } from "@/lib/auth-utils"; import { createAttachmentAdapter, extractAttachmentContent } from "@/lib/chat/attachment-adapter"; +import { convertToThreadMessage } from "@/lib/chat/message-utils"; import { isPodcastGenerating, looksLikePodcastRequest, @@ -110,112 +116,6 @@ function extractMentionedDocuments(content: unknown): MentionedDocumentInfo[] { return []; } -/** - * Zod schema for persisted attachment info - */ -const PersistedAttachmentSchema = z.object({ - id: z.string(), - name: z.string(), - type: z.string(), - contentType: z.string().optional(), - imageDataUrl: z.string().optional(), - extractedContent: z.string().optional(), -}); - -const AttachmentsPartSchema = z.object({ - type: z.literal("attachments"), - items: z.array(PersistedAttachmentSchema), -}); - -type PersistedAttachment = z.infer; - -/** - * Extract persisted attachments from message content (type-safe with Zod) - */ -function extractPersistedAttachments(content: unknown): PersistedAttachment[] { - if (!Array.isArray(content)) return []; - - for (const part of content) { - const result = AttachmentsPartSchema.safeParse(part); - if (result.success) { - return result.data.items; - } - } - - return []; -} - -/** - * Convert backend message to assistant-ui ThreadMessageLike format - * Filters out 'thinking-steps' part as it's handled separately via messageThinkingSteps - * Restores attachments for user messages from persisted data - */ -function convertToThreadMessage(msg: MessageRecord): ThreadMessageLike { - let content: ThreadMessageLike["content"]; - - if (typeof msg.content === "string") { - content = [{ type: "text", text: msg.content }]; - } else if (Array.isArray(msg.content)) { - // Filter out custom metadata parts - they're handled separately - const filteredContent = msg.content.filter((part: unknown) => { - if (typeof part !== "object" || part === null || !("type" in part)) return true; - const partType = (part as { type: string }).type; - // Filter out thinking-steps, mentioned-documents, and attachments - return ( - partType !== "thinking-steps" && - partType !== "mentioned-documents" && 
- partType !== "attachments" - ); - }); - content = - filteredContent.length > 0 - ? (filteredContent as ThreadMessageLike["content"]) - : [{ type: "text", text: "" }]; - } else { - content = [{ type: "text", text: String(msg.content) }]; - } - - // Restore attachments for user messages - let attachments: ThreadMessageLike["attachments"]; - if (msg.role === "user") { - const persistedAttachments = extractPersistedAttachments(msg.content); - if (persistedAttachments.length > 0) { - attachments = persistedAttachments.map((att) => ({ - id: att.id, - name: att.name, - type: att.type as "document" | "image" | "file", - contentType: att.contentType || "application/octet-stream", - status: { type: "complete" as const }, - content: [], - // Custom fields for our ChatAttachment interface - imageDataUrl: att.imageDataUrl, - extractedContent: att.extractedContent, - })); - } - } - - // Build metadata.custom for author display in shared chats - const metadata = msg.author_id - ? { - custom: { - author: { - displayName: msg.author_display_name ?? null, - avatarUrl: msg.author_avatar_url ?? null, - }, - }, - } - : undefined; - - return { - id: `msg-${msg.id}`, - role: msg.role, - content, - createdAt: new Date(msg.created_at), - attachments, - metadata, - }; -} - /** * Tools that should render custom UI in the chat. */ @@ -242,6 +142,8 @@ export default function NewChatPage() { const params = useParams(); const queryClient = useQueryClient(); const [isInitializing, setIsInitializing] = useState(true); + const [isCompletingClone, setIsCompletingClone] = useState(false); + const [cloneError, setCloneError] = useState(false); const [threadId, setThreadId] = useState(null); const [currentThread, setCurrentThread] = useState(null); const [messages, setMessages] = useState([]); @@ -261,6 +163,8 @@ export default function NewChatPage() { const setMessageDocumentsMap = useSetAtom(messageDocumentsMapAtom); const hydratePlanState = useSetAtom(hydratePlanStateAtom); const setCurrentThreadState = useSetAtom(currentThreadAtom); + const setTargetCommentId = useSetAtom(setTargetCommentIdAtom); + const clearTargetCommentId = useSetAtom(clearTargetCommentIdAtom); // Get current user for author info in shared chats const { data: currentUser } = useAtomValue(currentUserAtom); @@ -294,6 +198,12 @@ export default function NewChatPage() { ? membersData?.find((m) => m.user_id === msg.author_id) : null; + // Preserve existing author info if member lookup fails (e.g., cloned chats) + const existingMsg = prev.find((m) => m.id === `msg-${msg.id}`); + const existingAuthor = existingMsg?.metadata?.custom?.author as + | { displayName?: string | null; avatarUrl?: string | null } + | undefined; + return convertToThreadMessage({ id: msg.id, thread_id: msg.thread_id, @@ -301,8 +211,8 @@ export default function NewChatPage() { content: msg.content, author_id: msg.author_id, created_at: msg.created_at, - author_display_name: member?.user_display_name ?? null, - author_avatar_url: member?.user_avatar_url ?? null, + author_display_name: member?.user_display_name ?? existingAuthor?.displayName ?? null, + author_avatar_url: member?.user_avatar_url ?? existingAuthor?.avatarUrl ?? 
null, }); }); }); @@ -422,46 +332,71 @@ export default function NewChatPage() { initializeThread(); }, [initializeThread]); + // Handle clone completion when thread has clone_pending flag + useEffect(() => { + if (!currentThread?.clone_pending || isCompletingClone || cloneError) return; + + const completeClone = async () => { + setIsCompletingClone(true); + + try { + await publicChatApiService.completeClone({ thread_id: currentThread.id }); + + // Re-initialize thread to fetch cloned content using existing logic + await initializeThread(); + + // Invalidate threads query to update sidebar + queryClient.invalidateQueries({ + predicate: (query) => Array.isArray(query.queryKey) && query.queryKey[0] === "threads", + }); + } catch (error) { + console.error("[NewChatPage] Failed to complete clone:", error); + toast.error("Failed to copy chat content. Please try again."); + setCloneError(true); + } finally { + setIsCompletingClone(false); + } + }; + + completeClone(); + }, [ + currentThread?.clone_pending, + currentThread?.id, + isCompletingClone, + cloneError, + initializeThread, + queryClient, + ]); + // Handle scroll to comment from URL query params (e.g., from inbox item click) const searchParams = useSearchParams(); - const targetCommentId = searchParams.get("commentId"); + const targetCommentIdParam = searchParams.get("commentId"); + // Set target comment ID from URL param - the AssistantMessage and CommentItem + // components will handle scrolling and highlighting once comments are loaded useEffect(() => { - if (!targetCommentId || isInitializing || messages.length === 0) return; - - const tryScroll = () => { - const el = document.querySelector(`[data-comment-id="${targetCommentId}"]`); - if (el) { - el.scrollIntoView({ behavior: "smooth", block: "center" }); - return true; + if (targetCommentIdParam && !isInitializing) { + const commentId = Number.parseInt(targetCommentIdParam, 10); + if (!Number.isNaN(commentId)) { + setTargetCommentId(commentId); } - return false; - }; + } - // Try immediately - if (tryScroll()) return; - - // Retry every 200ms for up to 10 seconds - const intervalId = setInterval(() => { - if (tryScroll()) clearInterval(intervalId); - }, 200); - - const timeoutId = setTimeout(() => clearInterval(intervalId), 10000); - - return () => { - clearInterval(intervalId); - clearTimeout(timeoutId); - }; - }, [targetCommentId, isInitializing, messages.length]); + // Cleanup on unmount or when navigating away + return () => clearTargetCommentId(); + }, [targetCommentIdParam, isInitializing, setTargetCommentId, clearTargetCommentId]); // Sync current thread state to atom useEffect(() => { - setCurrentThreadState({ + setCurrentThreadState((prev) => ({ + ...prev, id: currentThread?.id ?? null, visibility: currentThread?.visibility ?? null, hasComments: currentThread?.has_comments ?? false, addingCommentToMessageId: null, - }); + publicShareEnabled: currentThread?.public_share_enabled ?? false, + publicShareToken: currentThread?.public_share_token ?? 
null, + })); }, [currentThread, setCurrentThreadState]); // Cancel ongoing request @@ -887,13 +822,13 @@ export default function NewChatPage() { // Update the tool call with its result updateToolCall(parsed.toolCallId, { result: parsed.output }); // Handle podcast-specific logic - if (parsed.output?.status === "processing" && parsed.output?.task_id) { + if (parsed.output?.status === "pending" && parsed.output?.podcast_id) { // Check if this is a podcast tool by looking at the content part const idx = toolCallIndices.get(parsed.toolCallId); if (idx !== undefined) { const part = contentParts[idx]; if (part?.type === "tool-call" && part.toolName === "generate_podcast") { - setActivePodcastTaskId(parsed.output.task_id); + setActivePodcastTaskId(String(parsed.output.podcast_id)); } } } @@ -1307,12 +1242,12 @@ export default function NewChatPage() { case "tool-output-available": updateToolCall(parsed.toolCallId, { result: parsed.output }); - if (parsed.output?.status === "processing" && parsed.output?.task_id) { + if (parsed.output?.status === "pending" && parsed.output?.podcast_id) { const idx = toolCallIndices.get(parsed.toolCallId); if (idx !== undefined) { const part = contentParts[idx]; if (part?.type === "tool-call" && part.toolName === "generate_podcast") { - setActivePodcastTaskId(parsed.output.task_id); + setActivePodcastTaskId(String(parsed.output.podcast_id)); } } } @@ -1485,6 +1420,16 @@ export default function NewChatPage() { ); } + // Show loading state while completing clone + if (isCompletingClone) { + return ( +
<div> + <Spinner /> + <span>Copying chat content...</span> + </div>
+ ); + } + // Show error state only if we tried to load an existing thread but failed // For new chats (urlChatId === 0), threadId being null is expected (lazy creation) if (!threadId && urlChatId > 0) { diff --git a/surfsense_web/app/dashboard/[search_space_id]/team/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/team/page.tsx index 298871cf7..87e4281ae 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/team/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/team/page.tsx @@ -115,13 +115,13 @@ import type { Membership, UpdateMembershipRequest, } from "@/contracts/types/members.types"; +import type { PermissionInfo } from "@/contracts/types/permissions.types"; import type { CreateRoleRequest, DeleteRoleRequest, Role, UpdateRoleRequest, } from "@/contracts/types/roles.types"; -import type { PermissionInfo } from "@/contracts/types/permissions.types"; import { invitesApiService } from "@/lib/apis/invites-api.service"; import { rolesApiService } from "@/lib/apis/roles-api.service"; import { trackSearchSpaceInviteSent, trackSearchSpaceUsersViewed } from "@/lib/posthog/events"; @@ -980,11 +980,7 @@ function RolesTab({ > {/* Create Role Button / Section */} {canCreate && !showCreateRole && ( - +