feat: enhance Google Drive indexing with new options

- Updated the Google Drive indexing functionality to include indexing options such as max files per folder, incremental sync, and inclusion of subfolders.
- Modified the API to accept a new 'indexing_options' parameter in the request body.
- Enhanced the UI to allow users to configure these options when selecting folders and files for indexing.
- Updated related components and tasks to support the new indexing options, ensuring a more flexible and efficient indexing process.
This commit is contained in:
Anish Sarkar 2026-01-17 12:33:57 +05:30
parent cf53338119
commit a3112a24fe
9 changed files with 381 additions and 178 deletions

View file

@ -1716,7 +1716,7 @@ async def run_google_drive_indexing(
connector_id: int,
search_space_id: int,
user_id: str,
items_dict: dict, # Dictionary with 'folders' and 'files' lists
items_dict: dict, # Dictionary with 'folders', 'files', and 'indexing_options'
):
"""Runs the Google Drive indexing task for folders and files with notifications."""
from uuid import UUID
@ -1730,6 +1730,7 @@ async def run_google_drive_indexing(
# Parse the structured data
items = GoogleDriveIndexRequest(**items_dict)
indexing_options = items.indexing_options
total_indexed = 0
errors = []
@ -1765,7 +1766,7 @@ async def run_google_drive_indexing(
stage="fetching",
)
# Index each folder
# Index each folder with indexing options
for folder in items.folders:
try:
indexed_count, error_message = await index_google_drive_files(
@ -1775,8 +1776,10 @@ async def run_google_drive_indexing(
user_id,
folder_id=folder.id,
folder_name=folder.name,
use_delta_sync=True,
use_delta_sync=indexing_options.incremental_sync,
update_last_indexed=False,
max_files=indexing_options.max_files_per_folder,
include_subfolders=indexing_options.include_subfolders,
)
if error_message:
errors.append(f"Folder '{folder.name}': {error_message}")

View file

@ -10,7 +10,7 @@ from .documents import (
ExtensionDocumentMetadata,
PaginatedResponse,
)
from .google_drive import DriveItem, GoogleDriveIndexRequest
from .google_drive import DriveItem, GoogleDriveIndexingOptions, GoogleDriveIndexRequest
from .logs import LogBase, LogCreate, LogFilter, LogRead, LogUpdate
from .new_chat import (
ChatMessage,
@ -90,6 +90,7 @@ __all__ = [
"DocumentsCreate",
# Google Drive schemas
"DriveItem",
"GoogleDriveIndexingOptions",
"ExtensionDocumentContent",
"ExtensionDocumentMetadata",
"GlobalNewLLMConfigRead",

View file

@ -10,6 +10,25 @@ class DriveItem(BaseModel):
name: str = Field(..., description="Item display name")
class GoogleDriveIndexingOptions(BaseModel):
"""Indexing options for Google Drive connector."""
max_files_per_folder: int = Field(
default=100,
ge=1,
le=1000,
description="Maximum number of files to index from each folder (1-1000)",
)
incremental_sync: bool = Field(
default=True,
description="Only sync changes since last index (faster). Disable for a full re-index.",
)
include_subfolders: bool = Field(
default=True,
description="Recursively index files in subfolders of selected folders",
)
class GoogleDriveIndexRequest(BaseModel):
"""Request body for indexing Google Drive content."""
@ -19,6 +38,10 @@ class GoogleDriveIndexRequest(BaseModel):
files: list[DriveItem] = Field(
default_factory=list, description="List of specific files to index"
)
indexing_options: GoogleDriveIndexingOptions = Field(
default_factory=GoogleDriveIndexingOptions,
description="Indexing configuration options",
)
def has_items(self) -> bool:
"""Check if any items are selected."""

View file

@ -461,7 +461,7 @@ def index_google_drive_files_task(
connector_id: int,
search_space_id: int,
user_id: str,
items_dict: dict, # Dictionary with 'folders' and 'files' lists
items_dict: dict, # Dictionary with 'folders', 'files', and 'indexing_options'
):
"""Celery task to index Google Drive folders and files."""
import asyncio
@ -486,7 +486,7 @@ async def _index_google_drive_files(
connector_id: int,
search_space_id: int,
user_id: str,
items_dict: dict, # Dictionary with 'folders' and 'files' lists
items_dict: dict, # Dictionary with 'folders', 'files', and 'indexing_options'
):
"""Index Google Drive folders and files with new session."""
from app.routes.search_source_connectors_routes import (

View file

@ -72,6 +72,7 @@ async def _check_and_trigger_schedules():
index_elasticsearch_documents_task,
index_github_repos_task,
index_google_calendar_events_task,
index_google_drive_files_task,
index_google_gmail_messages_task,
index_jira_issues_task,
index_linear_issues_task,
@ -96,6 +97,7 @@ async def _check_and_trigger_schedules():
SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task,
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task,
}
# Trigger indexing for each due connector
@ -106,13 +108,42 @@ async def _check_and_trigger_schedules():
f"Triggering periodic indexing for connector {connector.id} "
f"({connector.connector_type.value})"
)
task.delay(
connector.id,
connector.search_space_id,
str(connector.user_id),
None, # start_date - uses last_indexed_at
None, # end_date - uses now
)
# Special handling for Google Drive - uses config for folder/file selection
if connector.connector_type == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR:
config = connector.config or {}
selected_folders = config.get("selected_folders", [])
selected_files = config.get("selected_files", [])
indexing_options = config.get("indexing_options", {
"max_files_per_folder": 100,
"incremental_sync": True,
"include_subfolders": True,
})
if selected_folders or selected_files:
task.delay(
connector.id,
connector.search_space_id,
str(connector.user_id),
{
"folders": selected_folders,
"files": selected_files,
"indexing_options": indexing_options,
},
)
else:
logger.warning(
f"Google Drive connector {connector.id} has no folders or files selected, skipping periodic indexing"
)
continue
else:
task.delay(
connector.id,
connector.search_space_id,
str(connector.user_id),
None, # start_date - uses last_indexed_at
None, # end_date - uses now
)
# Update next_scheduled_at for next run
from datetime import timedelta

View file

@ -37,6 +37,7 @@ async def index_google_drive_files(
use_delta_sync: bool = True,
update_last_indexed: bool = True,
max_files: int = 500,
include_subfolders: bool = False,
) -> tuple[int, str | None]:
"""
Index Google Drive files for a specific connector.
@ -51,6 +52,7 @@ async def index_google_drive_files(
use_delta_sync: Whether to use change tracking for incremental sync
update_last_indexed: Whether to update last_indexed_at timestamp
max_files: Maximum number of files to index
include_subfolders: Whether to recursively index files in subfolders
Returns:
Tuple of (number_of_indexed_files, error_message)
@ -144,6 +146,7 @@ async def index_google_drive_files(
task_logger=task_logger,
log_entry=log_entry,
max_files=max_files,
include_subfolders=include_subfolders,
)
else:
logger.info(f"Using full scan for connector {connector_id}")
@ -159,6 +162,7 @@ async def index_google_drive_files(
task_logger=task_logger,
log_entry=log_entry,
max_files=max_files,
include_subfolders=include_subfolders,
)
documents_indexed, documents_skipped = result
@ -375,60 +379,80 @@ async def _index_full_scan(
task_logger: TaskLoggingService,
log_entry: any,
max_files: int,
include_subfolders: bool = False,
) -> tuple[int, int]:
"""Perform full scan indexing of a folder."""
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name}",
{"stage": "full_scan", "folder_id": folder_id},
f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})",
{"stage": "full_scan", "folder_id": folder_id, "include_subfolders": include_subfolders},
)
documents_indexed = 0
documents_skipped = 0
page_token = None
files_processed = 0
while files_processed < max_files:
files, next_token, error = await get_files_in_folder(
drive_client, folder_id, include_subfolders=False, page_token=page_token
)
# Queue of folders to process: (folder_id, folder_name)
folders_to_process = [(folder_id, folder_name)]
if error:
logger.error(f"Error listing files: {error}")
break
while folders_to_process and files_processed < max_files:
current_folder_id, current_folder_name = folders_to_process.pop(0)
logger.info(f"Processing folder: {current_folder_name} ({current_folder_id})")
page_token = None
if not files:
break
for file in files:
if files_processed >= max_files:
break
files_processed += 1
indexed, skipped = await _process_single_file(
drive_client=drive_client,
session=session,
file=file,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
while files_processed < max_files:
# Get files and folders in current folder
# include_subfolders=True here so we get folder items to queue them
files, next_token, error = await get_files_in_folder(
drive_client, current_folder_id, include_subfolders=True, page_token=page_token
)
documents_indexed += indexed
documents_skipped += skipped
if error:
logger.error(f"Error listing files in {current_folder_name}: {error}")
break
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(
f"Committed batch: {documents_indexed} files indexed so far"
if not files:
break
for file in files:
if files_processed >= max_files:
break
mime_type = file.get("mimeType", "")
# If this is a folder and include_subfolders is enabled, queue it for processing
if mime_type == "application/vnd.google-apps.folder":
if include_subfolders:
folders_to_process.append((file["id"], file.get("name", "Unknown")))
logger.debug(f"Queued subfolder: {file.get('name', 'Unknown')}")
continue
# Process the file
files_processed += 1
indexed, skipped = await _process_single_file(
drive_client=drive_client,
session=session,
file=file,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
)
page_token = next_token
if not page_token:
break
documents_indexed += indexed
documents_skipped += skipped
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(
f"Committed batch: {documents_indexed} files indexed so far"
)
page_token = next_token
if not page_token:
break
logger.info(
f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped"
@ -448,8 +472,13 @@ async def _index_with_delta_sync(
task_logger: TaskLoggingService,
log_entry: any,
max_files: int,
include_subfolders: bool = False,
) -> tuple[int, int]:
"""Perform delta sync indexing using change tracking."""
"""Perform delta sync indexing using change tracking.
Note: include_subfolders is accepted for API consistency but delta sync
automatically tracks changes across all folders including subfolders.
"""
await task_logger.log_task_progress(
log_entry,
f"Starting delta sync from token: {start_page_token[:20]}...",