chore: ran backend linting

This commit is contained in:
Anish Sarkar 2026-01-28 09:10:37 +05:30
parent aab547264e
commit c125c9e87f
4 changed files with 32 additions and 23 deletions

View file

@ -470,7 +470,7 @@ async def check_document_by_content_hash(
session: AsyncSession, content_hash: str
) -> Document | None:
"""Check if a document with the given content hash already exists.
This is used to prevent duplicate content from being indexed, regardless
of which connector originally indexed it.
"""
@ -486,11 +486,11 @@ async def check_document_by_google_drive_file_id(
session: AsyncSession, file_id: str, search_space_id: int
) -> Document | None:
"""Check if a document with this Google Drive file ID exists (from any connector).
This checks both metadata key formats:
- 'google_drive_file_id' (normal Google Drive connector)
- 'file_id' (Composio Google Drive connector)
This allows detecting duplicates BEFORE downloading/ETL, saving expensive API calls.
"""
from sqlalchemy import String, cast, or_
@ -505,10 +505,11 @@ async def check_document_by_google_drive_file_id(
Document.search_space_id == search_space_id,
or_(
# Normal Google Drive connector format
cast(Document.document_metadata["google_drive_file_id"], String) == quoted_file_id,
cast(Document.document_metadata["google_drive_file_id"], String)
== quoted_file_id,
# Composio Google Drive connector format
cast(Document.document_metadata["file_id"], String) == quoted_file_id,
)
),
)
)
return existing_doc_result.scalars().first()
@ -533,15 +534,15 @@ def generate_indexing_settings_hash(
indexing_options: dict,
) -> str:
"""Generate a hash of indexing settings to detect configuration changes.
This hash is used to determine if indexing settings have changed since
the last index, which would require a full re-scan instead of delta sync.
Args:
selected_folders: List of {id, name} for folders to index
selected_files: List of {id, name} for individual files to index
indexing_options: Dict with max_files_per_folder, include_subfolders, etc.
Returns:
MD5 hash string of the settings
"""
@ -566,7 +567,7 @@ async def index_composio_google_drive(
max_items: int = 1000,
) -> tuple[int, int, str | None]:
"""Index Google Drive files via Composio with delta sync support.
Returns:
Tuple of (documents_indexed, documents_skipped, error_message or None)
@ -598,13 +599,13 @@ async def index_composio_google_drive(
selected_folders, selected_files, indexing_options
)
last_settings_hash = connector_config.get("last_indexed_settings_hash")
# Detect if settings changed since last index
settings_changed = (
last_settings_hash is not None and
current_settings_hash != last_settings_hash
last_settings_hash is not None
and current_settings_hash != last_settings_hash
)
if settings_changed:
logger.info(
f"Indexing settings changed for connector {connector_id}. "
@ -613,17 +614,17 @@ async def index_composio_google_drive(
# Check for stored page token for delta sync
stored_page_token = connector_config.get("drive_page_token")
# Determine whether to use delta sync:
# - Must have a stored page token
# - Must have been indexed before (last_indexed_at exists)
# - User must have incremental_sync enabled
# - Settings must not have changed (folder/subfolder config)
use_delta_sync = (
incremental_sync and
stored_page_token and
connector.last_indexed_at and
not settings_changed
incremental_sync
and stored_page_token
and connector.last_indexed_at
and not settings_changed
)
# Route to delta sync or full scan
@ -1177,7 +1178,9 @@ async def _process_single_drive_file(
# Check if content_hash already exists (from any connector)
# This prevents duplicate content and avoids IntegrityError on unique constraint
existing_by_content_hash = await check_document_by_content_hash(session, content_hash)
existing_by_content_hash = await check_document_by_content_hash(
session, content_hash
)
if existing_by_content_hash:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): identical content "

View file

@ -123,7 +123,9 @@ async def list_all_permissions(
for perm in Permission:
# Extract category from permission value (e.g., "documents:read" -> "documents")
category = perm.value.split(":")[0] if ":" in perm.value else "general"
description = PERMISSION_DESCRIPTIONS.get(perm.value, f"Permission for {perm.value}")
description = PERMISSION_DESCRIPTIONS.get(
perm.value, f"Permission for {perm.value}"
)
permissions.append(
PermissionInfo(

View file

@ -1190,7 +1190,7 @@ async def _run_indexing_with_notifications(
end_date=end_date,
update_last_indexed=False,
)
# Handle both 2-tuple and 3-tuple returns for backwards compatibility
if len(result) == 3:
documents_processed, documents_skipped, error_or_warning = result

View file

@ -360,7 +360,9 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
skipped_text = ""
if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
skipped_text = f" ({skipped_count} {skipped_item_text} skipped - already indexed)"
skipped_text = (
f" ({skipped_count} {skipped_item_text} skipped - already indexed)"
)
# If there's an error message but items were indexed, treat it as a warning (partial success)
# If is_warning is True, treat it as success even with 0 items (e.g., duplicates found)
@ -392,7 +394,9 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
message = "Already up to date! No new items to sync."
else:
item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}."
message = (
f"Now searchable! {indexed_count} {item_text} synced{skipped_text}."
)
status = "completed"
metadata_updates = {