feat: add file selection to Google Drive connector

- Add structured request body with folders and files arrays
- Support individual file indexing alongside folder indexing
- Remove deprecated folder_ids/folder_names query params
- Update UI to allow selecting both folders and files
This commit is contained in:
CREDO23 2025-12-31 14:15:07 +02:00
parent 476c764611
commit 9c78726b6b
12 changed files with 366 additions and 97 deletions

View file

@ -4,13 +4,14 @@ from .change_tracker import categorize_change, fetch_all_changes, get_start_page
from .client import GoogleDriveClient
from .content_extractor import download_and_process_file
from .credentials import get_valid_credentials, validate_credentials
from .folder_manager import get_files_in_folder, list_folder_contents
from .folder_manager import get_file_by_id, get_files_in_folder, list_folder_contents
__all__ = [
"GoogleDriveClient",
"categorize_change",
"download_and_process_file",
"fetch_all_changes",
"get_file_by_id",
"get_files_in_folder",
"get_start_page_token",
"get_valid_credentials",

View file

@ -140,6 +140,39 @@ async def get_files_in_folder(
return [], None, f"Error getting files in folder: {e!s}"
async def get_file_by_id(
client: GoogleDriveClient,
file_id: str,
) -> tuple[dict[str, Any] | None, str | None]:
"""
Get file metadata by ID.
Args:
client: GoogleDriveClient instance
file_id: File ID to fetch
Returns:
Tuple of (file metadata dict, error message)
"""
try:
file, error = await client.get_file_metadata(
file_id,
fields="id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink",
)
if error:
return None, error
if not file:
return None, f"File not found: {file_id}"
return file, None
except Exception as e:
logger.error(f"Error getting file by ID: {e!s}", exc_info=True)
return None, f"Error getting file by ID: {e!s}"
def format_folder_path(hierarchy: list[dict[str, str]]) -> str:
"""
Format folder hierarchy as a path string.

View file

@ -14,7 +14,7 @@ import logging
from datetime import UTC, datetime, timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi import APIRouter, Body, Depends, HTTPException, Query
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
@ -30,6 +30,7 @@ from app.db import (
get_async_session,
)
from app.schemas import (
GoogleDriveIndexRequest,
SearchSourceConnectorBase,
SearchSourceConnectorCreate,
SearchSourceConnectorRead,
@ -542,13 +543,9 @@ async def index_connector_content(
None,
description="End date for indexing (YYYY-MM-DD format). If not provided, uses today's date",
),
folder_ids: str = Query(
drive_items: GoogleDriveIndexRequest | None = Body(
None,
description="[Google Drive only] Comma-separated folder IDs to index",
),
folder_names: str = Query(
None,
description="[Google Drive only] Comma-separated folder names for display purposes",
description="[Google Drive only] Structured request with folders and files to index",
),
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
@ -762,22 +759,23 @@ async def index_connector_content(
index_google_drive_files_task,
)
if not folder_ids or not folder_names:
if not drive_items or not drive_items.has_items():
raise HTTPException(
status_code=400,
detail="Google Drive indexing requires folder_ids and folder_names parameters",
detail="Google Drive indexing requires drive_items body parameter with folders or files",
)
logger.info(
f"Triggering Google Drive indexing for connector {connector_id} into search space {search_space_id}, folders: {folder_names}"
f"Triggering Google Drive indexing for connector {connector_id} into search space {search_space_id}, "
f"folders: {len(drive_items.folders)}, files: {len(drive_items.files)}"
)
# Pass comma-separated strings directly to Celery task
# Pass structured data to Celery task
index_google_drive_files_task.delay(
connector_id,
search_space_id,
str(user.id),
folder_ids, # Pass as comma-separated string
folder_names, # Pass as comma-separated string
drive_items.model_dump(), # Convert to dict for JSON serialization
)
response_message = "Google Drive indexing started in the background."
@ -1554,45 +1552,63 @@ async def run_google_drive_indexing(
connector_id: int,
search_space_id: int,
user_id: str,
folder_ids: str, # Comma-separated folder IDs
folder_names: str, # Comma-separated folder names
items_dict: dict, # Dictionary with 'folders' and 'files' lists
):
"""Runs the Google Drive indexing task for multiple folders and updates the timestamp."""
"""Runs the Google Drive indexing task for folders and files and updates the timestamp."""
try:
from app.tasks.connector_indexers.google_drive_indexer import (
index_google_drive_files,
index_google_drive_single_file,
)
# Split comma-separated IDs and names into lists
folder_id_list = [fid.strip() for fid in folder_ids.split(",")]
folder_name_list = [fname.strip() for fname in folder_names.split(",")]
# Parse the structured data
items = GoogleDriveIndexRequest(**items_dict)
total_indexed = 0
errors = []
# Index each folder
for folder_id, folder_name in zip(
folder_id_list, folder_name_list, strict=False
):
for folder in items.folders:
try:
indexed_count, error_message = await index_google_drive_files(
session,
connector_id,
search_space_id,
user_id,
folder_id,
folder_name,
folder_id=folder.id,
folder_name=folder.name,
use_delta_sync=True,
update_last_indexed=False,
)
if error_message:
errors.append(f"{folder_name}: {error_message}")
errors.append(f"Folder '{folder.name}': {error_message}")
else:
total_indexed += indexed_count
except Exception as e:
errors.append(f"{folder_name}: {e!s}")
errors.append(f"Folder '{folder.name}': {e!s}")
logger.error(
f"Error indexing folder {folder_name} ({folder_id}): {e}",
f"Error indexing folder {folder.name} ({folder.id}): {e}",
exc_info=True,
)
# Index each individual file
for file in items.files:
try:
indexed_count, error_message = await index_google_drive_single_file(
session,
connector_id,
search_space_id,
user_id,
file_id=file.id,
file_name=file.name,
)
if error_message:
errors.append(f"File '{file.name}': {error_message}")
else:
total_indexed += indexed_count
except Exception as e:
errors.append(f"File '{file.name}': {e!s}")
logger.error(
f"Error indexing file {file.name} ({file.id}): {e}",
exc_info=True,
)
@ -1602,7 +1618,7 @@ async def run_google_drive_indexing(
)
else:
logger.info(
f"Google Drive indexing successful for connector {connector_id}. Indexed {total_indexed} documents from {len(folder_id_list)} folder(s)."
f"Google Drive indexing successful for connector {connector_id}. Indexed {total_indexed} documents from {len(items.folders)} folder(s) and {len(items.files)} file(s)."
)
# Update the last indexed timestamp only on full success
await update_connector_last_indexed(session, connector_id)

View file

@ -10,6 +10,7 @@ from .documents import (
ExtensionDocumentMetadata,
PaginatedResponse,
)
from .google_drive import DriveItem, GoogleDriveIndexRequest
from .logs import LogBase, LogCreate, LogFilter, LogRead, LogUpdate
from .new_chat import (
ChatMessage,
@ -79,6 +80,8 @@ __all__ = [
"DefaultSystemInstructionsResponse",
# Document schemas
"DocumentBase",
# Google Drive schemas
"DriveItem",
"DocumentRead",
"DocumentUpdate",
"DocumentWithChunksRead",
@ -86,6 +89,7 @@ __all__ = [
"ExtensionDocumentContent",
"ExtensionDocumentMetadata",
"GlobalNewLLMConfigRead",
"GoogleDriveIndexRequest",
# Base schemas
"IDModel",
# RBAC schemas

View file

@ -0,0 +1,42 @@
"""Schemas for Google Drive connector."""
from pydantic import BaseModel, Field
class DriveItem(BaseModel):
"""Represents a Google Drive file or folder."""
id: str = Field(..., description="Google Drive item ID")
name: str = Field(..., description="Item display name")
class GoogleDriveIndexRequest(BaseModel):
"""Request body for indexing Google Drive content."""
folders: list[DriveItem] = Field(
default_factory=list, description="List of folders to index"
)
files: list[DriveItem] = Field(
default_factory=list, description="List of specific files to index"
)
def has_items(self) -> bool:
"""Check if any items are selected."""
return len(self.folders) > 0 or len(self.files) > 0
def get_folder_ids(self) -> list[str]:
"""Get list of folder IDs."""
return [folder.id for folder in self.folders]
def get_folder_names(self) -> list[str]:
"""Get list of folder names."""
return [folder.name for folder in self.folders]
def get_file_ids(self) -> list[str]:
"""Get list of file IDs."""
return [file.id for file in self.files]
def get_file_names(self) -> list[str]:
"""Get list of file names."""
return [file.name for file in self.files]

View file

@ -479,10 +479,9 @@ def index_google_drive_files_task(
connector_id: int,
search_space_id: int,
user_id: str,
folder_ids: str, # Comma-separated folder IDs
folder_names: str, # Comma-separated folder names
items_dict: dict, # Dictionary with 'folders' and 'files' lists
):
"""Celery task to index Google Drive files from multiple folders."""
"""Celery task to index Google Drive folders and files."""
import asyncio
loop = asyncio.new_event_loop()
@ -494,8 +493,7 @@ def index_google_drive_files_task(
connector_id,
search_space_id,
user_id,
folder_ids,
folder_names,
items_dict,
)
)
finally:
@ -506,10 +504,9 @@ async def _index_google_drive_files(
connector_id: int,
search_space_id: int,
user_id: str,
folder_ids: str, # Comma-separated folder IDs
folder_names: str, # Comma-separated folder names
items_dict: dict, # Dictionary with 'folders' and 'files' lists
):
"""Index Google Drive files from multiple folders with new session."""
"""Index Google Drive folders and files with new session."""
from app.routes.search_source_connectors_routes import (
run_google_drive_indexing,
)
@ -520,8 +517,7 @@ async def _index_google_drive_files(
connector_id,
search_space_id,
user_id,
folder_ids,
folder_names,
items_dict,
)

View file

@ -10,6 +10,7 @@ from app.connectors.google_drive import (
categorize_change,
download_and_process_file,
fetch_all_changes,
get_file_by_id,
get_files_in_folder,
get_start_page_token,
)
@ -194,6 +195,131 @@ async def index_google_drive_files(
return 0, f"Failed to index Google Drive files: {e!s}"
async def index_google_drive_single_file(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
file_id: str,
file_name: str | None = None,
) -> tuple[int, str | None]:
"""
Index a single Google Drive file by its ID.
Args:
session: Database session
connector_id: ID of the Drive connector
search_space_id: ID of the search space
user_id: ID of the user
file_id: Specific file ID to index
file_name: File name for display (optional)
Returns:
Tuple of (number_of_indexed_files, error_message)
"""
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="google_drive_single_file_indexing",
source="connector_indexing_task",
message=f"Starting Google Drive single file indexing for file {file_id}",
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
"file_id": file_id,
"file_name": file_name,
},
)
try:
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR
)
if not connector:
error_msg = f"Google Drive connector with ID {connector_id} not found"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ConnectorNotFound"}
)
return 0, error_msg
await task_logger.log_task_progress(
log_entry,
f"Initializing Google Drive client for connector {connector_id}",
{"stage": "client_initialization"},
)
drive_client = GoogleDriveClient(session, connector_id)
# Fetch the file metadata
file, error = await get_file_by_id(drive_client, file_id)
if error or not file:
error_msg = f"Failed to fetch file {file_id}: {error or 'File not found'}"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "FileNotFound"}
)
return 0, error_msg
display_name = file_name or file.get("name", "Unknown")
logger.info(f"Indexing Google Drive file: {display_name} ({file_id})")
# Process the file
indexed, skipped = await _process_single_file(
drive_client=drive_client,
session=session,
file=file,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
)
await session.commit()
logger.info("Successfully committed Google Drive file indexing changes to database")
if indexed > 0:
await task_logger.log_task_success(
log_entry,
f"Successfully indexed file {display_name}",
{
"file_name": display_name,
"file_id": file_id,
},
)
logger.info(f"Google Drive file indexing completed: {display_name}")
return 1, None
else:
await task_logger.log_task_progress(
log_entry,
f"File {display_name} was skipped",
{"status": "skipped"},
)
return 0, None
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during file indexing",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index Google Drive file",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Google Drive file: {e!s}", exc_info=True)
return 0, f"Failed to index Google Drive file: {e!s}"
async def _index_full_scan(
drive_client: GoogleDriveClient,
session: AsyncSession,

View file

@ -119,9 +119,10 @@ export default function ConnectorsPage() {
const [customFrequency, setCustomFrequency] = useState<string>("");
const [isSavingPeriodic, setIsSavingPeriodic] = useState(false);
// Google Drive folder selection state
// Google Drive folder and file selection state
const [driveFolderDialogOpen, setDriveFolderDialogOpen] = useState(false);
const [selectedFolders, setSelectedFolders] = useState<Array<{ id: string; name: string }>>([]);
const [selectedFiles, setSelectedFiles] = useState<Array<{ id: string; name: string }>>([]);
useEffect(() => {
if (error) {
@ -162,10 +163,10 @@ export default function ConnectorsPage() {
setDriveFolderDialogOpen(true);
};
// Handle Google Drive folder indexing
const handleIndexDriveFolder = async () => {
if (selectedConnectorForIndexing === null || selectedFolders.length === 0) {
toast.error("Please select at least one folder");
// Handle Google Drive folder and file indexing
const handleIndexGoogleDrive = async () => {
if (selectedConnectorForIndexing === null || (selectedFolders.length === 0 && selectedFiles.length === 0)) {
toast.error("Please select at least one folder or file");
return;
}
@ -174,15 +175,14 @@ export default function ConnectorsPage() {
try {
setIndexingConnectorId(selectedConnectorForIndexing);
const folderIds = selectedFolders.map((f) => f.id).join(",");
const folderNames = selectedFolders.map((f) => f.name).join(", ");
await indexConnector({
connector_id: selectedConnectorForIndexing,
body: {
folders: selectedFolders,
files: selectedFiles,
},
queryParams: {
search_space_id: searchSpaceId,
folder_ids: folderIds,
folder_names: folderNames,
},
});
toast.success(t("indexing_started"));
@ -190,10 +190,11 @@ export default function ConnectorsPage() {
console.error("Error indexing connector content:", error);
toast.error(error instanceof Error ? error.message : t("indexing_failed"));
} finally {
setIndexingConnectorId(null);
setSelectedConnectorForIndexing(null);
setSelectedFolders([]);
}
setIndexingConnectorId(null);
setSelectedConnectorForIndexing(null);
setSelectedFolders([]);
setSelectedFiles([]);
}
};
// Handle connector indexing with dates
@ -679,11 +680,11 @@ export default function ConnectorsPage() {
<Dialog open={driveFolderDialogOpen} onOpenChange={setDriveFolderDialogOpen}>
<DialogContent className="w-auto max-w-full">
<DialogHeader>
<DialogTitle>Select Google Drive Folders</DialogTitle>
<DialogTitle>Select Google Drive Folders & Files</DialogTitle>
<DialogDescription className="flex items-start gap-2 text-sm p-2 border mt-1 rounded ">
<Info className="h-4 w-4 shrink-0 text-blue-500" />
<span>
Select folders to index. Only files <strong>directly in each folder</strong> will be
Select folders and/or individual files to index. For folders, only files <strong>directly in each folder</strong> will be
processedsubfolders must be selected separately.
</span>
</DialogDescription>
@ -698,23 +699,43 @@ export default function ConnectorsPage() {
onSelectFolders={(folders) => {
setSelectedFolders(folders);
}}
selectedFiles={selectedFiles}
onSelectFiles={(files) => {
setSelectedFiles(files);
}}
/>
)}
</div>
{selectedFolders.length > 0 && (
{(selectedFolders.length > 0 || selectedFiles.length > 0) && (
<div className="p-3 bg-muted rounded-lg text-sm space-y-2">
<div>
<p className="font-medium mb-1">
Selected {selectedFolders.length} folder{selectedFolders.length > 1 ? "s" : ""}:
</p>
<div className="max-h-24 overflow-y-auto">
{selectedFolders.map((folder) => (
<p key={folder.id} className="text-sm text-muted-foreground truncate" title={folder.name}>
{folder.name}
</p>
))}
{selectedFolders.length > 0 && (
<div>
<p className="font-medium mb-1">
Selected {selectedFolders.length} folder{selectedFolders.length > 1 ? "s" : ""}:
</p>
<div className="max-h-24 overflow-y-auto">
{selectedFolders.map((folder) => (
<p key={folder.id} className="text-sm text-muted-foreground truncate" title={folder.name}>
📁 {folder.name}
</p>
))}
</div>
</div>
</div>
)}
{selectedFiles.length > 0 && (
<div>
<p className="font-medium mb-1">
Selected {selectedFiles.length} file{selectedFiles.length > 1 ? "s" : ""}:
</p>
<div className="max-h-24 overflow-y-auto">
{selectedFiles.map((file) => (
<p key={file.id} className="text-sm text-muted-foreground truncate" title={file.name}>
📄 {file.name}
</p>
))}
</div>
</div>
)}
</div>
)}
</div>
@ -725,11 +746,12 @@ export default function ConnectorsPage() {
setDriveFolderDialogOpen(false);
setSelectedConnectorForIndexing(null);
setSelectedFolders([]);
setSelectedFiles([]);
}}
>
{tCommon("cancel")}
</Button>
<Button onClick={handleIndexDriveFolder} disabled={selectedFolders.length === 0}>
<Button onClick={handleIndexGoogleDrive} disabled={selectedFolders.length === 0 && selectedFiles.length === 0}>
{t("start_indexing")}
</Button>
</DialogFooter>

View file

@ -47,6 +47,8 @@ interface GoogleDriveFolderTreeProps {
connectorId: number;
selectedFolders: SelectedFolder[];
onSelectFolders: (folders: SelectedFolder[]) => void;
selectedFiles?: SelectedFolder[];
onSelectFiles?: (files: SelectedFolder[]) => void;
}
// Helper to get appropriate icon for file type
@ -70,6 +72,8 @@ export function GoogleDriveFolderTree({
connectorId,
selectedFolders,
onSelectFolders,
selectedFiles = [],
onSelectFiles = () => {},
}: GoogleDriveFolderTreeProps) {
const [itemStates, setItemStates] = useState<Map<string, ItemTreeNode>>(new Map());
@ -83,6 +87,10 @@ export function GoogleDriveFolderTree({
return selectedFolders.some((f) => f.id === folderId);
};
const isFileSelected = (fileId: string): boolean => {
return selectedFiles.some((f) => f.id === fileId);
};
const toggleFolderSelection = (folderId: string, folderName: string) => {
if (isFolderSelected(folderId)) {
onSelectFolders(selectedFolders.filter((f) => f.id !== folderId));
@ -91,6 +99,14 @@ export function GoogleDriveFolderTree({
}
};
const toggleFileSelection = (fileId: string, fileName: string) => {
if (isFileSelected(fileId)) {
onSelectFiles(selectedFiles.filter((f) => f.id !== fileId));
} else {
onSelectFiles([...selectedFiles, { id: fileId, name: fileName }]);
}
};
/**
* Find an item by ID across all loaded items (root and nested).
*/
@ -201,8 +217,8 @@ export function GoogleDriveFolderTree({
const isExpanded = state?.isExpanded || false;
const isLoading = state?.isLoading || false;
const children = state?.children;
const isSelected = isFolderSelected(item.id);
const isFolder = item.isFolder;
const isSelected = isFolder ? isFolderSelected(item.id) : isFileSelected(item.id);
const childFolders = children?.filter((c) => c.isFolder) || [];
const childFiles = children?.filter((c) => !c.isFolder) || [];
@ -211,10 +227,8 @@ export function GoogleDriveFolderTree({
<div key={item.id} className="w-full" style={{ marginLeft: `${level * 1.25}rem` }}>
<div
className={cn(
"flex items-center gap-2 h-auto py-2 px-2 rounded-md",
isFolder && "hover:bg-accent cursor-pointer",
!isFolder && "cursor-default opacity-60",
isSelected && isFolder && "bg-accent/50"
"flex items-center group gap-2 h-auto py-2 px-2 rounded-md hover:bg-accent cursor-pointer",
isSelected && "bg-accent/50"
)}
>
{isFolder ? (
@ -237,16 +251,20 @@ export function GoogleDriveFolderTree({
<span className="w-4 h-4 shrink-0" />
)}
{isFolder && (
<Checkbox
checked={isSelected}
onCheckedChange={() => toggleFolderSelection(item.id, item.name)}
className="shrink-0"
onClick={(e) => e.stopPropagation()}
/>
)}
<Checkbox
checked={isSelected}
onCheckedChange={() => {
if (isFolder) {
toggleFolderSelection(item.id, item.name);
} else {
toggleFileSelection(item.id, item.name);
}
}}
className="shrink-0 z-20 group-hover:border-white group-hover:border"
onClick={(e) => e.stopPropagation()}
/>
<div className="shrink-0" style={{ marginLeft: isFolder ? "0" : "1.25rem" }}>
<div className="shrink-0">
{isFolder ? (
isExpanded ? (
<FolderOpen className="h-4 w-4 text-blue-500" />

View file

@ -126,6 +126,24 @@ export const deleteConnectorResponse = z.object({
message: z.literal("Search source connector deleted successfully"),
});
/**
* Google Drive index request body
*/
export const googleDriveIndexBody = z.object({
folders: z.array(
z.object({
id: z.string(),
name: z.string(),
})
),
files: z.array(
z.object({
id: z.string(),
name: z.string(),
})
),
});
/**
* Index connector
*/
@ -135,10 +153,8 @@ export const indexConnectorRequest = z.object({
search_space_id: z.number().or(z.string()),
start_date: z.string().optional(),
end_date: z.string().optional(),
// Google Drive only
folder_ids: z.string().optional(),
folder_names: z.string().optional(),
}),
body: googleDriveIndexBody.optional(),
});
export const indexConnectorResponse = z.object({

View file

@ -267,9 +267,7 @@ export const useSearchSourceConnectors = (lazy: boolean = false, searchSpaceId?:
connectorId: number,
searchSpaceId: string | number,
startDate?: string,
endDate?: string,
folderIds?: string,
folderNames?: string
endDate?: string
) => {
try {
// Build query parameters
@ -282,12 +280,6 @@ export const useSearchSourceConnectors = (lazy: boolean = false, searchSpaceId?:
if (endDate) {
params.append("end_date", endDate);
}
if (folderIds) {
params.append("folder_ids", folderIds);
}
if (folderNames) {
params.append("folder_names", folderNames);
}
const response = await authenticatedFetch(
`${

View file

@ -164,7 +164,7 @@ class ConnectorsApiService {
throw new ValidationError(`Invalid request: ${errorMessage}`);
}
const { connector_id, queryParams } = parsedRequest.data;
const { connector_id, queryParams, body } = parsedRequest.data;
// Transform query params to be string values
const transformedQueryParams = Object.fromEntries(
@ -177,7 +177,10 @@ class ConnectorsApiService {
return baseApiService.post(
`/api/v1/search-source-connectors/${connector_id}/index?${queryString}`,
indexConnectorResponse
indexConnectorResponse,
{
body: body || {},
}
);
};