style(backend): run ruff format on 10 files

CREDO23 2026-01-28 22:20:02 +02:00
parent 20b8a17254
commit 949ec949f6
10 changed files with 48 additions and 32 deletions


@@ -54,7 +54,9 @@ def set_generating_podcast(search_space_id: int, podcast_id: int) -> None:
         client = get_redis_client()
         client.setex(_redis_key(search_space_id), 1800, str(podcast_id))
     except Exception as e:
-        print(f"[generate_podcast] Warning: Could not set generating podcast in Redis: {e}")
+        print(
+            f"[generate_podcast] Warning: Could not set generating podcast in Redis: {e}"
+        )


 def create_generate_podcast_tool(


@@ -670,7 +670,9 @@ async def delete_thread(
         ) from None


-@router.post("/threads/{thread_id}/complete-clone", response_model=CompleteCloneResponse)
+@router.post(
+    "/threads/{thread_id}/complete-clone", response_model=CompleteCloneResponse
+)
 async def complete_clone(
     thread_id: int,
     session: AsyncSession = Depends(get_async_session),

@@ -702,7 +704,9 @@ async def complete_clone(
         raise HTTPException(status_code=400, detail="Clone already completed")

     if not thread.cloned_from_thread_id:
-        raise HTTPException(status_code=400, detail="No source thread to clone from")
+        raise HTTPException(
+            status_code=400, detail="No source thread to clone from"
+        )

     message_count = await complete_clone_content(
         session=session,


@@ -53,7 +53,9 @@ async def clone_public_chat_endpoint(
     source_thread = await get_thread_by_share_token(session, share_token)

     if not source_thread:
-        raise HTTPException(status_code=404, detail="Chat not found or no longer public")
+        raise HTTPException(
+            status_code=404, detail="Chat not found or no longer public"
+        )

     target_search_space_id = await get_user_default_search_space(session, user.id)


@@ -941,7 +941,11 @@ async def index_connector_content(
                f"Triggering web pages indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
            )
            index_crawled_urls_task.delay(
-                connector_id, search_space_id, str(user.id), indexing_from, indexing_to
+                connector_id,
+                search_space_id,
+                str(user.id),
+                indexing_from,
+                indexing_to,
            )
            response_message = "Web page indexing started in the background."


@@ -257,14 +257,11 @@ class PublicChatResponse(BaseModel):
 class CloneInitResponse(BaseModel):
     thread_id: int
     search_space_id: int
     share_token: str


 class CompleteCloneResponse(BaseModel):
     status: str
     message_count: int


@@ -59,6 +59,8 @@ class PodcastRead(PodcastBase):
             "search_space_id": obj.search_space_id,
             "status": obj.status,
             "created_at": obj.created_at,
-            "transcript_entries": len(obj.podcast_transcript) if obj.podcast_transcript else None,
+            "transcript_entries": len(obj.podcast_transcript)
+            if obj.podcast_transcript
+            else None,
         }
         return cls(**data)


@@ -55,7 +55,9 @@ def _clear_generating_podcast(search_space_id: int) -> None:
         client = redis.from_url(redis_url, decode_responses=True)
         key = f"podcast:generating:{search_space_id}"
         client.delete(key)
-        logger.info(f"Cleared generating podcast key for search_space_id={search_space_id}")
+        logger.info(
+            f"Cleared generating podcast key for search_space_id={search_space_id}"
+        )
     except Exception as e:
         logger.warning(f"Could not clear generating podcast key: {e}")

@@ -119,9 +121,7 @@ async def _generate_content_podcast(
 ) -> dict:
     """Generate content-based podcast and update existing record."""
     async with get_celery_session_maker()() as session:
-        result = await session.execute(
-            select(Podcast).filter(Podcast.id == podcast_id)
-        )
+        result = await session.execute(select(Podcast).filter(Podcast.id == podcast_id))
         podcast = result.scalars().first()

         if not podcast:


@@ -165,7 +165,9 @@ async def _check_and_trigger_schedules():
                         from app.utils.webcrawler_utils import parse_webcrawler_urls

                         connector_config = connector.config or {}
-                        urls = parse_webcrawler_urls(connector_config.get("INITIAL_URLS"))
+                        urls = parse_webcrawler_urls(
+                            connector_config.get("INITIAL_URLS")
+                        )

                         if urls:
                             task.delay(


@@ -55,7 +55,9 @@ LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
 )

 # Timeout calculation constants
-UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024  # 100 KB/s (conservative for slow connections)
+UPLOAD_BYTES_PER_SECOND_SLOW = (
+    100 * 1024
+)  # 100 KB/s (conservative for slow connections)
 MIN_UPLOAD_TIMEOUT = 120  # Minimum 2 minutes for any file
 MAX_UPLOAD_TIMEOUT = 1800  # Maximum 30 minutes for very large files
 BASE_JOB_TIMEOUT = 600  # 10 minutes base for job processing
@@ -219,19 +221,19 @@ async def find_existing_document_with_migration(
 def calculate_upload_timeout(file_size_bytes: int) -> float:
     """
     Calculate appropriate upload timeout based on file size.

     Assumes a conservative slow connection speed to handle worst-case scenarios.

     Args:
         file_size_bytes: Size of the file in bytes

     Returns:
         Timeout in seconds
     """
     # Calculate time needed at slow connection speed
     # Add 50% buffer for network variability and SSL overhead
     estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5

     # Clamp to reasonable bounds
     return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))
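
A quick worked example of the calculate_upload_timeout formula above, using a hypothetical 20 MB file (illustrative numbers only; the constants are the ones shown earlier in this file):

# Constants as shown in this file; the 20 MB file size is a hypothetical input
UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024  # 100 KB/s
MIN_UPLOAD_TIMEOUT = 120
MAX_UPLOAD_TIMEOUT = 1800

file_size_bytes = 20 * 1024 * 1024  # 20 MB
estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5  # 204.8 s * 1.5 = 307.2 s
print(max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT)))  # 307.2, inside the 120-1800 s clamp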
@@ -239,21 +241,21 @@ def calculate_upload_timeout(file_size_bytes: int) -> float:
 def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
     """
     Calculate job processing timeout based on page count and file size.

     Args:
         estimated_pages: Estimated number of pages
         file_size_bytes: Size of the file in bytes

     Returns:
         Timeout in seconds
     """
     # Base timeout + time per page
     page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)

     # Also consider file size (large images take longer to process)
     # ~1 minute per 10MB of file size
     size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60

     # Use the larger of the two estimates
     return max(page_based_timeout, size_based_timeout)
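
A similar sketch for calculate_job_timeout; PER_PAGE_JOB_TIMEOUT is defined outside this hunk, so the 10-second value below is an assumption for illustration:

BASE_JOB_TIMEOUT = 600
PER_PAGE_JOB_TIMEOUT = 10  # assumed per-page allowance; the real constant lives elsewhere in this file

estimated_pages, file_size_bytes = 50, 20 * 1024 * 1024
page_based = BASE_JOB_TIMEOUT + estimated_pages * PER_PAGE_JOB_TIMEOUT  # 600 + 500 = 1100 s
size_based = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60  # 600 + 120 = 720 s
print(max(page_based, size_based))  # 1100: the page-based estimate dominates here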
@@ -284,18 +286,18 @@ async def parse_with_llamacloud_retry(
     """
     import os
     import random

     from llama_cloud_services import LlamaParse
     from llama_cloud_services.parse.utils import ResultType

     # Get file size for timeout calculations
     file_size_bytes = os.path.getsize(file_path)
     file_size_mb = file_size_bytes / (1024 * 1024)

     # Calculate dynamic timeouts based on file size and page count
     upload_timeout = calculate_upload_timeout(file_size_bytes)
     job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes)

     # HTTP client timeouts - scaled based on file size
     # Write timeout is critical for large file uploads
     custom_timeout = httpx.Timeout(
@@ -304,7 +306,7 @@ async def parse_with_llamacloud_retry(
         write=upload_timeout,  # Dynamic based on file size (upload time)
         pool=120.0,  # 2 minutes to acquire connection from pool
     )

     logging.info(
         f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, "
         f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, "
@@ -335,14 +337,14 @@ async def parse_with_llamacloud_retry(
             # Parse the file asynchronously
             result = await parser.aparse(file_path)

             # Success - log if we had previous failures
             if attempt > 1:
                 logging.info(
                     f"LlamaCloud upload succeeded on attempt {attempt} after "
                     f"{len(attempt_errors)} failures"
                 )

             return result

         except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
@@ -355,8 +357,7 @@ async def parse_with_llamacloud_retry(
             # Calculate exponential backoff with jitter
             # Base delay doubles each attempt, capped at max delay
             base_delay = min(
-                LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)),
-                LLAMACLOUD_MAX_DELAY
+                LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), LLAMACLOUD_MAX_DELAY
             )
             # Add random jitter (±25%) to prevent thundering herd
             jitter = base_delay * 0.25 * (2 * random.random() - 1)
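
The backoff arithmetic above, sketched with assumed values (LLAMACLOUD_BASE_DELAY and LLAMACLOUD_MAX_DELAY are defined outside this hunk; 2 s and 60 s are placeholders):

import random

LLAMACLOUD_BASE_DELAY = 2.0  # assumed; real value defined elsewhere in this module
LLAMACLOUD_MAX_DELAY = 60.0  # assumed

attempt = 3
base_delay = min(LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), LLAMACLOUD_MAX_DELAY)  # 2 * 4 = 8.0 s
jitter = base_delay * 0.25 * (2 * random.random() - 1)  # uniform in [-2.0, +2.0] s
print(base_delay + jitter)  # roughly 6-10 s, presumably the wait before the next attempt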


@@ -21,6 +21,8 @@ def parse_webcrawler_urls(initial_urls: str | list | None) -> list[str]:
     if isinstance(initial_urls, str):
         return [url.strip() for url in initial_urls.split("\n") if url.strip()]
     elif isinstance(initial_urls, list):
-        return [url.strip() for url in initial_urls if isinstance(url, str) and url.strip()]
+        return [
+            url.strip() for url in initial_urls if isinstance(url, str) and url.strip()
+        ]
     else:
         return []
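
For context on the function being reformatted here, a small usage sketch based only on the branches visible in this hunk (example URLs are made up):

# A newline-separated string and a mixed list normalise to the same kind of clean list
parse_webcrawler_urls("https://example.com\n  https://docs.example.com\n\n")
# -> ["https://example.com", "https://docs.example.com"]

parse_webcrawler_urls(["https://example.com", 42, "   "])
# -> ["https://example.com"]  (non-string and blank entries are dropped)

parse_webcrawler_urls(None)
# -> []  (falls through to the else branch)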