mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-30 21:59:46 +02:00
Merge remote-tracking branch 'upstream/dev' into fix/connector
This commit is contained in:
commit
fda682c9e7
42 changed files with 2174 additions and 314 deletions
|
|
@ -4,13 +4,14 @@ from .change_tracker import categorize_change, fetch_all_changes, get_start_page
|
|||
from .client import GoogleDriveClient
|
||||
from .content_extractor import download_and_process_file
|
||||
from .credentials import get_valid_credentials, validate_credentials
|
||||
from .folder_manager import get_files_in_folder, list_folder_contents
|
||||
from .folder_manager import get_file_by_id, get_files_in_folder, list_folder_contents
|
||||
|
||||
__all__ = [
|
||||
"GoogleDriveClient",
|
||||
"categorize_change",
|
||||
"download_and_process_file",
|
||||
"fetch_all_changes",
|
||||
"get_file_by_id",
|
||||
"get_files_in_folder",
|
||||
"get_start_page_token",
|
||||
"get_valid_credentials",
|
||||
|
|
|
|||
|
|
@ -140,6 +140,39 @@ async def get_files_in_folder(
|
|||
return [], None, f"Error getting files in folder: {e!s}"
|
||||
|
||||
|
||||
async def get_file_by_id(
|
||||
client: GoogleDriveClient,
|
||||
file_id: str,
|
||||
) -> tuple[dict[str, Any] | None, str | None]:
|
||||
"""
|
||||
Get file metadata by ID.
|
||||
|
||||
Args:
|
||||
client: GoogleDriveClient instance
|
||||
file_id: File ID to fetch
|
||||
|
||||
Returns:
|
||||
Tuple of (file metadata dict, error message)
|
||||
"""
|
||||
try:
|
||||
file, error = await client.get_file_metadata(
|
||||
file_id,
|
||||
fields="id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink",
|
||||
)
|
||||
|
||||
if error:
|
||||
return None, error
|
||||
|
||||
if not file:
|
||||
return None, f"File not found: {file_id}"
|
||||
|
||||
return file, None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting file by ID: {e!s}", exc_info=True)
|
||||
return None, f"Error getting file by ID: {e!s}"
|
||||
|
||||
|
||||
def format_folder_path(hierarchy: list[dict[str, str]]) -> str:
|
||||
"""
|
||||
Format folder hierarchy as a path string.
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import logging
|
|||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from fastapi import APIRouter, Body, Depends, HTTPException, Query
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
|
@ -30,6 +30,7 @@ from app.db import (
|
|||
get_async_session,
|
||||
)
|
||||
from app.schemas import (
|
||||
GoogleDriveIndexRequest,
|
||||
SearchSourceConnectorBase,
|
||||
SearchSourceConnectorCreate,
|
||||
SearchSourceConnectorRead,
|
||||
|
|
@ -542,13 +543,9 @@ async def index_connector_content(
|
|||
None,
|
||||
description="End date for indexing (YYYY-MM-DD format). If not provided, uses today's date",
|
||||
),
|
||||
folder_ids: str = Query(
|
||||
drive_items: GoogleDriveIndexRequest | None = Body(
|
||||
None,
|
||||
description="[Google Drive only] Comma-separated folder IDs to index",
|
||||
),
|
||||
folder_names: str = Query(
|
||||
None,
|
||||
description="[Google Drive only] Comma-separated folder names for display purposes",
|
||||
description="[Google Drive only] Structured request with folders and files to index",
|
||||
),
|
||||
session: AsyncSession = Depends(get_async_session),
|
||||
user: User = Depends(current_active_user),
|
||||
|
|
@ -762,22 +759,23 @@ async def index_connector_content(
|
|||
index_google_drive_files_task,
|
||||
)
|
||||
|
||||
if not folder_ids or not folder_names:
|
||||
if not drive_items or not drive_items.has_items():
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Google Drive indexing requires folder_ids and folder_names parameters",
|
||||
detail="Google Drive indexing requires drive_items body parameter with folders or files",
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Triggering Google Drive indexing for connector {connector_id} into search space {search_space_id}, folders: {folder_names}"
|
||||
f"Triggering Google Drive indexing for connector {connector_id} into search space {search_space_id}, "
|
||||
f"folders: {len(drive_items.folders)}, files: {len(drive_items.files)}"
|
||||
)
|
||||
# Pass comma-separated strings directly to Celery task
|
||||
|
||||
# Pass structured data to Celery task
|
||||
index_google_drive_files_task.delay(
|
||||
connector_id,
|
||||
search_space_id,
|
||||
str(user.id),
|
||||
folder_ids, # Pass as comma-separated string
|
||||
folder_names, # Pass as comma-separated string
|
||||
drive_items.model_dump(), # Convert to dict for JSON serialization
|
||||
)
|
||||
response_message = "Google Drive indexing started in the background."
|
||||
|
||||
|
|
@ -1554,45 +1552,63 @@ async def run_google_drive_indexing(
|
|||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
folder_ids: str, # Comma-separated folder IDs
|
||||
folder_names: str, # Comma-separated folder names
|
||||
items_dict: dict, # Dictionary with 'folders' and 'files' lists
|
||||
):
|
||||
"""Runs the Google Drive indexing task for multiple folders and updates the timestamp."""
|
||||
"""Runs the Google Drive indexing task for folders and files and updates the timestamp."""
|
||||
try:
|
||||
from app.tasks.connector_indexers.google_drive_indexer import (
|
||||
index_google_drive_files,
|
||||
index_google_drive_single_file,
|
||||
)
|
||||
|
||||
# Split comma-separated IDs and names into lists
|
||||
folder_id_list = [fid.strip() for fid in folder_ids.split(",")]
|
||||
folder_name_list = [fname.strip() for fname in folder_names.split(",")]
|
||||
|
||||
# Parse the structured data
|
||||
items = GoogleDriveIndexRequest(**items_dict)
|
||||
total_indexed = 0
|
||||
errors = []
|
||||
|
||||
# Index each folder
|
||||
for folder_id, folder_name in zip(
|
||||
folder_id_list, folder_name_list, strict=False
|
||||
):
|
||||
for folder in items.folders:
|
||||
try:
|
||||
indexed_count, error_message = await index_google_drive_files(
|
||||
session,
|
||||
connector_id,
|
||||
search_space_id,
|
||||
user_id,
|
||||
folder_id,
|
||||
folder_name,
|
||||
folder_id=folder.id,
|
||||
folder_name=folder.name,
|
||||
use_delta_sync=True,
|
||||
update_last_indexed=False,
|
||||
)
|
||||
if error_message:
|
||||
errors.append(f"{folder_name}: {error_message}")
|
||||
errors.append(f"Folder '{folder.name}': {error_message}")
|
||||
else:
|
||||
total_indexed += indexed_count
|
||||
except Exception as e:
|
||||
errors.append(f"{folder_name}: {e!s}")
|
||||
errors.append(f"Folder '{folder.name}': {e!s}")
|
||||
logger.error(
|
||||
f"Error indexing folder {folder_name} ({folder_id}): {e}",
|
||||
f"Error indexing folder {folder.name} ({folder.id}): {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
# Index each individual file
|
||||
for file in items.files:
|
||||
try:
|
||||
indexed_count, error_message = await index_google_drive_single_file(
|
||||
session,
|
||||
connector_id,
|
||||
search_space_id,
|
||||
user_id,
|
||||
file_id=file.id,
|
||||
file_name=file.name,
|
||||
)
|
||||
if error_message:
|
||||
errors.append(f"File '{file.name}': {error_message}")
|
||||
else:
|
||||
total_indexed += indexed_count
|
||||
except Exception as e:
|
||||
errors.append(f"File '{file.name}': {e!s}")
|
||||
logger.error(
|
||||
f"Error indexing file {file.name} ({file.id}): {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
|
|
@ -1602,7 +1618,7 @@ async def run_google_drive_indexing(
|
|||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"Google Drive indexing successful for connector {connector_id}. Indexed {total_indexed} documents from {len(folder_id_list)} folder(s)."
|
||||
f"Google Drive indexing successful for connector {connector_id}. Indexed {total_indexed} documents from {len(items.folders)} folder(s) and {len(items.files)} file(s)."
|
||||
)
|
||||
# Update the last indexed timestamp only on full success
|
||||
await update_connector_last_indexed(session, connector_id)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ from .documents import (
|
|||
ExtensionDocumentMetadata,
|
||||
PaginatedResponse,
|
||||
)
|
||||
from .google_drive import DriveItem, GoogleDriveIndexRequest
|
||||
from .logs import LogBase, LogCreate, LogFilter, LogRead, LogUpdate
|
||||
from .new_chat import (
|
||||
ChatMessage,
|
||||
|
|
@ -79,6 +80,8 @@ __all__ = [
|
|||
"DefaultSystemInstructionsResponse",
|
||||
# Document schemas
|
||||
"DocumentBase",
|
||||
# Google Drive schemas
|
||||
"DriveItem",
|
||||
"DocumentRead",
|
||||
"DocumentUpdate",
|
||||
"DocumentWithChunksRead",
|
||||
|
|
@ -86,6 +89,7 @@ __all__ = [
|
|||
"ExtensionDocumentContent",
|
||||
"ExtensionDocumentMetadata",
|
||||
"GlobalNewLLMConfigRead",
|
||||
"GoogleDriveIndexRequest",
|
||||
# Base schemas
|
||||
"IDModel",
|
||||
# RBAC schemas
|
||||
|
|
|
|||
42
surfsense_backend/app/schemas/google_drive.py
Normal file
42
surfsense_backend/app/schemas/google_drive.py
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
"""Schemas for Google Drive connector."""
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class DriveItem(BaseModel):
|
||||
"""Represents a Google Drive file or folder."""
|
||||
|
||||
id: str = Field(..., description="Google Drive item ID")
|
||||
name: str = Field(..., description="Item display name")
|
||||
|
||||
|
||||
class GoogleDriveIndexRequest(BaseModel):
|
||||
"""Request body for indexing Google Drive content."""
|
||||
|
||||
folders: list[DriveItem] = Field(
|
||||
default_factory=list, description="List of folders to index"
|
||||
)
|
||||
files: list[DriveItem] = Field(
|
||||
default_factory=list, description="List of specific files to index"
|
||||
)
|
||||
|
||||
def has_items(self) -> bool:
|
||||
"""Check if any items are selected."""
|
||||
return len(self.folders) > 0 or len(self.files) > 0
|
||||
|
||||
def get_folder_ids(self) -> list[str]:
|
||||
"""Get list of folder IDs."""
|
||||
return [folder.id for folder in self.folders]
|
||||
|
||||
def get_folder_names(self) -> list[str]:
|
||||
"""Get list of folder names."""
|
||||
return [folder.name for folder in self.folders]
|
||||
|
||||
def get_file_ids(self) -> list[str]:
|
||||
"""Get list of file IDs."""
|
||||
return [file.id for file in self.files]
|
||||
|
||||
def get_file_names(self) -> list[str]:
|
||||
"""Get list of file names."""
|
||||
return [file.name for file in self.files]
|
||||
|
||||
|
|
@ -479,10 +479,9 @@ def index_google_drive_files_task(
|
|||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
folder_ids: str, # Comma-separated folder IDs
|
||||
folder_names: str, # Comma-separated folder names
|
||||
items_dict: dict, # Dictionary with 'folders' and 'files' lists
|
||||
):
|
||||
"""Celery task to index Google Drive files from multiple folders."""
|
||||
"""Celery task to index Google Drive folders and files."""
|
||||
import asyncio
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
|
|
@ -494,8 +493,7 @@ def index_google_drive_files_task(
|
|||
connector_id,
|
||||
search_space_id,
|
||||
user_id,
|
||||
folder_ids,
|
||||
folder_names,
|
||||
items_dict,
|
||||
)
|
||||
)
|
||||
finally:
|
||||
|
|
@ -506,10 +504,9 @@ async def _index_google_drive_files(
|
|||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
folder_ids: str, # Comma-separated folder IDs
|
||||
folder_names: str, # Comma-separated folder names
|
||||
items_dict: dict, # Dictionary with 'folders' and 'files' lists
|
||||
):
|
||||
"""Index Google Drive files from multiple folders with new session."""
|
||||
"""Index Google Drive folders and files with new session."""
|
||||
from app.routes.search_source_connectors_routes import (
|
||||
run_google_drive_indexing,
|
||||
)
|
||||
|
|
@ -520,8 +517,7 @@ async def _index_google_drive_files(
|
|||
connector_id,
|
||||
search_space_id,
|
||||
user_id,
|
||||
folder_ids,
|
||||
folder_names,
|
||||
items_dict,
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ from app.connectors.google_drive import (
|
|||
categorize_change,
|
||||
download_and_process_file,
|
||||
fetch_all_changes,
|
||||
get_file_by_id,
|
||||
get_files_in_folder,
|
||||
get_start_page_token,
|
||||
)
|
||||
|
|
@ -194,6 +195,131 @@ async def index_google_drive_files(
|
|||
return 0, f"Failed to index Google Drive files: {e!s}"
|
||||
|
||||
|
||||
async def index_google_drive_single_file(
|
||||
session: AsyncSession,
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
file_id: str,
|
||||
file_name: str | None = None,
|
||||
) -> tuple[int, str | None]:
|
||||
"""
|
||||
Index a single Google Drive file by its ID.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
connector_id: ID of the Drive connector
|
||||
search_space_id: ID of the search space
|
||||
user_id: ID of the user
|
||||
file_id: Specific file ID to index
|
||||
file_name: File name for display (optional)
|
||||
|
||||
Returns:
|
||||
Tuple of (number_of_indexed_files, error_message)
|
||||
"""
|
||||
task_logger = TaskLoggingService(session, search_space_id)
|
||||
|
||||
log_entry = await task_logger.log_task_start(
|
||||
task_name="google_drive_single_file_indexing",
|
||||
source="connector_indexing_task",
|
||||
message=f"Starting Google Drive single file indexing for file {file_id}",
|
||||
metadata={
|
||||
"connector_id": connector_id,
|
||||
"user_id": str(user_id),
|
||||
"file_id": file_id,
|
||||
"file_name": file_name,
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
connector = await get_connector_by_id(
|
||||
session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR
|
||||
)
|
||||
|
||||
if not connector:
|
||||
error_msg = f"Google Drive connector with ID {connector_id} not found"
|
||||
await task_logger.log_task_failure(
|
||||
log_entry, error_msg, {"error_type": "ConnectorNotFound"}
|
||||
)
|
||||
return 0, error_msg
|
||||
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Initializing Google Drive client for connector {connector_id}",
|
||||
{"stage": "client_initialization"},
|
||||
)
|
||||
|
||||
drive_client = GoogleDriveClient(session, connector_id)
|
||||
|
||||
# Fetch the file metadata
|
||||
file, error = await get_file_by_id(drive_client, file_id)
|
||||
|
||||
if error or not file:
|
||||
error_msg = f"Failed to fetch file {file_id}: {error or 'File not found'}"
|
||||
await task_logger.log_task_failure(
|
||||
log_entry, error_msg, {"error_type": "FileNotFound"}
|
||||
)
|
||||
return 0, error_msg
|
||||
|
||||
display_name = file_name or file.get("name", "Unknown")
|
||||
logger.info(f"Indexing Google Drive file: {display_name} ({file_id})")
|
||||
|
||||
# Process the file
|
||||
indexed, skipped = await _process_single_file(
|
||||
drive_client=drive_client,
|
||||
session=session,
|
||||
file=file,
|
||||
connector_id=connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
task_logger=task_logger,
|
||||
log_entry=log_entry,
|
||||
)
|
||||
|
||||
await session.commit()
|
||||
logger.info("Successfully committed Google Drive file indexing changes to database")
|
||||
|
||||
if indexed > 0:
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Successfully indexed file {display_name}",
|
||||
{
|
||||
"file_name": display_name,
|
||||
"file_id": file_id,
|
||||
},
|
||||
)
|
||||
logger.info(f"Google Drive file indexing completed: {display_name}")
|
||||
return 1, None
|
||||
else:
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"File {display_name} was skipped",
|
||||
{"status": "skipped"},
|
||||
)
|
||||
return 0, None
|
||||
|
||||
except SQLAlchemyError as db_error:
|
||||
await session.rollback()
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Database error during file indexing",
|
||||
str(db_error),
|
||||
{"error_type": "SQLAlchemyError"},
|
||||
)
|
||||
logger.error(f"Database error: {db_error!s}", exc_info=True)
|
||||
return 0, f"Database error: {db_error!s}"
|
||||
except Exception as e:
|
||||
await session.rollback()
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Failed to index Google Drive file",
|
||||
str(e),
|
||||
{"error_type": type(e).__name__},
|
||||
)
|
||||
logger.error(f"Failed to index Google Drive file: {e!s}", exc_info=True)
|
||||
return 0, f"Failed to index Google Drive file: {e!s}"
|
||||
|
||||
|
||||
async def _index_full_scan(
|
||||
drive_client: GoogleDriveClient,
|
||||
session: AsyncSession,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue