Merge pull request #751 from CREDO23/fix-various-issues

[FIX] Connector indexing, chat cloning, and Gmail date handling fixes
This commit is contained in:
Rohan Verma 2026-01-28 16:40:49 -08:00 committed by GitHub
commit 0b65c3a98c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 430 additions and 47 deletions

View file

@ -55,7 +55,9 @@ def _clear_generating_podcast(search_space_id: int) -> None:
client = redis.from_url(redis_url, decode_responses=True)
key = f"podcast:generating:{search_space_id}"
client.delete(key)
logger.info(f"Cleared generating podcast key for search_space_id={search_space_id}")
logger.info(
f"Cleared generating podcast key for search_space_id={search_space_id}"
)
except Exception as e:
logger.warning(f"Could not clear generating podcast key: {e}")
@ -119,9 +121,7 @@ async def _generate_content_podcast(
) -> dict:
"""Generate content-based podcast and update existing record."""
async with get_celery_session_maker()() as session:
result = await session.execute(
select(Podcast).filter(Podcast.id == podcast_id)
)
result = await session.execute(select(Podcast).filter(Podcast.id == podcast_id))
podcast = result.scalars().first()
if not podcast:

View file

@ -156,6 +156,41 @@ async def _check_and_trigger_schedules():
)
await session.commit()
continue
# Special handling for Webcrawler - skip if no URLs configured
elif (
connector.connector_type
== SearchSourceConnectorType.WEBCRAWLER_CONNECTOR
):
from app.utils.webcrawler_utils import parse_webcrawler_urls
connector_config = connector.config or {}
urls = parse_webcrawler_urls(
connector_config.get("INITIAL_URLS")
)
if urls:
task.delay(
connector.id,
connector.search_space_id,
str(connector.user_id),
None, # start_date
None, # end_date
)
else:
# No URLs configured - skip indexing but still update next_scheduled_at
logger.info(
f"Webcrawler connector {connector.id} has no URLs configured, "
"skipping periodic indexing (will check again at next scheduled time)"
)
from datetime import timedelta
connector.next_scheduled_at = now + timedelta(
minutes=connector.indexing_frequency_minutes
)
await session.commit()
continue
else:
task.delay(
connector.id,

View file

@ -20,6 +20,7 @@ from app.utils.document_converters import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -317,6 +318,24 @@ async def index_airtable_records(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = (
await check_duplicate_document_by_hash(
session, content_hash
)
)
if duplicate_by_content:
logger.info(
f"Airtable record {record_id} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate document summary
user_llm = await get_user_long_context_llm(

View file

@ -22,6 +22,7 @@ from app.utils.document_converters import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -308,6 +309,22 @@ async def index_bookstack_pages(
logger.info(f"Successfully updated BookStack page {page_name}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"BookStack page {page_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -22,6 +22,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -302,6 +303,22 @@ async def index_clickup_tasks(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"ClickUp task {task_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -23,6 +23,7 @@ from app.utils.document_converters import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -306,6 +307,22 @@ async def index_confluence_pages(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Confluence page {page_title} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -21,6 +21,7 @@ from app.utils.document_converters import (
from .base import (
build_document_metadata_markdown,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -454,6 +455,24 @@ async def index_discord_messages(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = (
await check_duplicate_document_by_hash(
session, content_hash
)
)
if duplicate_by_content:
logger.info(
f"Discord message {msg_id} in {guild_name}#{channel_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Process chunks
chunks = await create_document_chunks(

View file

@ -24,6 +24,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -319,6 +320,21 @@ async def _process_repository_digest(
# Delete existing document to replace with new one
await session.delete(existing_document)
await session.flush()
else:
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Repository {repo_full_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
return 0
# Generate summary using LLM (ONE call per repository!)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)

View file

@ -24,7 +24,9 @@ from app.utils.document_converters import (
)
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -163,10 +165,22 @@ async def index_google_gmail_messages(
credentials, session, user_id, connector_id
)
# Calculate date range using last_indexed_at if dates not provided
# This ensures Gmail uses the same date logic as other connectors
# (uses last_indexed_at → now, or 365 days back for first-time indexing)
calculated_start_date, calculated_end_date = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Fetch recent Google gmail messages
logger.info(f"Fetching recent emails for connector {connector_id}")
logger.info(
f"Fetching emails for connector {connector_id} "
f"from {calculated_start_date} to {calculated_end_date}"
)
messages, error = await gmail_connector.get_recent_messages(
max_results=max_messages, start_date=start_date, end_date=end_date
max_results=max_messages,
start_date=calculated_start_date,
end_date=calculated_end_date,
)
if error:
@ -316,6 +330,22 @@ async def index_google_gmail_messages(
logger.info(f"Successfully updated Gmail message {subject}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Gmail message {subject} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -23,6 +23,7 @@ from app.utils.document_converters import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -284,6 +285,22 @@ async def index_jira_issues(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Jira issue {issue_identifier} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -22,6 +22,7 @@ from app.utils.document_converters import (
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -315,6 +316,22 @@ async def index_linear_issues(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Linear issue {issue_identifier} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -21,6 +21,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -363,6 +364,22 @@ async def index_luma_events(
logger.info(f"Successfully updated Luma event {event_name}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Luma event {event_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -22,6 +22,7 @@ from .base import (
build_document_metadata_string,
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -360,6 +361,22 @@ async def index_notion_pages(
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Notion page {page_title} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Get user's long context LLM
user_llm = await get_user_long_context_llm(

View file

@ -28,6 +28,7 @@ from app.utils.document_converters import (
from .base import (
build_document_metadata_string,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -426,6 +427,22 @@ async def index_obsidian_vault(
indexed_count += 1
else:
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Obsidian note {title} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
skipped_count += 1
continue
# Create new document
logger.info(f"Indexing new note: {title}")

View file

@ -22,6 +22,7 @@ from .base import (
build_document_metadata_markdown,
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -325,6 +326,22 @@ async def index_slack_messages(
logger.info(f"Successfully updated Slack message {msg_ts}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"Slack message {msg_ts} in channel {channel_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Process chunks
chunks = await create_document_chunks(combined_document_string)

View file

@ -21,6 +21,7 @@ from .base import (
build_document_metadata_markdown,
calculate_date_range,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -354,6 +355,27 @@ async def index_teams_messages(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = (
await check_duplicate_document_by_hash(
session, content_hash
)
)
if duplicate_by_content:
logger.info(
"Teams message %s in channel %s already indexed by another connector "
"(existing document ID: %s, type: %s). Skipping.",
message_id,
channel_name,
duplicate_by_content.id,
duplicate_by_content.document_type,
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Process chunks
chunks = await create_document_chunks(

View file

@ -18,9 +18,11 @@ from app.utils.document_converters import (
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.webcrawler_utils import parse_webcrawler_urls
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -96,13 +98,7 @@ async def index_crawled_urls(
api_key = connector.config.get("FIRECRAWL_API_KEY")
# Get URLs from connector config
initial_urls = connector.config.get("INITIAL_URLS", "")
if isinstance(initial_urls, str):
urls = [url.strip() for url in initial_urls.split("\n") if url.strip()]
elif isinstance(initial_urls, list):
urls = [url.strip() for url in initial_urls if url.strip()]
else:
urls = []
urls = parse_webcrawler_urls(connector.config.get("INITIAL_URLS"))
logger.info(
f"Starting crawled web page indexing for connector {connector_id} with {len(urls)} URLs"
@ -281,6 +277,22 @@ async def index_crawled_urls(
logger.info(f"Successfully updated URL {url}")
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
logger.info(
f"URL {url} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(

View file

@ -55,7 +55,9 @@ LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
)
# Timeout calculation constants
UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024 # 100 KB/s (conservative for slow connections)
UPLOAD_BYTES_PER_SECOND_SLOW = (
100 * 1024
) # 100 KB/s (conservative for slow connections)
MIN_UPLOAD_TIMEOUT = 120 # Minimum 2 minutes for any file
MAX_UPLOAD_TIMEOUT = 1800 # Maximum 30 minutes for very large files
BASE_JOB_TIMEOUT = 600 # 10 minutes base for job processing
@ -219,19 +221,19 @@ async def find_existing_document_with_migration(
def calculate_upload_timeout(file_size_bytes: int) -> float:
"""
Calculate appropriate upload timeout based on file size.
Assumes a conservative slow connection speed to handle worst-case scenarios.
Args:
file_size_bytes: Size of the file in bytes
Returns:
Timeout in seconds
"""
# Calculate time needed at slow connection speed
# Add 50% buffer for network variability and SSL overhead
estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
# Clamp to reasonable bounds
return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))
@ -239,21 +241,21 @@ def calculate_upload_timeout(file_size_bytes: int) -> float:
def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
"""
Calculate job processing timeout based on page count and file size.
Args:
estimated_pages: Estimated number of pages
file_size_bytes: Size of the file in bytes
Returns:
Timeout in seconds
"""
# Base timeout + time per page
page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)
# Also consider file size (large images take longer to process)
# ~1 minute per 10MB of file size
size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60
# Use the larger of the two estimates
return max(page_based_timeout, size_based_timeout)
@ -284,18 +286,18 @@ async def parse_with_llamacloud_retry(
"""
import os
import random
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
# Get file size for timeout calculations
file_size_bytes = os.path.getsize(file_path)
file_size_mb = file_size_bytes / (1024 * 1024)
# Calculate dynamic timeouts based on file size and page count
upload_timeout = calculate_upload_timeout(file_size_bytes)
job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes)
# HTTP client timeouts - scaled based on file size
# Write timeout is critical for large file uploads
custom_timeout = httpx.Timeout(
@ -304,7 +306,7 @@ async def parse_with_llamacloud_retry(
write=upload_timeout, # Dynamic based on file size (upload time)
pool=120.0, # 2 minutes to acquire connection from pool
)
logging.info(
f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, "
f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, "
@ -335,14 +337,14 @@ async def parse_with_llamacloud_retry(
# Parse the file asynchronously
result = await parser.aparse(file_path)
# Success - log if we had previous failures
if attempt > 1:
logging.info(
f"LlamaCloud upload succeeded on attempt {attempt} after "
f"{len(attempt_errors)} failures"
)
return result
except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
@ -355,8 +357,7 @@ async def parse_with_llamacloud_retry(
# Calculate exponential backoff with jitter
# Base delay doubles each attempt, capped at max delay
base_delay = min(
LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)),
LLAMACLOUD_MAX_DELAY
LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), LLAMACLOUD_MAX_DELAY
)
# Add random jitter (±25%) to prevent thundering herd
jitter = base_delay * 0.25 * (2 * random.random() - 1)