Merge remote-tracking branch 'upstream/dev' into fix/notion-connector

This commit is contained in:
Anish Sarkar 2026-01-29 10:45:31 +05:30
commit 1658724fb2
32 changed files with 633 additions and 78 deletions

View file

@ -54,7 +54,9 @@ def set_generating_podcast(search_space_id: int, podcast_id: int) -> None:
client = get_redis_client()
client.setex(_redis_key(search_space_id), 1800, str(podcast_id))
except Exception as e:
print(f"[generate_podcast] Warning: Could not set generating podcast in Redis: {e}")
print(
f"[generate_podcast] Warning: Could not set generating podcast in Redis: {e}"
)
def create_generate_podcast_tool(

View file

@ -4,6 +4,8 @@ Composio Google Drive Connector Module.
Provides Google Drive specific methods for data retrieval and indexing via Composio.
"""
import hashlib
import json
import logging
import os
import tempfile
@ -464,6 +466,55 @@ async def check_document_by_unique_identifier(
return existing_doc_result.scalars().first()
async def check_document_by_content_hash(
session: AsyncSession, content_hash: str
) -> Document | None:
"""Check if a document with the given content hash already exists.
This is used to prevent duplicate content from being indexed, regardless
of which connector originally indexed it.
"""
from sqlalchemy.future import select
existing_doc_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
return existing_doc_result.scalars().first()
async def check_document_by_google_drive_file_id(
session: AsyncSession, file_id: str, search_space_id: int
) -> Document | None:
"""Check if a document with this Google Drive file ID exists (from any connector).
This checks both metadata key formats:
- 'google_drive_file_id' (normal Google Drive connector)
- 'file_id' (Composio Google Drive connector)
This allows detecting duplicates BEFORE downloading/ETL, saving expensive API calls.
"""
from sqlalchemy import String, cast, or_
from sqlalchemy.future import select
# When casting JSON to String, the result includes quotes: "value" instead of value
# So we need to compare with the quoted version
quoted_file_id = f'"{file_id}"'
existing_doc_result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
or_(
# Normal Google Drive connector format
cast(Document.document_metadata["google_drive_file_id"], String)
== quoted_file_id,
# Composio Google Drive connector format
cast(Document.document_metadata["file_id"], String) == quoted_file_id,
),
)
)
return existing_doc_result.scalars().first()
async def update_connector_last_indexed(
session: AsyncSession,
connector,
@ -477,6 +528,33 @@ async def update_connector_last_indexed(
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
def generate_indexing_settings_hash(
selected_folders: list[dict],
selected_files: list[dict],
indexing_options: dict,
) -> str:
"""Generate a hash of indexing settings to detect configuration changes.
This hash is used to determine if indexing settings have changed since
the last index, which would require a full re-scan instead of delta sync.
Args:
selected_folders: List of {id, name} for folders to index
selected_files: List of {id, name} for individual files to index
indexing_options: Dict with max_files_per_folder, include_subfolders, etc.
Returns:
MD5 hash string of the settings
"""
settings = {
"folders": sorted([f.get("id", "") for f in selected_folders]),
"files": sorted([f.get("id", "") for f in selected_files]),
"include_subfolders": indexing_options.get("include_subfolders", True),
"max_files_per_folder": indexing_options.get("max_files_per_folder", 100),
}
return hashlib.md5(json.dumps(settings, sort_keys=True).encode()).hexdigest()
async def index_composio_google_drive(
session: AsyncSession,
connector,
@ -487,12 +565,16 @@ async def index_composio_google_drive(
log_entry,
update_last_indexed: bool = True,
max_items: int = 1000,
) -> tuple[int, str]:
) -> tuple[int, int, str | None]:
"""Index Google Drive files via Composio with delta sync support.
Returns:
Tuple of (documents_indexed, documents_skipped, error_message or None)
Delta Sync Flow:
1. First sync: Full scan + get initial page token
2. Subsequent syncs: Use LIST_CHANGES to process only changed files
(unless settings changed or incremental_sync is disabled)
Supports folder/file selection via connector config:
- selected_folders: List of {id, name} for folders to index
@ -508,12 +590,42 @@ async def index_composio_google_drive(
selected_files = connector_config.get("selected_files", [])
indexing_options = connector_config.get("indexing_options", {})
# Check for stored page token for delta sync
stored_page_token = connector_config.get("drive_page_token")
use_delta_sync = stored_page_token and connector.last_indexed_at
max_files_per_folder = indexing_options.get("max_files_per_folder", 100)
include_subfolders = indexing_options.get("include_subfolders", True)
incremental_sync = indexing_options.get("incremental_sync", True)
# Generate current settings hash to detect configuration changes
current_settings_hash = generate_indexing_settings_hash(
selected_folders, selected_files, indexing_options
)
last_settings_hash = connector_config.get("last_indexed_settings_hash")
# Detect if settings changed since last index
settings_changed = (
last_settings_hash is not None
and current_settings_hash != last_settings_hash
)
if settings_changed:
logger.info(
f"Indexing settings changed for connector {connector_id}. "
f"Will perform full re-scan to apply new configuration."
)
# Check for stored page token for delta sync
stored_page_token = connector_config.get("drive_page_token")
# Determine whether to use delta sync:
# - Must have a stored page token
# - Must have been indexed before (last_indexed_at exists)
# - User must have incremental_sync enabled
# - Settings must not have changed (folder/subfolder config)
use_delta_sync = (
incremental_sync
and stored_page_token
and connector.last_indexed_at
and not settings_changed
)
# Route to delta sync or full scan
if use_delta_sync:
@ -588,6 +700,14 @@ async def index_composio_google_drive(
elif token_error:
logger.warning(f"Failed to get new page token: {token_error}")
# Save current settings hash for future change detection
# This allows detecting when folder/subfolder settings change
if not connector.config:
connector.config = {}
connector.config["last_indexed_settings_hash"] = current_settings_hash
flag_modified(connector, "config")
logger.info(f"Saved indexing settings hash for connector {connector_id}")
# CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
@ -628,11 +748,11 @@ async def index_composio_google_drive(
},
)
return documents_indexed, error_message
return documents_indexed, documents_skipped, error_message
except Exception as e:
logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True)
return 0, f"Failed to index Google Drive via Composio: {e!s}"
return 0, 0, f"Failed to index Google Drive via Composio: {e!s}"
async def _index_composio_drive_delta_sync(
@ -953,13 +1073,28 @@ async def _process_single_drive_file(
"""
processing_errors = []
# ========== EARLY DUPLICATE CHECK BY FILE ID ==========
# Check if this Google Drive file was already indexed by ANY connector
# This happens BEFORE download/ETL to save expensive API calls
existing_by_file_id = await check_document_by_google_drive_file_id(
session, file_id, search_space_id
)
if existing_by_file_id:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): already indexed "
f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' "
f"(saved download & ETL cost)"
)
return 0, 1, processing_errors # Skip - NO download, NO ETL!
# ======================================================
# Generate unique identifier hash
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
)
# Check if document exists
# Check if document exists by unique identifier (same connector, same file)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
@ -1000,7 +1135,7 @@ async def _process_single_drive_file(
if existing_document:
if existing_document.content_hash == content_hash:
return 0, 1, processing_errors # Skipped
return 0, 1, processing_errors # Skipped - unchanged
# Update existing document
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -1039,7 +1174,19 @@ async def _process_single_drive_file(
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
return 1, 0, processing_errors # Indexed
return 1, 0, processing_errors # Indexed - updated
# Check if content_hash already exists (from any connector)
# This prevents duplicate content and avoids IntegrityError on unique constraint
existing_by_content_hash = await check_document_by_content_hash(
session, content_hash
)
if existing_by_content_hash:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): identical content "
f"already indexed as '{existing_by_content_hash.title}'"
)
return 0, 1, processing_errors # Skipped - duplicate content
# Create new document
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -1085,7 +1232,7 @@ async def _process_single_drive_file(
)
session.add(document)
return 1, 0, processing_errors # Indexed
return 1, 0, processing_errors # Indexed - new
async def _fetch_folder_files_recursively(

View file

@ -670,7 +670,9 @@ async def delete_thread(
) from None
@router.post("/threads/{thread_id}/complete-clone", response_model=CompleteCloneResponse)
@router.post(
"/threads/{thread_id}/complete-clone", response_model=CompleteCloneResponse
)
async def complete_clone(
thread_id: int,
session: AsyncSession = Depends(get_async_session),
@ -702,7 +704,9 @@ async def complete_clone(
raise HTTPException(status_code=400, detail="Clone already completed")
if not thread.cloned_from_thread_id:
raise HTTPException(status_code=400, detail="No source thread to clone from")
raise HTTPException(
status_code=400, detail="No source thread to clone from"
)
message_count = await complete_clone_content(
session=session,

View file

@ -53,7 +53,9 @@ async def clone_public_chat_endpoint(
source_thread = await get_thread_by_share_token(session, share_token)
if not source_thread:
raise HTTPException(status_code=404, detail="Chat not found or no longer public")
raise HTTPException(
status_code=404, detail="Chat not found or no longer public"
)
target_search_space_id = await get_user_default_search_space(session, user.id)

View file

@ -187,6 +187,7 @@ async def create_search_source_connector(
user_id=str(user.id),
connector_type=db_connector.connector_type,
frequency_minutes=db_connector.indexing_frequency_minutes,
connector_config=db_connector.config,
)
if not success:
logger.warning(
@ -646,6 +647,7 @@ async def index_connector_content(
# Handle different connector types
response_message = ""
indexing_started = True
# Use UTC for consistency with last_indexed_at storage
today_str = datetime.now(UTC).strftime("%Y-%m-%d")
@ -921,14 +923,31 @@ async def index_connector_content(
elif connector.connector_type == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR:
from app.tasks.celery_tasks.connector_tasks import index_crawled_urls_task
from app.utils.webcrawler_utils import parse_webcrawler_urls
logger.info(
f"Triggering web pages indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
)
index_crawled_urls_task.delay(
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
)
response_message = "Web page indexing started in the background."
# Check if URLs are configured before triggering indexing
connector_config = connector.config or {}
urls = parse_webcrawler_urls(connector_config.get("INITIAL_URLS"))
if not urls:
# URLs are optional - skip indexing gracefully
logger.info(
f"Webcrawler connector {connector_id} has no URLs configured, skipping indexing"
)
response_message = "No URLs configured for this connector. Add URLs in the connector settings to enable indexing."
indexing_started = False
else:
logger.info(
f"Triggering web pages indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
)
index_crawled_urls_task.delay(
connector_id,
search_space_id,
str(user.id),
indexing_from,
indexing_to,
)
response_message = "Web page indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.OBSIDIAN_CONNECTOR:
from app.config import config as app_config
@ -1025,6 +1044,7 @@ async def index_connector_content(
return {
"message": response_message,
"indexing_started": indexing_started,
"connector_id": connector_id,
"search_space_id": search_space_id,
"indexing_from": indexing_from,
@ -1223,8 +1243,15 @@ async def _run_indexing_with_notifications(
indexing_kwargs["on_retry_callback"] = on_retry_callback
# Run the indexing function
documents_processed, error_or_warning = await indexing_function(**indexing_kwargs)
current_indexed_count = documents_processed
# Some indexers return (indexed, error), others return (indexed, skipped, error)
result = await indexing_function(**indexing_kwargs)
# Handle both 2-tuple and 3-tuple returns for backwards compatibility
if len(result) == 3:
documents_processed, documents_skipped, error_or_warning = result
else:
documents_processed, error_or_warning = result
documents_skipped = None
# Update connector timestamp if function provided and indexing was successful
if documents_processed > 0 and update_timestamp_func:
@ -1252,6 +1279,7 @@ async def _run_indexing_with_notifications(
notification=notification,
indexed_count=documents_processed,
error_message=error_or_warning, # Show errors even if some documents were indexed
skipped_count=documents_skipped,
)
await (
session.commit()
@ -1278,6 +1306,7 @@ async def _run_indexing_with_notifications(
notification=notification,
indexed_count=documents_processed,
error_message=error_or_warning, # Show errors even if some documents were indexed
skipped_count=documents_skipped,
)
await (
session.commit()
@ -1319,6 +1348,7 @@ async def _run_indexing_with_notifications(
indexed_count=0,
error_message=notification_message, # Pass as warning, not error
is_warning=True, # Flag to indicate this is a warning, not an error
skipped_count=documents_skipped,
)
await (
session.commit()
@ -1334,6 +1364,7 @@ async def _run_indexing_with_notifications(
notification=notification,
indexed_count=0,
error_message=error_or_warning,
skipped_count=documents_skipped,
)
await (
session.commit()
@ -1355,6 +1386,7 @@ async def _run_indexing_with_notifications(
notification=notification,
indexed_count=0,
error_message=None, # No error - sync succeeded
skipped_count=documents_skipped,
)
await (
session.commit()
@ -1372,6 +1404,7 @@ async def _run_indexing_with_notifications(
notification=notification,
indexed_count=0,
error_message=str(e),
skipped_count=None, # Unknown on exception
)
except Exception as notif_error:
logger.error(f"Failed to update notification: {notif_error!s}")

View file

@ -257,14 +257,11 @@ class PublicChatResponse(BaseModel):
class CloneInitResponse(BaseModel):
thread_id: int
search_space_id: int
share_token: str
class CompleteCloneResponse(BaseModel):
status: str
message_count: int

View file

@ -59,6 +59,8 @@ class PodcastRead(PodcastBase):
"search_space_id": obj.search_space_id,
"status": obj.status,
"created_at": obj.created_at,
"transcript_entries": len(obj.podcast_transcript) if obj.podcast_transcript else None,
"transcript_entries": len(obj.podcast_transcript)
if obj.podcast_transcript
else None,
}
return cls(**data)

View file

@ -422,6 +422,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
indexed_count: int,
error_message: str | None = None,
is_warning: bool = False,
skipped_count: int | None = None,
) -> Notification:
"""
Update notification when connector indexing completes.
@ -432,6 +433,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
indexed_count: Total number of items indexed
error_message: Error message if indexing failed, or warning message (optional)
is_warning: If True, treat error_message as a warning (success case) rather than an error
skipped_count: Number of items skipped (e.g., duplicates) - optional
Returns:
Updated notification
@ -440,6 +442,14 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"connector_name", "Connector"
)
# Build the skipped text if there are skipped items
skipped_text = ""
if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
skipped_text = (
f" ({skipped_count} {skipped_item_text} skipped - already indexed)"
)
# If there's an error message but items were indexed, treat it as a warning (partial success)
# If is_warning is True, treat it as success even with 0 items (e.g., duplicates found)
# Otherwise, treat it as a failure
@ -448,12 +458,12 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
# Partial success with warnings (e.g., duplicate content from other connectors)
title = f"Ready: {connector_name}"
item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced. Note: {error_message}"
message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}. Note: {error_message}"
status = "completed"
elif is_warning:
# Warning case (e.g., duplicates found) - treat as success
title = f"Ready: {connector_name}"
message = f"Sync completed. {error_message}"
message = f"Sync completed{skipped_text}. {error_message}"
status = "completed"
else:
# Complete failure
@ -463,14 +473,21 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
else:
title = f"Ready: {connector_name}"
if indexed_count == 0:
message = "Already up to date! No new items to sync."
if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
message = f"Already up to date! {skipped_count} {skipped_item_text} skipped (already indexed)."
else:
message = "Already up to date! No new items to sync."
else:
item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced."
message = (
f"Now searchable! {indexed_count} {item_text} synced{skipped_text}."
)
status = "completed"
metadata_updates = {
"indexed_count": indexed_count,
"skipped_count": skipped_count or 0,
"sync_stage": "completed"
if (not error_message or is_warning or indexed_count > 0)
else "failed",

View file

@ -291,6 +291,9 @@ async def complete_clone_content(
if old_podcast_id and old_podcast_id in podcast_id_map:
result_data["podcast_id"] = podcast_id_map[old_podcast_id]
elif old_podcast_id:
# Podcast couldn't be cloned (not ready), remove reference
result_data.pop("podcast_id", None)
new_message = NewChatMessage(
thread_id=target_thread.id,

View file

@ -55,7 +55,9 @@ def _clear_generating_podcast(search_space_id: int) -> None:
client = redis.from_url(redis_url, decode_responses=True)
key = f"podcast:generating:{search_space_id}"
client.delete(key)
logger.info(f"Cleared generating podcast key for search_space_id={search_space_id}")
logger.info(
f"Cleared generating podcast key for search_space_id={search_space_id}"
)
except Exception as e:
logger.warning(f"Could not clear generating podcast key: {e}")
@ -119,9 +121,7 @@ async def _generate_content_podcast(
) -> dict:
"""Generate content-based podcast and update existing record."""
async with get_celery_session_maker()() as session:
result = await session.execute(
select(Podcast).filter(Podcast.id == podcast_id)
)
result = await session.execute(select(Podcast).filter(Podcast.id == podcast_id))
podcast = result.scalars().first()
if not podcast:

View file

@ -156,6 +156,41 @@ async def _check_and_trigger_schedules():
)
await session.commit()
continue
# Special handling for Webcrawler - skip if no URLs configured
elif (
connector.connector_type
== SearchSourceConnectorType.WEBCRAWLER_CONNECTOR
):
from app.utils.webcrawler_utils import parse_webcrawler_urls
connector_config = connector.config or {}
urls = parse_webcrawler_urls(
connector_config.get("INITIAL_URLS")
)
if urls:
task.delay(
connector.id,
connector.search_space_id,
str(connector.user_id),
None, # start_date
None, # end_date
)
else:
# No URLs configured - skip indexing but still update next_scheduled_at
logger.info(
f"Webcrawler connector {connector.id} has no URLs configured, "
"skipping periodic indexing (will check again at next scheduled time)"
)
from datetime import timedelta
connector.next_scheduled_at = now + timedelta(
minutes=connector.indexing_frequency_minutes
)
await session.commit()
continue
else:
task.delay(
connector.id,

View file

@ -86,7 +86,7 @@ async def index_composio_connector(
end_date: str | None = None,
update_last_indexed: bool = True,
max_items: int = 1000,
) -> tuple[int, str]:
) -> tuple[int, int, str | None]:
"""
Index content from a Composio connector.
@ -104,7 +104,7 @@ async def index_composio_connector(
max_items: Maximum number of items to fetch
Returns:
Tuple of (number_of_indexed_items, error_message or None)
Tuple of (number_of_indexed_items, number_of_skipped_items, error_message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
@ -132,14 +132,14 @@ async def index_composio_connector(
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "InvalidConnectorType"}
)
return 0, error_msg
return 0, 0, error_msg
if not connector:
error_msg = f"Composio connector with ID {connector_id} not found"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ConnectorNotFound"}
)
return 0, error_msg
return 0, 0, error_msg
# Get toolkit ID from config
toolkit_id = connector.config.get("toolkit_id")
@ -150,7 +150,7 @@ async def index_composio_connector(
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "MissingToolkitId"}
)
return 0, error_msg
return 0, 0, error_msg
# Check if toolkit is indexable
if toolkit_id not in INDEXABLE_TOOLKITS:
@ -158,7 +158,7 @@ async def index_composio_connector(
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ToolkitNotIndexable"}
)
return 0, error_msg
return 0, 0, error_msg
# Get indexer function from registry
try:
@ -167,7 +167,7 @@ async def index_composio_connector(
await task_logger.log_task_failure(
log_entry, str(e), {"error_type": "NoIndexerImplemented"}
)
return 0, str(e)
return 0, 0, str(e)
# Build kwargs for the indexer function
kwargs = {
@ -199,7 +199,7 @@ async def index_composio_connector(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -209,4 +209,4 @@ async def index_composio_connector(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Composio connector: {e!s}", exc_info=True)
return 0, f"Failed to index Composio connector: {e!s}"
return 0, 0, f"Failed to index Composio connector: {e!s}"

View file

@ -20,6 +20,7 @@ from app.utils.document_converters import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -317,6 +318,24 @@ async def index_airtable_records(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = (
await check_duplicate_document_by_hash(
session, content_hash
)
)
if duplicate_by_content:
logger.info(
f"Airtable record {record_id} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate document summary
user_llm = await get_user_long_context_llm(

View file

@ -22,6 +22,7 @@ from app.utils.document_converters import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -308,6 +309,22 @@ async def index_bookstack_pages(
logger.info(f"Successfully updated BookStack page {page_name}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"BookStack page {page_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -22,6 +22,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -302,6 +303,22 @@ async def index_clickup_tasks(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"ClickUp task {task_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -23,6 +23,7 @@ from app.utils.document_converters import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -306,6 +307,22 @@ async def index_confluence_pages(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Confluence page {page_title} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -21,6 +21,7 @@ from app.utils.document_converters import (
from .base import (
build_document_metadata_markdown,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -454,6 +455,24 @@ async def index_discord_messages(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = (
await check_duplicate_document_by_hash(
session, content_hash
)
)
if duplicate_by_content:
logger.info(
f"Discord message {msg_id} in {guild_name}#{channel_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Process chunks
chunks = await create_document_chunks(

View file

@ -24,6 +24,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -319,6 +320,21 @@ async def _process_repository_digest(
# Delete existing document to replace with new one
await session.delete(existing_document)
await session.flush()
else:
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Repository {repo_full_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
return 0
# Generate summary using LLM (ONE call per repository!)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)

View file

@ -24,7 +24,9 @@ from app.utils.document_converters import (
)
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -163,10 +165,22 @@ async def index_google_gmail_messages(
credentials, session, user_id, connector_id
)
# Calculate date range using last_indexed_at if dates not provided
# This ensures Gmail uses the same date logic as other connectors
# (uses last_indexed_at → now, or 365 days back for first-time indexing)
calculated_start_date, calculated_end_date = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Fetch recent Google gmail messages
logger.info(f"Fetching recent emails for connector {connector_id}")
logger.info(
f"Fetching emails for connector {connector_id} "
f"from {calculated_start_date} to {calculated_end_date}"
)
messages, error = await gmail_connector.get_recent_messages(
max_results=max_messages, start_date=start_date, end_date=end_date
max_results=max_messages,
start_date=calculated_start_date,
end_date=calculated_end_date,
)
if error:
@ -316,6 +330,22 @@ async def index_google_gmail_messages(
logger.info(f"Successfully updated Gmail message {subject}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Gmail message {subject} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -23,6 +23,7 @@ from app.utils.document_converters import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -284,6 +285,22 @@ async def index_jira_issues(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Jira issue {issue_identifier} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -22,6 +22,7 @@ from app.utils.document_converters import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -315,6 +316,22 @@ async def index_linear_issues(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Linear issue {issue_identifier} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -21,6 +21,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -363,6 +364,22 @@ async def index_luma_events(
logger.info(f"Successfully updated Luma event {event_name}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Luma event {event_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -23,6 +23,7 @@ from .base import (
build_document_metadata_string,
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -388,6 +389,22 @@ async def index_notion_pages(
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Notion page {page_title} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Get user's long context LLM
user_llm = await get_user_long_context_llm(

View file

@ -28,6 +28,7 @@ from app.utils.document_converters import (
from .base import (
build_document_metadata_string,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -426,6 +427,22 @@ async def index_obsidian_vault(
indexed_count += 1
else:
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Obsidian note {title} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
skipped_count += 1
continue
# Create new document
logger.info(f"Indexing new note: {title}")

View file

@ -22,6 +22,7 @@ from .base import (
build_document_metadata_markdown,
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -325,6 +326,22 @@ async def index_slack_messages(
logger.info(f"Successfully updated Slack message {msg_ts}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Slack message {msg_ts} in channel {channel_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Process chunks
chunks = await create_document_chunks(combined_document_string)

View file

@ -21,6 +21,7 @@ from .base import (
build_document_metadata_markdown,
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -354,6 +355,27 @@ async def index_teams_messages(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = (
await check_duplicate_document_by_hash(
session, content_hash
)
)
if duplicate_by_content:
logger.info(
"Teams message %s in channel %s already indexed by another connector "
"(existing document ID: %s, type: %s). Skipping.",
message_id,
channel_name,
duplicate_by_content.id,
duplicate_by_content.document_type,
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Process chunks
chunks = await create_document_chunks(

View file

@ -18,9 +18,11 @@ from app.utils.document_converters import (
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.webcrawler_utils import parse_webcrawler_urls
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -96,13 +98,7 @@ async def index_crawled_urls(
api_key = connector.config.get("FIRECRAWL_API_KEY")
# Get URLs from connector config
initial_urls = connector.config.get("INITIAL_URLS", "")
if isinstance(initial_urls, str):
urls = [url.strip() for url in initial_urls.split("\n") if url.strip()]
elif isinstance(initial_urls, list):
urls = [url.strip() for url in initial_urls if url.strip()]
else:
urls = []
urls = parse_webcrawler_urls(connector.config.get("INITIAL_URLS"))
logger.info(
f"Starting crawled web page indexing for connector {connector_id} with {len(urls)} URLs"
@ -281,6 +277,22 @@ async def index_crawled_urls(
logger.info(f"Successfully updated URL {url}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"URL {url} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -55,7 +55,9 @@ LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
)
# Timeout calculation constants
UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024 # 100 KB/s (conservative for slow connections)
UPLOAD_BYTES_PER_SECOND_SLOW = (
100 * 1024
) # 100 KB/s (conservative for slow connections)
MIN_UPLOAD_TIMEOUT = 120 # Minimum 2 minutes for any file
MAX_UPLOAD_TIMEOUT = 1800 # Maximum 30 minutes for very large files
BASE_JOB_TIMEOUT = 600 # 10 minutes base for job processing
@ -219,19 +221,19 @@ async def find_existing_document_with_migration(
def calculate_upload_timeout(file_size_bytes: int) -> float:
"""
Calculate appropriate upload timeout based on file size.
Assumes a conservative slow connection speed to handle worst-case scenarios.
Args:
file_size_bytes: Size of the file in bytes
Returns:
Timeout in seconds
"""
# Calculate time needed at slow connection speed
# Add 50% buffer for network variability and SSL overhead
estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
# Clamp to reasonable bounds
return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))
@ -239,21 +241,21 @@ def calculate_upload_timeout(file_size_bytes: int) -> float:
def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
"""
Calculate job processing timeout based on page count and file size.
Args:
estimated_pages: Estimated number of pages
file_size_bytes: Size of the file in bytes
Returns:
Timeout in seconds
"""
# Base timeout + time per page
page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)
# Also consider file size (large images take longer to process)
# ~1 minute per 10MB of file size
size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60
# Use the larger of the two estimates
return max(page_based_timeout, size_based_timeout)
@ -284,18 +286,18 @@ async def parse_with_llamacloud_retry(
"""
import os
import random
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
# Get file size for timeout calculations
file_size_bytes = os.path.getsize(file_path)
file_size_mb = file_size_bytes / (1024 * 1024)
# Calculate dynamic timeouts based on file size and page count
upload_timeout = calculate_upload_timeout(file_size_bytes)
job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes)
# HTTP client timeouts - scaled based on file size
# Write timeout is critical for large file uploads
custom_timeout = httpx.Timeout(
@ -304,7 +306,7 @@ async def parse_with_llamacloud_retry(
write=upload_timeout, # Dynamic based on file size (upload time)
pool=120.0, # 2 minutes to acquire connection from pool
)
logging.info(
f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, "
f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, "
@ -335,14 +337,14 @@ async def parse_with_llamacloud_retry(
# Parse the file asynchronously
result = await parser.aparse(file_path)
# Success - log if we had previous failures
if attempt > 1:
logging.info(
f"LlamaCloud upload succeeded on attempt {attempt} after "
f"{len(attempt_errors)} failures"
)
return result
except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
@ -355,8 +357,7 @@ async def parse_with_llamacloud_retry(
# Calculate exponential backoff with jitter
# Base delay doubles each attempt, capped at max delay
base_delay = min(
LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)),
LLAMACLOUD_MAX_DELAY
LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), LLAMACLOUD_MAX_DELAY
)
# Add random jitter (±25%) to prevent thundering herd
jitter = base_delay * 0.25 * (2 * random.random() - 1)

View file

@ -43,6 +43,7 @@ def create_periodic_schedule(
user_id: str,
connector_type: SearchSourceConnectorType,
frequency_minutes: int,
connector_config: dict | None = None,
) -> bool:
"""
Trigger the first indexing run immediately when periodic indexing is enabled.
@ -57,11 +58,26 @@ def create_periodic_schedule(
user_id: User ID
connector_type: Type of connector
frequency_minutes: Frequency in minutes (used for logging)
connector_config: Optional connector config dict for validation
Returns:
True if successful, False otherwise
"""
try:
# Special handling for connectors that require config validation
if connector_type == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR:
from app.utils.webcrawler_utils import parse_webcrawler_urls
config = connector_config or {}
urls = parse_webcrawler_urls(config.get("INITIAL_URLS"))
if not urls:
logger.info(
f"Webcrawler connector {connector_id} has no URLs configured, "
"skipping first indexing run (will run when URLs are added)"
)
return True # Return success - schedule is created, just no first run
logger.info(
f"Periodic indexing enabled for connector {connector_id} "
f"(frequency: {frequency_minutes} minutes). Triggering first run..."

View file

@ -0,0 +1,28 @@
"""
Utility functions for webcrawler connector.
"""
def parse_webcrawler_urls(initial_urls: str | list | None) -> list[str]:
"""
Parse URLs from webcrawler INITIAL_URLS value.
Handles both string (newline-separated) and list formats.
Args:
initial_urls: The INITIAL_URLS value (string, list, or None)
Returns:
List of parsed, stripped, non-empty URLs
"""
if initial_urls is None:
return []
if isinstance(initial_urls, str):
return [url.strip() for url in initial_urls.split("\n") if url.strip()]
elif isinstance(initial_urls, list):
return [
url.strip() for url in initial_urls if isinstance(url, str) and url.strip()
]
else:
return []

View file

@ -143,6 +143,7 @@ export default function NewChatPage() {
const queryClient = useQueryClient();
const [isInitializing, setIsInitializing] = useState(true);
const [isCompletingClone, setIsCompletingClone] = useState(false);
const [cloneError, setCloneError] = useState(false);
const [threadId, setThreadId] = useState<number | null>(null);
const [currentThread, setCurrentThread] = useState<ThreadRecord | null>(null);
const [messages, setMessages] = useState<ThreadMessageLike[]>([]);
@ -333,7 +334,7 @@ export default function NewChatPage() {
// Handle clone completion when thread has clone_pending flag
useEffect(() => {
if (!currentThread?.clone_pending || isCompletingClone) return;
if (!currentThread?.clone_pending || isCompletingClone || cloneError) return;
const completeClone = async () => {
setIsCompletingClone(true);
@ -351,13 +352,14 @@ export default function NewChatPage() {
} catch (error) {
console.error("[NewChatPage] Failed to complete clone:", error);
toast.error("Failed to copy chat content. Please try again.");
setCloneError(true);
} finally {
setIsCompletingClone(false);
}
};
completeClone();
}, [currentThread?.clone_pending, currentThread?.id, isCompletingClone, initializeThread, queryClient]);
}, [currentThread?.clone_pending, currentThread?.id, isCompletingClone, cloneError, initializeThread, queryClient]);
// Handle scroll to comment from URL query params (e.g., from inbox item click)
const searchParams = useSearchParams();

View file

@ -24,11 +24,6 @@
"enabled": true,
"status": "warning",
"statusMessage": "Some requests may be blocked if not using Firecrawl."
},
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": {
"enabled": false,
"status": "disabled",
"statusMessage": "Not available yet."
}
},
"globalSettings": {