diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index b7e580a66..15dcc76e1 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -37,6 +37,7 @@ GOOGLE_OAUTH_CLIENT_SECRET=GOCSV # Connector Specific Configs GOOGLE_CALENDAR_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/calendar/connector/callback GOOGLE_GMAIL_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/gmail/connector/callback +GOOGLE_DRIVE_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/drive/connector/callback # Airtable OAuth for Aitable Connector AIRTABLE_CLIENT_ID=your_airtable_client_id diff --git a/surfsense_backend/alembic/versions/54_add_google_drive_connector_enums.py b/surfsense_backend/alembic/versions/54_add_google_drive_connector_enums.py new file mode 100644 index 000000000..8e7d69340 --- /dev/null +++ b/surfsense_backend/alembic/versions/54_add_google_drive_connector_enums.py @@ -0,0 +1,74 @@ +"""Add Google Drive connector enums + +Revision ID: 54 +Revises: 53 +Create Date: 2025-12-28 12:00:00.000000 + +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "54" +down_revision: str | None = "53" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Safely add 'GOOGLE_DRIVE_CONNECTOR' to enum types if missing.""" + + # Add to searchsourceconnectortype enum + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'GOOGLE_DRIVE_CONNECTOR' + ) THEN + ALTER TYPE searchsourceconnectortype ADD VALUE 'GOOGLE_DRIVE_CONNECTOR'; + END IF; + END + $$; + """ + ) + + # Add to documenttype enum + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'documenttype' AND e.enumlabel = 'GOOGLE_DRIVE_CONNECTOR' + ) THEN + ALTER TYPE documenttype ADD VALUE 'GOOGLE_DRIVE_CONNECTOR'; + END IF; + END + $$; + """ + ) + + +def downgrade() -> None: + """Remove 'GOOGLE_DRIVE_CONNECTOR' from enum types. + + Note: PostgreSQL doesn't support removing enum values directly. + This would require recreating the enum type, which is complex and risky. + For now, we'll leave the enum values in place. + + In a production environment with strict downgrade requirements, you would need to: + 1. Create new enum types without the value + 2. Convert all columns to use the new type + 3. Drop the old enum type + 4. Rename the new type to the old name + + This is left as pass to avoid accidental data loss. + """ + pass + diff --git a/surfsense_backend/alembic/versions/55_rename_google_drive_connector_to_file.py b/surfsense_backend/alembic/versions/55_rename_google_drive_connector_to_file.py new file mode 100644 index 000000000..137274b16 --- /dev/null +++ b/surfsense_backend/alembic/versions/55_rename_google_drive_connector_to_file.py @@ -0,0 +1,74 @@ +"""Rename GOOGLE_DRIVE_CONNECTOR document type to GOOGLE_DRIVE_FILE + +Revision ID: 55 +Revises: 54 +Create Date: 2025-12-29 12:00:00.000000 + +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "55" +down_revision: str | None = "54" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + from sqlalchemy import text + + connection = op.get_bind() + + connection.execute( + text( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'documenttype' AND e.enumlabel = 'GOOGLE_DRIVE_FILE' + ) THEN + ALTER TYPE documenttype ADD VALUE IF NOT EXISTS 'GOOGLE_DRIVE_FILE'; + END IF; + END + $$; + """ + ) + ) + + connection.commit() + + connection.execute( + text( + """ + UPDATE documents + SET document_type = 'GOOGLE_DRIVE_FILE' + WHERE document_type = 'GOOGLE_DRIVE_CONNECTOR'; + """ + ) + ) + + connection.commit() + + +def downgrade() -> None: + from sqlalchemy import text + + connection = op.get_bind() + + connection.execute( + text( + """ + UPDATE documents + SET document_type = 'GOOGLE_DRIVE_CONNECTOR' + WHERE document_type = 'GOOGLE_DRIVE_FILE'; + """ + ) + ) + + connection.commit() + diff --git a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py index 6c3dfd34b..ecaff6f2f 100644 --- a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py +++ b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py @@ -36,6 +36,7 @@ _ALL_CONNECTORS: list[str] = [ "CLICKUP_CONNECTOR", "GOOGLE_CALENDAR_CONNECTOR", "GOOGLE_GMAIL_CONNECTOR", + "GOOGLE_DRIVE_FILE", "DISCORD_CONNECTOR", "AIRTABLE_CONNECTOR", "TAVILY_API", @@ -425,6 +426,16 @@ async def search_knowledge_base_async( ) all_documents.extend(chunks) + elif connector == "GOOGLE_DRIVE_FILE": + _, chunks = await connector_service.search_google_drive( + user_query=query, + search_space_id=search_space_id, + top_k=top_k, + start_date=resolved_start_date, + end_date=resolved_end_date, + ) + all_documents.extend(chunks) + elif connector == "CONFLUENCE_CONNECTOR": _, chunks = await connector_service.search_confluence( user_query=query, @@ -561,6 +572,7 @@ def create_search_knowledge_base_tool( - CLICKUP_CONNECTOR: "ClickUp tasks and project data" (personal task management) - GOOGLE_CALENDAR_CONNECTOR: "Google Calendar events, meetings, and schedules" (personal calendar and time management) - GOOGLE_GMAIL_CONNECTOR: "Google Gmail emails and conversations" (personal emails and communications) + - GOOGLE_DRIVE_FILE: "Google Drive files and documents" (personal cloud storage and file management) - DISCORD_CONNECTOR: "Discord server conversations and shared content" (personal community communications) - AIRTABLE_CONNECTOR: "Airtable records, tables, and database content" (personal data management and organization) - TAVILY_API: "Tavily search API results" (personalized search results) diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index 08be26de1..9c503fb18 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -82,6 +82,9 @@ class Config: # Google Gmail redirect URI GOOGLE_GMAIL_REDIRECT_URI = os.getenv("GOOGLE_GMAIL_REDIRECT_URI") + # Google Drive redirect URI + GOOGLE_DRIVE_REDIRECT_URI = os.getenv("GOOGLE_DRIVE_REDIRECT_URI") + # Airtable OAuth AIRTABLE_CLIENT_ID = os.getenv("AIRTABLE_CLIENT_ID") AIRTABLE_CLIENT_SECRET = os.getenv("AIRTABLE_CLIENT_SECRET") diff --git a/surfsense_backend/app/connectors/google_drive/__init__.py b/surfsense_backend/app/connectors/google_drive/__init__.py new file mode 100644 index 000000000..6e0d25725 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/__init__.py @@ -0,0 +1,20 @@ +"""Google Drive Connector Module.""" + +from .change_tracker import categorize_change, fetch_all_changes, get_start_page_token +from .client import GoogleDriveClient +from .content_extractor import download_and_process_file +from .credentials import get_valid_credentials, validate_credentials +from .folder_manager import get_files_in_folder, list_folder_contents + +__all__ = [ + "GoogleDriveClient", + "get_valid_credentials", + "validate_credentials", + "download_and_process_file", + "get_files_in_folder", + "list_folder_contents", + "get_start_page_token", + "fetch_all_changes", + "categorize_change", +] + diff --git a/surfsense_backend/app/connectors/google_drive/change_tracker.py b/surfsense_backend/app/connectors/google_drive/change_tracker.py new file mode 100644 index 000000000..860e2dbef --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/change_tracker.py @@ -0,0 +1,205 @@ +"""Change tracking for Google Drive delta sync.""" + +import logging +from datetime import datetime +from typing import Any + +from .client import GoogleDriveClient + +logger = logging.getLogger(__name__) + + +async def get_start_page_token( + client: GoogleDriveClient, +) -> tuple[str | None, str | None]: + """ + Get the starting page token for change tracking. + + This token represents the current state and is used for future delta syncs. + + Args: + client: GoogleDriveClient instance + + Returns: + Tuple of (start_page_token, error message) + """ + try: + service = await client.get_service() + response = service.changes().getStartPageToken(supportsAllDrives=True).execute() + token = response.get("startPageToken") + + logger.info(f"Got start page token: {token}") + return token, None + + except Exception as e: + logger.error(f"Error getting start page token: {e!s}", exc_info=True) + return None, f"Error getting start page token: {e!s}" + + +async def get_changes( + client: GoogleDriveClient, + page_token: str, + folder_id: str | None = None, +) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + Get list of changes since the given page token. + + Args: + client: GoogleDriveClient instance + page_token: Page token from previous sync + folder_id: Optional folder ID to filter changes + + Returns: + Tuple of (changes list, new_page_token, error message) + """ + try: + service = await client.get_service() + + params = { + "pageToken": page_token, + "pageSize": 100, + "fields": "nextPageToken, newStartPageToken, changes(fileId, removed, file(id, name, mimeType, modifiedTime, size, webViewLink, parents, trashed))", + "supportsAllDrives": True, + "includeItemsFromAllDrives": True, + } + + response = service.changes().list(**params).execute() + + changes = response.get("changes", []) + next_token = response.get("nextPageToken") + new_start_token = response.get("newStartPageToken") + + # Use new start token if this is the last page + token_to_return = new_start_token if new_start_token else next_token + + # Filter changes by folder if specified + if folder_id: + changes = await _filter_changes_by_folder(client, changes, folder_id) + + logger.info(f"Got {len(changes)} changes, next token: {token_to_return}") + return changes, token_to_return, None + + except Exception as e: + logger.error(f"Error getting changes: {e!s}", exc_info=True) + return [], None, f"Error getting changes: {e!s}" + + +async def _filter_changes_by_folder( + client: GoogleDriveClient, + changes: list[dict[str, Any]], + folder_id: str, +) -> list[dict[str, Any]]: + """ + Filter changes to only include files within the specified folder. + + Args: + client: GoogleDriveClient instance + changes: List of changes from API + folder_id: Folder ID to filter by + + Returns: + Filtered list of changes + """ + filtered = [] + + for change in changes: + file = change.get("file") + if not file: + filtered.append(change) + continue + + # Check if file is in the folder (or subfolder) + parents = file.get("parents", []) + if folder_id in parents: + filtered.append(change) + else: + # Check if any parent is a descendant of folder_id + # This is a simplified check - full implementation would traverse hierarchy + # For now, we'll include it and let indexer validate + filtered.append(change) + + return filtered + + +def categorize_change(change: dict[str, Any]) -> str: + """ + Categorize a change event. + + Args: + change: Change event from Drive API + + Returns: + Category: 'removed', 'trashed', 'modified', 'new' + """ + if change.get("removed"): + return "removed" + + file = change.get("file") + if not file: + return "removed" + + if file.get("trashed"): + return "trashed" + + created_time = file.get("createdTime") + modified_time = file.get("modifiedTime") + + if created_time and modified_time: + try: + created = datetime.fromisoformat(created_time.replace("Z", "+00:00")) + modified = datetime.fromisoformat(modified_time.replace("Z", "+00:00")) + + # If created and modified times are very close, it's likely a new file + time_diff = abs((modified - created).total_seconds()) + if time_diff < 60: # Within 1 minute + return "new" + except Exception: + pass + + return "modified" + + +async def fetch_all_changes( + client: GoogleDriveClient, + start_token: str, + folder_id: str | None = None, +) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + Fetch all changes from start token, handling pagination. + + Args: + client: GoogleDriveClient instance + start_token: Starting page token + folder_id: Optional folder ID to filter changes + + Returns: + Tuple of (all changes, final_page_token, error message) + """ + all_changes = [] + current_token = start_token + error = None + + try: + while current_token: + changes, next_token, err = await get_changes( + client, current_token, folder_id + ) + + if err: + error = err + break + + all_changes.extend(changes) + + if not next_token or next_token == current_token: + break + + current_token = next_token + + logger.info(f"Fetched total of {len(all_changes)} changes") + return all_changes, current_token, error + + except Exception as e: + logger.error(f"Error fetching all changes: {e!s}", exc_info=True) + return all_changes, current_token, f"Error fetching all changes: {e!s}" + diff --git a/surfsense_backend/app/connectors/google_drive/client.py b/surfsense_backend/app/connectors/google_drive/client.py new file mode 100644 index 000000000..5053aa449 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/client.py @@ -0,0 +1,183 @@ +"""Google Drive API client.""" + +from typing import Any + +from google.oauth2.credentials import Credentials +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError +from sqlalchemy.ext.asyncio import AsyncSession + +from .credentials import get_valid_credentials + + +class GoogleDriveClient: + """Client for Google Drive API operations.""" + + def __init__(self, session: AsyncSession, connector_id: int): + """ + Initialize Google Drive client. + + Args: + session: Database session + connector_id: ID of the Drive connector + """ + self.session = session + self.connector_id = connector_id + self.service = None + + async def get_service(self): + """ + Get or create the Drive service instance. + + Returns: + Google Drive service instance + + Raises: + Exception: If service creation fails + """ + if self.service: + return self.service + + try: + credentials = await get_valid_credentials(self.session, self.connector_id) + self.service = build("drive", "v3", credentials=credentials) + return self.service + except Exception as e: + raise Exception(f"Failed to create Google Drive service: {e!s}") from e + + async def list_files( + self, + query: str = "", + fields: str = "nextPageToken, files(id, name, mimeType, modifiedTime, size, webViewLink, parents, owners, createdTime, description)", + page_size: int = 100, + page_token: str | None = None, + ) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + List files from Google Drive with pagination. + + Args: + query: Search query (e.g., "mimeType != 'application/vnd.google-apps.folder'") + fields: Fields to retrieve + page_size: Number of files per page (max 1000) + page_token: Token for next page + + Returns: + Tuple of (files list, next_page_token, error message) + """ + try: + service = await self.get_service() + + params = { + "pageSize": min(page_size, 1000), + "fields": fields, + "supportsAllDrives": True, + "includeItemsFromAllDrives": True, + } + + if query: + params["q"] = query + if page_token: + params["pageToken"] = page_token + + result = service.files().list(**params).execute() + + files = result.get("files", []) + next_token = result.get("nextPageToken") + + return files, next_token, None + + except HttpError as e: + error_msg = f"HTTP error listing files: {e.resp.status} - {e.error_details}" + return [], None, error_msg + except Exception as e: + return [], None, f"Error listing files: {e!s}" + + async def get_file_metadata( + self, file_id: str, fields: str = "*" + ) -> tuple[dict[str, Any] | None, str | None]: + """ + Get metadata for a specific file. + + Args: + file_id: ID of the file + fields: Fields to retrieve + + Returns: + Tuple of (file metadata, error message) + """ + try: + service = await self.get_service() + file = service.files().get(fileId=file_id, fields=fields, supportsAllDrives=True).execute() + return file, None + except HttpError as e: + return None, f"HTTP error getting file metadata: {e.resp.status}" + except Exception as e: + return None, f"Error getting file metadata: {e!s}" + + async def download_file( + self, file_id: str + ) -> tuple[bytes | None, str | None]: + """ + Download binary file content. + + Args: + file_id: ID of the file to download + + Returns: + Tuple of (file content bytes, error message) + """ + try: + service = await self.get_service() + request = service.files().get_media(fileId=file_id) + + import io + + fh = io.BytesIO() + from googleapiclient.http import MediaIoBaseDownload + + downloader = MediaIoBaseDownload(fh, request) + + done = False + while not done: + _, done = downloader.next_chunk() + + return fh.getvalue(), None + + except HttpError as e: + return None, f"HTTP error downloading file: {e.resp.status}" + except Exception as e: + return None, f"Error downloading file: {e!s}" + + async def export_google_file( + self, file_id: str, mime_type: str + ) -> tuple[bytes | None, str | None]: + """ + Export Google Workspace file to specified format. + + Args: + file_id: ID of the Google file + mime_type: Target MIME type (e.g., 'application/pdf', 'text/plain') + + Returns: + Tuple of (exported content as bytes, error message) + """ + try: + service = await self.get_service() + content = ( + service.files() + .export(fileId=file_id, mimeType=mime_type) + .execute() + ) + + # Content is already bytes from the API + # Keep as bytes to support both text and binary formats (like PDF) + if not isinstance(content, bytes): + content = content.encode("utf-8") + + return content, None + + except HttpError as e: + return None, f"HTTP error exporting file: {e.resp.status}" + except Exception as e: + return None, f"Error exporting file: {e!s}" + diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py new file mode 100644 index 000000000..1246d9e43 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -0,0 +1,136 @@ +"""Content extraction for Google Drive files.""" + +import logging +import os +import tempfile +from pathlib import Path +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import Log +from app.services.task_logging_service import TaskLoggingService + +from .client import GoogleDriveClient +from .file_types import get_export_mime_type, is_google_workspace_file, should_skip_file + +logger = logging.getLogger(__name__) + + +async def download_and_process_file( + client: GoogleDriveClient, + file: dict[str, Any], + search_space_id: int, + user_id: str, + session: AsyncSession, + task_logger: TaskLoggingService, + log_entry: Log, +) -> tuple[Any, str | None, dict[str, Any] | None]: + """ + Download Google Drive file and process using Surfsense file processors. + + Args: + client: GoogleDriveClient instance + file: File metadata from Drive API + search_space_id: ID of the search space + user_id: ID of the user + session: Database session + task_logger: Task logging service + log_entry: Log entry for tracking + + Returns: + Tuple of (Document object if successful, error message if failed, file metadata dict) + """ + file_id = file.get("id") + file_name = file.get("name", "Unknown") + mime_type = file.get("mimeType", "") + + # Skip folders and shortcuts + if should_skip_file(mime_type): + return None, f"Skipping {mime_type}", None + + logger.info(f"Downloading file: {file_name} ({mime_type})") + + temp_file_path = None + try: + # Step 1: Download or export the file + if is_google_workspace_file(mime_type): + # Google Workspace files need export (as PDF to preserve formatting & images) + export_mime = get_export_mime_type(mime_type) + if not export_mime: + return None, f"Cannot export Google Workspace type: {mime_type}" + + logger.info(f"Exporting Google Workspace file as {export_mime}") + content_bytes, error = await client.export_google_file(file_id, export_mime) + if error: + return None, error + + extension = ".pdf" if export_mime == "application/pdf" else ".txt" + else: + content_bytes, error = await client.download_file(file_id) + if error: + return None, error + + # Preserve original file extension + extension = Path(file_name).suffix or ".bin" + + with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp_file: + tmp_file.write(content_bytes) + temp_file_path = tmp_file.name + + from app.tasks.document_processors.file_processors import ( + process_file_in_background, + ) + from app.db import DocumentType + + connector_info = { + "type": DocumentType.GOOGLE_DRIVE_FILE, + "metadata": { + "google_drive_file_id": file_id, + "google_drive_file_name": file_name, + "google_drive_mime_type": mime_type, + "source_connector": "google_drive", + }, + } + + # Add additional Drive metadata if available + if "modifiedTime" in file: + connector_info["metadata"]["modified_time"] = file["modifiedTime"] + if "createdTime" in file: + connector_info["metadata"]["created_time"] = file["createdTime"] + if "size" in file: + connector_info["metadata"]["file_size"] = file["size"] + if "webViewLink" in file: + connector_info["metadata"]["web_view_link"] = file["webViewLink"] + + if is_google_workspace_file(mime_type): + connector_info["metadata"]["exported_as"] = "pdf" + connector_info["metadata"]["original_workspace_type"] = mime_type.split(".")[-1] + + logger.info(f"Processing {file_name} with Surfsense's file processor") + await process_file_in_background( + file_path=temp_file_path, + filename=file_name, + search_space_id=search_space_id, + user_id=user_id, + session=session, + task_logger=task_logger, + log_entry=log_entry, + connector=connector_info, + ) + + return None, None, connector_info["metadata"] + + except Exception as e: + logger.warning(f"Failed to process {file_name}: {e!s}") + return None, str(e), None + + finally: + # Cleanup temp file (if process_file_in_background didn't already delete it) + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except Exception as e: + logger.debug(f"Could not delete temp file {temp_file_path}: {e}") + + diff --git a/surfsense_backend/app/connectors/google_drive/credentials.py b/surfsense_backend/app/connectors/google_drive/credentials.py new file mode 100644 index 000000000..4c1ef9c03 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/credentials.py @@ -0,0 +1,98 @@ +"""Google Drive OAuth credential management.""" + +import json +from datetime import datetime + +from google.auth.transport.requests import Request +from google.oauth2.credentials import Credentials +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from sqlalchemy.orm.attributes import flag_modified + +from app.db import SearchSourceConnector, SearchSourceConnectorType + + +async def get_valid_credentials( + session: AsyncSession, + connector_id: int, +) -> Credentials: + """ + Get valid Google OAuth credentials, refreshing if needed. + + Args: + session: Database session + connector_id: Connector ID + + Returns: + Valid Google OAuth credentials + + Raises: + ValueError: If credentials are missing or invalid + Exception: If token refresh fails + """ + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == connector_id + ) + ) + connector = result.scalars().first() + + if not connector: + raise ValueError(f"Connector {connector_id} not found") + + config_data = connector.config + exp = config_data.get("expiry", "").replace("Z", "") + + if not all( + [ + config_data.get("client_id"), + config_data.get("client_secret"), + config_data.get("refresh_token"), + ] + ): + raise ValueError( + "Google OAuth credentials (client_id, client_secret, refresh_token) must be set" + ) + + credentials = Credentials( + token=config_data.get("token"), + refresh_token=config_data.get("refresh_token"), + token_uri=config_data.get("token_uri"), + client_id=config_data.get("client_id"), + client_secret=config_data.get("client_secret"), + scopes=config_data.get("scopes", []), + expiry=datetime.fromisoformat(exp) if exp else None, + ) + + if credentials.expired or not credentials.valid: + try: + credentials.refresh(Request()) + + connector.config = json.loads(credentials.to_json()) + flag_modified(connector, "config") + await session.commit() + + except Exception as e: + raise Exception(f"Failed to refresh Google OAuth credentials: {e!s}") from e + + return credentials + + +def validate_credentials(credentials: Credentials) -> bool: + """ + Validate that credentials have required fields. + + Args: + credentials: Google OAuth credentials + + Returns: + True if valid, False otherwise + """ + return all( + [ + credentials.client_id, + credentials.client_secret, + credentials.refresh_token, + ] + ) + diff --git a/surfsense_backend/app/connectors/google_drive/file_types.py b/surfsense_backend/app/connectors/google_drive/file_types.py new file mode 100644 index 000000000..cb2354585 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/file_types.py @@ -0,0 +1,30 @@ +"""File type handlers for Google Drive.""" + +GOOGLE_DOC = "application/vnd.google-apps.document" +GOOGLE_SHEET = "application/vnd.google-apps.spreadsheet" +GOOGLE_SLIDE = "application/vnd.google-apps.presentation" +GOOGLE_FOLDER = "application/vnd.google-apps.folder" +GOOGLE_SHORTCUT = "application/vnd.google-apps.shortcut" + +EXPORT_FORMATS = { + GOOGLE_DOC: "application/pdf", + GOOGLE_SHEET: "application/pdf", + GOOGLE_SLIDE: "application/pdf", +} + + +def is_google_workspace_file(mime_type: str) -> bool: + """Check if file is a Google Workspace file that needs export.""" + return mime_type.startswith("application/vnd.google-apps") + + +def should_skip_file(mime_type: str) -> bool: + """Check if file should be skipped (folders, shortcuts, etc).""" + return mime_type in [GOOGLE_FOLDER, GOOGLE_SHORTCUT] + + +def get_export_mime_type(mime_type: str) -> str | None: + """Get export MIME type for Google Workspace files.""" + return EXPORT_FORMATS.get(mime_type) + + diff --git a/surfsense_backend/app/connectors/google_drive/folder_manager.py b/surfsense_backend/app/connectors/google_drive/folder_manager.py new file mode 100644 index 000000000..599475a46 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/folder_manager.py @@ -0,0 +1,230 @@ +"""Folder management for Google Drive.""" + +import logging +from typing import Any + +from .client import GoogleDriveClient + +logger = logging.getLogger(__name__) + + +async def list_folders( + client: GoogleDriveClient, + parent_id: str | None = None, +) -> tuple[list[dict[str, Any]], str | None]: + """ + List folders in Google Drive. + + Args: + client: GoogleDriveClient instance + parent_id: Parent folder ID (None for root) + + Returns: + Tuple of (folders list, error message) + """ + try: + # Build query to get only folders + query_parts = ["mimeType = 'application/vnd.google-apps.folder'", "trashed = false"] + + if parent_id: + query_parts.append(f"'{parent_id}' in parents") + + query = " and ".join(query_parts) + + folders, _, error = await client.list_files( + query=query, + fields="files(id, name, parents, createdTime, modifiedTime)", + page_size=100, + ) + + if error: + return [], error + + return folders, None + + except Exception as e: + logger.error(f"Error listing folders: {e!s}", exc_info=True) + return [], f"Error listing folders: {e!s}" + + +async def get_folder_hierarchy( + client: GoogleDriveClient, + folder_id: str, +) -> tuple[list[dict[str, str]], str | None]: + """ + Get the full path hierarchy for a folder. + + Args: + client: GoogleDriveClient instance + folder_id: Folder ID to get hierarchy for + + Returns: + Tuple of (hierarchy list [{'id': ..., 'name': ...}], error message) + """ + try: + hierarchy = [] + current_id = folder_id + + # Traverse up to root + while current_id: + file, error = await client.get_file_metadata( + current_id, + fields="id, name, parents, mimeType" + ) + + if error: + return [], error + + if not file: + break + + hierarchy.insert(0, {"id": file["id"], "name": file["name"]}) + + # Get parent + parents = file.get("parents", []) + current_id = parents[0] if parents else None + + return hierarchy, None + + except Exception as e: + logger.error(f"Error getting folder hierarchy: {e!s}", exc_info=True) + return [], f"Error getting folder hierarchy: {e!s}" + + +async def get_files_in_folder( + client: GoogleDriveClient, + folder_id: str, + include_subfolders: bool = True, + page_token: str | None = None, +) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + Get all indexable files in a folder. + + Args: + client: GoogleDriveClient instance + folder_id: Folder ID to search in + include_subfolders: Whether to include subfolders + page_token: Pagination token + + Returns: + Tuple of (files list, next_page_token, error message) + """ + try: + # Build query + query_parts = [ + f"'{folder_id}' in parents", + "trashed = false", + "mimeType != 'application/vnd.google-apps.shortcut'", # Skip shortcuts + ] + + if not include_subfolders: + query_parts.append("mimeType != 'application/vnd.google-apps.folder'") + + query = " and ".join(query_parts) + + files, next_token, error = await client.list_files( + query=query, + page_size=100, + page_token=page_token, + ) + + if error: + return [], None, error + + return files, next_token, None + + except Exception as e: + logger.error(f"Error getting files in folder: {e!s}", exc_info=True) + return [], None, f"Error getting files in folder: {e!s}" + + +def format_folder_path(hierarchy: list[dict[str, str]]) -> str: + """ + Format folder hierarchy as a path string. + + Args: + hierarchy: List of folder dicts with 'id' and 'name' + + Returns: + Formatted path (e.g., "My Drive / Projects / Documents") + """ + if not hierarchy: + return "My Drive" + + folder_names = [folder["name"] for folder in hierarchy] + return " / ".join(folder_names) + + +async def list_folder_contents( + client: GoogleDriveClient, + parent_id: str | None = None, +) -> tuple[list[dict[str, Any]], str | None]: + """ + List folders and files in a Google Drive folder with pagination support. + + Args: + client: GoogleDriveClient instance + parent_id: Parent folder ID (None for root) + + Returns: + Tuple of (items list with folders and files, error message) + """ + try: + # Build query to get folders and files (exclude shortcuts) + query_parts = [ + "trashed = false", + "mimeType != 'application/vnd.google-apps.shortcut'", + ] + + # For root, we need to explicitly query for items in 'root' + # For subfolders, query for items with that parent + if parent_id: + query_parts.append(f"'{parent_id}' in parents") + else: + # Query for root-level items + query_parts.append("'root' in parents") + + query = " and ".join(query_parts) + + # Fetch all items with pagination (max 1000 per page) + all_items = [] + page_token = None + + while True: + items, next_token, error = await client.list_files( + query=query, + fields="files(id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink)", + page_size=1000, # Max allowed by Google Drive API + page_token=page_token, + ) + + if error: + return [], error + + all_items.extend(items) + + if not next_token: + break + + page_token = next_token + + for item in all_items: + item["isFolder"] = item["mimeType"] == "application/vnd.google-apps.folder" + + all_items.sort(key=lambda x: (not x["isFolder"], x["name"].lower())) + + folder_count = sum(1 for item in all_items if item["isFolder"]) + file_count = len(all_items) - folder_count + + logger.info( + f"Listed {len(all_items)} items ({folder_count} folders, {file_count} files) " + + (f"in folder {parent_id}" if parent_id else "in root (My Drive)") + ) + + return all_items, None + + except Exception as e: + logger.error(f"Error listing folder contents: {e!s}", exc_info=True) + return [], f"Error listing folder contents: {e!s}" + + diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index a2a424c26..c761561b5 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -46,6 +46,7 @@ class DocumentType(str, Enum): CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR" GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR" GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR" + GOOGLE_DRIVE_FILE = "GOOGLE_DRIVE_FILE" AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR" LUMA_CONNECTOR = "LUMA_CONNECTOR" ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR" @@ -69,6 +70,7 @@ class SearchSourceConnectorType(str, Enum): CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR" GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR" GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR" + GOOGLE_DRIVE_CONNECTOR = "GOOGLE_DRIVE_CONNECTOR" AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR" LUMA_CONNECTOR = "LUMA_CONNECTOR" ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR" diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py index a055bf549..24751e596 100644 --- a/surfsense_backend/app/routes/__init__.py +++ b/surfsense_backend/app/routes/__init__.py @@ -11,6 +11,9 @@ from .google_calendar_add_connector_route import ( from .google_gmail_add_connector_route import ( router as google_gmail_add_connector_router, ) +from .google_drive_add_connector_route import ( + router as google_drive_add_connector_router, +) from .logs_routes import router as logs_router from .luma_add_connector_route import router as luma_add_connector_router from .new_chat_routes import router as new_chat_router @@ -33,6 +36,7 @@ router.include_router(podcasts_router) # Podcast task status and audio router.include_router(search_source_connectors_router) router.include_router(google_calendar_add_connector_router) router.include_router(google_gmail_add_connector_router) +router.include_router(google_drive_add_connector_router) router.include_router(airtable_add_connector_router) router.include_router(luma_add_connector_router) router.include_router(new_llm_config_router) # LLM configs with prompt configuration diff --git a/surfsense_backend/app/routes/google_drive_add_connector_route.py b/surfsense_backend/app/routes/google_drive_add_connector_route.py new file mode 100644 index 000000000..d11404781 --- /dev/null +++ b/surfsense_backend/app/routes/google_drive_add_connector_route.py @@ -0,0 +1,315 @@ +""" +Google Drive Connector OAuth Routes. + +Handles OAuth 2.0 authentication flow for Google Drive connector. +Folder selection happens at index time on the manage connector page. + +Endpoints: +- GET /auth/google/drive/connector/add - Initiate OAuth +- GET /auth/google/drive/connector/callback - Handle OAuth callback +- GET /connectors/{connector_id}/google-drive/folders - List user's folders (for index-time selection) +""" + +import base64 +import json +import logging +import os +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import RedirectResponse +from google_auth_oauthlib.flow import Flow +from pydantic import ValidationError +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.config import config +from app.connectors.google_drive import ( + GoogleDriveClient, + get_start_page_token, + get_valid_credentials, + list_folder_contents, +) +from app.connectors.google_drive.folder_manager import list_folders +from app.db import ( + SearchSourceConnector, + SearchSourceConnectorType, + User, + get_async_session, +) +from app.users import current_active_user + +# Relax token scope validation for Google OAuth +os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1" + +logger = logging.getLogger(__name__) +router = APIRouter() + +# Google Drive OAuth scopes +SCOPES = [ + "https://www.googleapis.com/auth/drive.readonly", # Read-only access to Drive + "https://www.googleapis.com/auth/userinfo.email", # User email + "https://www.googleapis.com/auth/userinfo.profile", # User profile + "openid", +] + + +def get_google_flow(): + """Create and return a Google OAuth flow for Drive API.""" + try: + return Flow.from_client_config( + { + "web": { + "client_id": config.GOOGLE_OAUTH_CLIENT_ID, + "client_secret": config.GOOGLE_OAUTH_CLIENT_SECRET, + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "redirect_uris": [config.GOOGLE_DRIVE_REDIRECT_URI], + } + }, + scopes=SCOPES, + redirect_uri=config.GOOGLE_DRIVE_REDIRECT_URI, + ) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Failed to create Google OAuth flow: {e!s}" + ) from e + + +@router.get("/auth/google/drive/connector/add") +async def connect_drive(space_id: int, user: User = Depends(current_active_user)): + """ + Initiate Google Drive OAuth flow. + + Query params: + space_id: Search space ID to add connector to + + Returns: + JSON with auth_url to redirect user to Google authorization + """ + try: + if not space_id: + raise HTTPException(status_code=400, detail="space_id is required") + + flow = get_google_flow() + + # Encode space_id and user_id in state parameter + state_payload = json.dumps( + { + "space_id": space_id, + "user_id": str(user.id), + } + ) + state_encoded = base64.urlsafe_b64encode(state_payload.encode()).decode() + + # Generate authorization URL + auth_url, _ = flow.authorization_url( + access_type="offline", # Get refresh token + prompt="consent", # Force consent screen to get refresh token + include_granted_scopes="true", + state=state_encoded, + ) + + logger.info(f"Initiating Google Drive OAuth for user {user.id}, space {space_id}") + return {"auth_url": auth_url} + + except Exception as e: + logger.error(f"Failed to initiate Google Drive OAuth: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, detail=f"Failed to initiate Google OAuth: {e!s}" + ) from e + + +@router.get("/auth/google/drive/connector/callback") +async def drive_callback( + request: Request, + code: str, + state: str, + session: AsyncSession = Depends(get_async_session), +): + """ + Handle Google Drive OAuth callback. + + Query params: + code: Authorization code from Google + state: Encoded state with space_id and user_id + + Returns: + Redirect to frontend success page + """ + try: + # Decode and parse state + decoded_state = base64.urlsafe_b64decode(state.encode()).decode() + data = json.loads(decoded_state) + + user_id = UUID(data["user_id"]) + space_id = data["space_id"] + + logger.info(f"Processing Google Drive callback for user {user_id}, space {space_id}") + + # Exchange authorization code for tokens + flow = get_google_flow() + flow.fetch_token(code=code) + + creds = flow.credentials + creds_dict = json.loads(creds.to_json()) + + # Check if connector already exists for this space/user + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + ) + ) + existing_connector = result.scalars().first() + + if existing_connector: + raise HTTPException( + status_code=409, + detail="A GOOGLE_DRIVE_CONNECTOR already exists in this search space. Each search space can have only one connector of each type per user.", + ) + + # Create new connector (NO folder selection here - happens at index time) + db_connector = SearchSourceConnector( + name="Google Drive Connector", + connector_type=SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + config={ + **creds_dict, + "start_page_token": None, # Will be set on first index + }, + search_space_id=space_id, + user_id=user_id, + is_indexable=True, + ) + + session.add(db_connector) + await session.commit() + await session.refresh(db_connector) + + # Get initial start page token for delta sync + try: + drive_client = GoogleDriveClient(session, db_connector.id) + start_token, token_error = await get_start_page_token(drive_client) + + if start_token and not token_error: + db_connector.config["start_page_token"] = start_token + from sqlalchemy.orm.attributes import flag_modified + + flag_modified(db_connector, "config") + await session.commit() + logger.info(f"Set initial start page token for connector {db_connector.id}") + except Exception as e: + logger.warning(f"Failed to get initial start page token: {e!s}") + + logger.info( + f"Successfully created Google Drive connector {db_connector.id} for user {user_id}" + ) + + # Redirect to connectors management page (not to folder selection) + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors?success=google-drive-connected" + ) + + except HTTPException: + await session.rollback() + raise + except ValidationError as e: + await session.rollback() + logger.error(f"Validation error: {e!s}", exc_info=True) + raise HTTPException( + status_code=400, detail=f"Invalid connector configuration: {e!s}" + ) from e + except IntegrityError as e: + await session.rollback() + logger.error(f"Database integrity error: {e!s}", exc_info=True) + raise HTTPException( + status_code=409, + detail="A connector with this configuration already exists.", + ) from e + except Exception as e: + await session.rollback() + logger.error(f"Unexpected error in Drive callback: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, detail=f"Failed to complete Google OAuth: {e!s}" + ) from e + + +@router.get("/connectors/{connector_id}/google-drive/folders") +async def list_google_drive_folders( + connector_id: int, + parent_id: str | None = None, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """ + List folders AND files in user's Google Drive with hierarchical support. + + This is called at index time from the manage connector page to display + the complete file system (folders and files). Only folders are selectable. + + Args: + connector_id: ID of the Google Drive connector + parent_id: Optional parent folder ID to list contents (None for root) + + Returns: + JSON with list of items: { + "items": [ + {"id": str, "name": str, "mimeType": str, "isFolder": bool, ...}, + ... + ] + } + """ + try: + # Get connector and verify ownership + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == connector_id, + SearchSourceConnector.user_id == user.id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + ) + ) + connector = result.scalars().first() + + if not connector: + raise HTTPException( + status_code=404, + detail="Google Drive connector not found or access denied", + ) + + # Initialize Drive client (credentials will be loaded on first API call) + drive_client = GoogleDriveClient(session, connector_id) + + # List both folders and files (sorted: folders first) + items, error = await list_folder_contents(drive_client, parent_id=parent_id) + + if error: + raise HTTPException( + status_code=500, detail=f"Failed to list folder contents: {error}" + ) + + # Count folders and files for better logging + folder_count = sum(1 for item in items if item.get("isFolder", False)) + file_count = len(items) - folder_count + + logger.info( + f"✅ Listed {len(items)} total items ({folder_count} folders, {file_count} files) for connector {connector_id}" + + (f" in folder {parent_id}" if parent_id else " in ROOT") + ) + + # Log first few items for debugging + if items: + logger.info(f"First 3 items: {[item.get('name') for item in items[:3]]}") + + return {"items": items} + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error listing Drive contents: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, detail=f"Failed to list Drive contents: {e!s}" + ) from e diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 5a7db7f37..894be54c4 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -45,6 +45,7 @@ from app.tasks.connector_indexers import ( index_github_repos, index_google_calendar_events, index_google_gmail_messages, + index_google_drive_files, index_jira_issues, index_linear_issues, index_luma_events, @@ -542,6 +543,14 @@ async def index_connector_content( None, description="End date for indexing (YYYY-MM-DD format). If not provided, uses today's date", ), + folder_ids: str = Query( + None, + description="[Google Drive only] Comma-separated folder IDs to index", + ), + folder_names: str = Query( + None, + description="[Google Drive only] Comma-separated folder names for display purposes", + ), session: AsyncSession = Depends(get_async_session), user: User = Depends(current_active_user), ): @@ -747,6 +756,32 @@ async def index_connector_content( ) response_message = "Google Gmail indexing started in the background." + elif ( + connector.connector_type == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR + ): + from app.tasks.celery_tasks.connector_tasks import ( + index_google_drive_files_task, + ) + + if not folder_ids or not folder_names: + raise HTTPException( + status_code=400, + detail="Google Drive indexing requires folder_ids and folder_names parameters", + ) + + logger.info( + f"Triggering Google Drive indexing for connector {connector_id} into search space {search_space_id}, folders: {folder_names}" + ) + # Pass comma-separated strings directly to Celery task + index_google_drive_files_task.delay( + connector_id, + search_space_id, + str(user.id), + folder_ids, # Pass as comma-separated string + folder_names, # Pass as comma-separated string + ) + response_message = "Google Drive indexing started in the background." + elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR: from app.tasks.celery_tasks.connector_tasks import ( index_discord_messages_task, @@ -1515,6 +1550,70 @@ async def run_google_gmail_indexing( # Optionally update status in DB to indicate failure +async def run_google_drive_indexing( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + folder_ids: str, # Comma-separated folder IDs + folder_names: str, # Comma-separated folder names +): + """Runs the Google Drive indexing task for multiple folders and updates the timestamp.""" + try: + from app.tasks.connector_indexers.google_drive_indexer import ( + index_google_drive_files, + ) + + # Split comma-separated IDs and names into lists + folder_id_list = [fid.strip() for fid in folder_ids.split(",")] + folder_name_list = [fname.strip() for fname in folder_names.split(",")] + + total_indexed = 0 + errors = [] + + # Index each folder + for folder_id, folder_name in zip(folder_id_list, folder_name_list): + try: + indexed_count, error_message = await index_google_drive_files( + session, + connector_id, + search_space_id, + user_id, + folder_id, + folder_name, + use_delta_sync=True, + update_last_indexed=False, + ) + if error_message: + errors.append(f"{folder_name}: {error_message}") + else: + total_indexed += indexed_count + except Exception as e: + errors.append(f"{folder_name}: {str(e)}") + logger.error( + f"Error indexing folder {folder_name} ({folder_id}): {e}", + exc_info=True, + ) + + if errors: + logger.error( + f"Google Drive indexing completed with errors for connector {connector_id}: {'; '.join(errors)}" + ) + else: + logger.info( + f"Google Drive indexing successful for connector {connector_id}. Indexed {total_indexed} documents from {len(folder_id_list)} folder(s)." + ) + # Update the last indexed timestamp only on full success + await update_connector_last_indexed(session, connector_id) + await session.commit() # Commit timestamp update + except Exception as e: + logger.error( + f"Critical error in run_google_drive_indexing for connector {connector_id}: {e}", + exc_info=True, + ) + # Optionally update status in DB to indicate failure + + # Add new helper functions for luma indexing async def run_luma_indexing_with_new_session( connector_id: int, diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 3a6dcc605..cf0a83dc8 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -1808,6 +1808,106 @@ class ConnectorService: return result_object, gmail_docs + async def search_google_drive( + self, + user_query: str, + search_space_id: int, + top_k: int = 20, + start_date: datetime | None = None, + end_date: datetime | None = None, + ) -> tuple: + """ + Search for Google Drive files and return both the source information and langchain documents. + + Uses combined chunk-level and document-level hybrid search with RRF fusion. + + Args: + user_query: The user's query + search_space_id: The search space ID to search in + top_k: Maximum number of results to return + start_date: Optional start date for filtering documents by updated_at + end_date: Optional end date for filtering documents by updated_at + + Returns: + tuple: (sources_info, langchain_documents) + """ + drive_docs = await self._combined_rrf_search( + query_text=user_query, + search_space_id=search_space_id, + document_type="GOOGLE_DRIVE_FILE", + top_k=top_k, + start_date=start_date, + end_date=end_date, + ) + + # Early return if no results + if not drive_docs: + return { + "id": 33, + "name": "Google Drive Files", + "type": "GOOGLE_DRIVE_FILE", + "sources": [], + }, [] + + def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + return ( + doc_info.get("title") + or metadata.get("google_drive_file_name") + or metadata.get("FILE_NAME") + or "Untitled File" + ) + + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + file_id = metadata.get("google_drive_file_id", "") + return f"https://drive.google.com/file/d/{file_id}/view" if file_id else "" + + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = self._chunk_preview(chunk.get("content", "")) + info_parts = [] + mime_type = metadata.get("google_drive_mime_type", "") + modified_time = metadata.get("modified_time", "") + if mime_type: + # Simplify mime type for display + if "google-apps" in mime_type: + file_type = mime_type.split(".")[-1].title() + else: + file_type = mime_type.split("/")[-1].upper() + info_parts.append(f"Type: {file_type}") + if modified_time: + info_parts.append(f"Modified: {modified_time}") + if info_parts: + description = (description + " | " + " | ".join(info_parts)).strip(" |") + return description + + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "google_drive_file_id": metadata.get("google_drive_file_id", ""), + "google_drive_mime_type": metadata.get("google_drive_mime_type", ""), + "modified_time": metadata.get("modified_time", ""), + } + + sources_list = self._build_chunk_sources_from_documents( + drive_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) + + # Create result object + result_object = { + "id": 33, # Assign a unique ID for the Google Drive connector + "name": "Google Drive Files", + "type": "GOOGLE_DRIVE_FILE", + "sources": sources_list, + } + + return result_object, drive_docs + async def search_confluence( self, user_query: str, diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index 6cd557dc4..44f57d464 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -473,6 +473,58 @@ async def _index_google_gmail_messages( ) +@celery_app.task(name="index_google_drive_files", bind=True) +def index_google_drive_files_task( + self, + connector_id: int, + search_space_id: int, + user_id: str, + folder_ids: str, # Comma-separated folder IDs + folder_names: str, # Comma-separated folder names +): + """Celery task to index Google Drive files from multiple folders.""" + import asyncio + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete( + _index_google_drive_files( + connector_id, + search_space_id, + user_id, + folder_ids, + folder_names, + ) + ) + finally: + loop.close() + + +async def _index_google_drive_files( + connector_id: int, + search_space_id: int, + user_id: str, + folder_ids: str, # Comma-separated folder IDs + folder_names: str, # Comma-separated folder names +): + """Index Google Drive files from multiple folders with new session.""" + from app.routes.search_source_connectors_routes import ( + run_google_drive_indexing, + ) + + async with get_celery_session_maker()() as session: + await run_google_drive_indexing( + session, + connector_id, + search_space_id, + user_id, + folder_ids, + folder_names, + ) + + @celery_app.task(name="index_discord_messages", bind=True) def index_discord_messages_task( self, diff --git a/surfsense_backend/app/tasks/connector_indexers/__init__.py b/surfsense_backend/app/tasks/connector_indexers/__init__.py index dcfca33c3..80a9eaf19 100644 --- a/surfsense_backend/app/tasks/connector_indexers/__init__.py +++ b/surfsense_backend/app/tasks/connector_indexers/__init__.py @@ -35,6 +35,7 @@ from .elasticsearch_indexer import index_elasticsearch_documents from .github_indexer import index_github_repos from .google_calendar_indexer import index_google_calendar_events from .google_gmail_indexer import index_google_gmail_messages +from .google_drive_indexer import index_google_drive_files from .jira_indexer import index_jira_issues # Issue tracking and project management @@ -57,6 +58,7 @@ __all__ = [ # noqa: RUF022 "index_github_repos", # Calendar and scheduling "index_google_calendar_events", + "index_google_drive_files", "index_luma_events", "index_jira_issues", # Issue tracking and project management diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py new file mode 100644 index 000000000..5695c084d --- /dev/null +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -0,0 +1,410 @@ +"""Google Drive indexer using Surfsense file processors.""" + +import logging +from datetime import datetime + +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.connectors.google_drive import ( + GoogleDriveClient, + categorize_change, + download_and_process_file, + fetch_all_changes, + get_files_in_folder, + get_start_page_token, +) +from app.db import DocumentType, SearchSourceConnectorType +from app.services.task_logging_service import TaskLoggingService +from app.tasks.connector_indexers.base import ( + check_document_by_unique_identifier, + get_connector_by_id, + update_connector_last_indexed, +) +from app.utils.document_converters import generate_unique_identifier_hash + +logger = logging.getLogger(__name__) + + +async def index_google_drive_files( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + folder_id: str | None = None, + folder_name: str | None = None, + use_delta_sync: bool = True, + update_last_indexed: bool = True, + max_files: int = 500, +) -> tuple[int, str | None]: + """ + Index Google Drive files for a specific connector. + + Args: + session: Database session + connector_id: ID of the Drive connector + search_space_id: ID of the search space + user_id: ID of the user + folder_id: Specific folder to index (from UI/request, takes precedence) + folder_name: Folder name for display (from UI/request) + use_delta_sync: Whether to use change tracking for incremental sync + update_last_indexed: Whether to update last_indexed_at timestamp + max_files: Maximum number of files to index + + Returns: + Tuple of (number_of_indexed_files, error_message) + """ + task_logger = TaskLoggingService(session, search_space_id) + + log_entry = await task_logger.log_task_start( + task_name="google_drive_files_indexing", + source="connector_indexing_task", + message=f"Starting Google Drive indexing for connector {connector_id}", + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "folder_id": folder_id, + "use_delta_sync": use_delta_sync, + "max_files": max_files, + }, + ) + + try: + connector = await get_connector_by_id( + session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR + ) + + if not connector: + error_msg = f"Google Drive connector with ID {connector_id} not found" + await task_logger.log_task_failure( + log_entry, error_msg, {"error_type": "ConnectorNotFound"} + ) + return 0, error_msg + + await task_logger.log_task_progress( + log_entry, + f"Initializing Google Drive client for connector {connector_id}", + {"stage": "client_initialization"}, + ) + + drive_client = GoogleDriveClient(session, connector_id) + + if not folder_id: + error_msg = "folder_id is required for Google Drive indexing" + await task_logger.log_task_failure( + log_entry, error_msg, {"error_type": "MissingParameter"} + ) + return 0, error_msg + + target_folder_id = folder_id + target_folder_name = folder_name or "Selected Folder" + + logger.info(f"Indexing Google Drive folder: {target_folder_name} ({target_folder_id})") + + folder_tokens = connector.config.get("folder_tokens", {}) + start_page_token = folder_tokens.get(target_folder_id) + can_use_delta_sync = use_delta_sync and start_page_token and connector.last_indexed_at + + if can_use_delta_sync: + logger.info(f"Using delta sync for connector {connector_id}") + result = await _index_with_delta_sync( + drive_client=drive_client, + session=session, + connector=connector, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + folder_id=target_folder_id, + start_page_token=start_page_token, + task_logger=task_logger, + log_entry=log_entry, + max_files=max_files, + ) + else: + logger.info(f"Using full scan for connector {connector_id}") + result = await _index_full_scan( + drive_client=drive_client, + session=session, + connector=connector, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + folder_id=target_folder_id, + folder_name=target_folder_name, + task_logger=task_logger, + log_entry=log_entry, + max_files=max_files, + ) + + documents_indexed, documents_skipped = result + + if documents_indexed > 0 or can_use_delta_sync: + new_token, token_error = await get_start_page_token(drive_client) + if new_token and not token_error: + from sqlalchemy.orm.attributes import flag_modified + + if "folder_tokens" not in connector.config: + connector.config["folder_tokens"] = {} + connector.config["folder_tokens"][target_folder_id] = new_token + flag_modified(connector, "config") + + await update_connector_last_indexed(session, connector, update_last_indexed) + + await session.commit() + logger.info( + f"Successfully committed Google Drive indexing changes to database" + ) + + await task_logger.log_task_success( + log_entry, + f"Successfully completed Google Drive indexing for connector {connector_id}", + { + "files_processed": documents_indexed, + "files_skipped": documents_skipped, + "sync_type": "delta" if can_use_delta_sync else "full", + "folder": target_folder_name, + }, + ) + + logger.info( + f"Google Drive indexing completed: {documents_indexed} files indexed, {documents_skipped} skipped" + ) + return documents_indexed, None + + except SQLAlchemyError as db_error: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Google Drive indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"}, + ) + logger.error(f"Database error: {db_error!s}", exc_info=True) + return 0, f"Database error: {db_error!s}" + except Exception as e: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Google Drive files for connector {connector_id}", + str(e), + {"error_type": type(e).__name__}, + ) + logger.error(f"Failed to index Google Drive files: {e!s}", exc_info=True) + return 0, f"Failed to index Google Drive files: {e!s}" + + +async def _index_full_scan( + drive_client: GoogleDriveClient, + session: AsyncSession, + connector: any, + connector_id: int, + search_space_id: int, + user_id: str, + folder_id: str | None, + folder_name: str, + task_logger: TaskLoggingService, + log_entry: any, + max_files: int, +) -> tuple[int, int]: + """Perform full scan indexing of a folder.""" + await task_logger.log_task_progress( + log_entry, + f"Starting full scan of folder: {folder_name}", + {"stage": "full_scan", "folder_id": folder_id}, + ) + + documents_indexed = 0 + documents_skipped = 0 + page_token = None + files_processed = 0 + + while files_processed < max_files: + files, next_token, error = await get_files_in_folder( + drive_client, folder_id, include_subfolders=False, page_token=page_token + ) + + if error: + logger.error(f"Error listing files: {error}") + break + + if not files: + break + + for file in files: + if files_processed >= max_files: + break + + files_processed += 1 + + indexed, skipped = await _process_single_file( + drive_client=drive_client, + session=session, + file=file, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + task_logger=task_logger, + log_entry=log_entry, + ) + + documents_indexed += indexed + documents_skipped += skipped + + if documents_indexed % 10 == 0 and documents_indexed > 0: + await session.commit() + logger.info(f"Committed batch: {documents_indexed} files indexed so far") + + page_token = next_token + if not page_token: + break + + logger.info( + f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped" + ) + return documents_indexed, documents_skipped + + +async def _index_with_delta_sync( + drive_client: GoogleDriveClient, + session: AsyncSession, + connector: any, + connector_id: int, + search_space_id: int, + user_id: str, + folder_id: str | None, + start_page_token: str, + task_logger: TaskLoggingService, + log_entry: any, + max_files: int, +) -> tuple[int, int]: + """Perform delta sync indexing using change tracking.""" + await task_logger.log_task_progress( + log_entry, + f"Starting delta sync from token: {start_page_token[:20]}...", + {"stage": "delta_sync", "start_token": start_page_token}, + ) + + changes, final_token, error = await fetch_all_changes( + drive_client, start_page_token, folder_id + ) + + if error: + logger.error(f"Error fetching changes: {error}") + return 0, 0 + + if not changes: + logger.info("No changes detected since last sync") + return 0, 0 + + logger.info(f"Processing {len(changes)} changes") + + documents_indexed = 0 + documents_skipped = 0 + files_processed = 0 + + for change in changes: + if files_processed >= max_files: + break + + files_processed += 1 + change_type = categorize_change(change) + + if change_type in ["removed", "trashed"]: + file_id = change.get("fileId") + if file_id: + await _remove_document(session, file_id, search_space_id) + continue + + file = change.get("file") + if not file: + continue + + indexed, skipped = await _process_single_file( + drive_client=drive_client, + session=session, + file=file, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + task_logger=task_logger, + log_entry=log_entry, + ) + + documents_indexed += indexed + documents_skipped += skipped + + if documents_indexed % 10 == 0 and documents_indexed > 0: + await session.commit() + logger.info(f"Committed batch: {documents_indexed} changes processed") + + logger.info( + f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped" + ) + return documents_indexed, documents_skipped + + +async def _process_single_file( + drive_client: GoogleDriveClient, + session: AsyncSession, + file: dict, + connector_id: int, + search_space_id: int, + user_id: str, + task_logger: TaskLoggingService, + log_entry: any, +) -> tuple[int, int]: + """ + Process a single file by downloading and using Surfsense's file processor. + + Returns: + Tuple of (indexed_count, skipped_count) + """ + file_name = file.get("name", "Unknown") + mime_type = file.get("mimeType", "") + + try: + logger.info(f"Processing file: {file_name} ({mime_type})") + + _, error, _ = await download_and_process_file( + client=drive_client, + file=file, + search_space_id=search_space_id, + user_id=user_id, + session=session, + task_logger=task_logger, + log_entry=log_entry, + ) + + if error: + await task_logger.log_task_progress( + log_entry, + f"Skipped {file_name}: {error}", + {"status": "skipped", "reason": error}, + ) + return 0, 1 + + logger.info(f"Successfully indexed Google Drive file: {file_name}") + return 1, 0 + + except Exception as e: + logger.error(f"Error processing file {file_name}: {e!s}", exc_info=True) + return 0, 1 + + +async def _remove_document( + session: AsyncSession, file_id: str, search_space_id: int +): + """Remove a document that was deleted in Drive.""" + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id + ) + + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_document: + await session.delete(existing_document) + logger.info(f"Removed deleted file document: {file_id}") + + diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index a32e75a32..6a01db6a9 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -447,6 +447,24 @@ async def add_received_file_document_using_docling( ) from e +async def _update_document_from_connector( + document: Document | None, connector: dict | None, session: AsyncSession +) -> None: + """Helper to update document type and metadata from connector info.""" + if document and connector: + if "type" in connector: + document.document_type = connector["type"] + if "metadata" in connector: + # Merge with existing document_metadata (the actual column name) + if not document.document_metadata: + document.document_metadata = connector["metadata"] + else: + # Expand existing metadata with connector metadata + merged = {**document.document_metadata, **connector["metadata"]} + document.document_metadata = merged + await session.commit() + + async def process_file_in_background( file_path: str, filename: str, @@ -455,6 +473,7 @@ async def process_file_in_background( session: AsyncSession, task_logger: TaskLoggingService, log_entry: Log, + connector: dict | None = None, # Optional: {"type": "GOOGLE_DRIVE_FILE", "metadata": {...}} ): try: # Check if the file is a markdown or text file @@ -492,6 +511,9 @@ async def process_file_in_background( session, filename, markdown_content, search_space_id, user_id ) + if connector: + await _update_document_from_connector(result, connector, session) + if result: await task_logger.log_task_success( log_entry, @@ -608,6 +630,9 @@ async def process_file_in_background( session, filename, transcribed_text, search_space_id, user_id ) + if connector: + await _update_document_from_connector(result, connector, session) + if result: await task_logger.log_task_success( log_entry, @@ -753,6 +778,9 @@ async def process_file_in_background( session, filename, docs, search_space_id, user_id ) + if connector: + await _update_document_from_connector(result, connector, session) + if result: # Update page usage after successful processing # allow_exceed=True because document was already created after passing initial check @@ -897,6 +925,9 @@ async def process_file_in_background( user_id, final_page_count, allow_exceed=True ) + if connector: + await _update_document_from_connector(last_created_doc, connector, session) + await task_logger.log_task_success( log_entry, f"Successfully processed file with LlamaCloud: {filename}", @@ -1021,6 +1052,9 @@ async def process_file_in_background( user_id, final_page_count, allow_exceed=True ) + if connector: + await _update_document_from_connector(doc_result, connector, session) + await task_logger.log_task_success( log_entry, f"Successfully processed file with Docling: {filename}", diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx index d10a2338c..a5d811c81 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx @@ -6,6 +6,9 @@ import { Calendar as CalendarIcon, Clock, Edit, + Folder, + HardDrive, + Info, Loader2, Plus, RefreshCw, @@ -67,6 +70,7 @@ import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from "@/comp import { EnumConnectorName } from "@/contracts/enums/connector"; import { getConnectorIcon } from "@/contracts/enums/connectorIcons"; import { cn } from "@/lib/utils"; +import { GoogleDriveFolderTree } from "@/components/connectors/google-drive-folder-tree"; export default function ConnectorsPage() { const t = useTranslations("connectors"); @@ -115,6 +119,10 @@ export default function ConnectorsPage() { const [customFrequency, setCustomFrequency] = useState(""); const [isSavingPeriodic, setIsSavingPeriodic] = useState(false); + // Google Drive folder selection state + const [driveFolderDialogOpen, setDriveFolderDialogOpen] = useState(false); + const [selectedFolders, setSelectedFolders] = useState>([]); + useEffect(() => { if (error) { toast.error(t("failed_load")); @@ -137,8 +145,55 @@ export default function ConnectorsPage() { // Handle opening date picker for indexing const handleOpenDatePicker = (connectorId: number) => { + // Check if this is a Google Drive connector + const connector = connectors.find((c) => c.id === connectorId); + if (connector?.connector_type === EnumConnectorName.GOOGLE_DRIVE_CONNECTOR) { + // Open folder selection dialog for Google Drive + handleOpenDriveFolderDialog(connectorId); + } else { + // Open date picker for other connectors + setSelectedConnectorForIndexing(connectorId); + setDatePickerOpen(true); + } + }; + + const handleOpenDriveFolderDialog = (connectorId: number) => { setSelectedConnectorForIndexing(connectorId); - setDatePickerOpen(true); + setDriveFolderDialogOpen(true); + }; + + // Handle Google Drive folder indexing + const handleIndexDriveFolder = async () => { + if (selectedConnectorForIndexing === null || selectedFolders.length === 0) { + toast.error("Please select at least one folder"); + return; + } + + setDriveFolderDialogOpen(false); + + try { + setIndexingConnectorId(selectedConnectorForIndexing); + + const folderIds = selectedFolders.map((f) => f.id).join(","); + const folderNames = selectedFolders.map((f) => f.name).join(", "); + + await indexConnector({ + connector_id: selectedConnectorForIndexing, + queryParams: { + search_space_id: searchSpaceId, + folder_ids: folderIds, + folder_names: folderNames, + }, + }); + toast.success(t("indexing_started")); + } catch (error) { + console.error("Error indexing connector content:", error); + toast.error(error instanceof Error ? error.message : t("indexing_failed")); + } finally { + setIndexingConnectorId(null); + setSelectedConnectorForIndexing(null); + setSelectedFolders([]); + } }; // Handle connector indexing with dates @@ -387,39 +442,52 @@ export default function ConnectorsPage() { > {indexingConnectorId === connector.id ? ( + ) : connector.connector_type === EnumConnectorName.GOOGLE_DRIVE_CONNECTOR ? ( + ) : ( )} - {t("index_date_range")} + + {connector.connector_type === EnumConnectorName.GOOGLE_DRIVE_CONNECTOR + ? "Select folder to index" + : t("index_date_range")} + -

{t("index_date_range")}

-
- - - - - - - - -

{t("quick_index_auto")}

+

+ {connector.connector_type === EnumConnectorName.GOOGLE_DRIVE_CONNECTOR + ? "Select folder to index" + : t("index_date_range")} +

+ {/* Hide quick index button for Google Drive (requires folder selection) */} + {connector.connector_type !== EnumConnectorName.GOOGLE_DRIVE_CONNECTOR && ( + + + + + + +

{t("quick_index_auto")}

+
+
+
+ )} )} {connector.is_indexable && ( @@ -607,6 +675,67 @@ export default function ConnectorsPage() { + {/* Google Drive Folder Selection Dialog */} + + + + Select Google Drive Folders + + + + Select folders to index. Only files directly in each folder will be + processed—subfolders must be selected separately. + + + +
+
+ + {selectedConnectorForIndexing && ( + { + setSelectedFolders(folders); + }} + /> + )} +
+ {selectedFolders.length > 0 && ( +
+
+

+ Selected {selectedFolders.length} folder{selectedFolders.length > 1 ? "s" : ""}: +

+
+ {selectedFolders.map((folder) => ( +

+ • {folder.name} +

+ ))} +
+
+
+ )} +
+ + + + +
+
+ {/* Periodic Indexing Configuration Dialog */} diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/add/google-drive-connector/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/google-drive-connector/page.tsx new file mode 100644 index 000000000..4f0c2b23f --- /dev/null +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/google-drive-connector/page.tsx @@ -0,0 +1,205 @@ +"use client"; + +import { useAtomValue } from "jotai"; +import { ArrowLeft, Check, ExternalLink, Loader2 } from "lucide-react"; +import { motion } from "motion/react"; +import Link from "next/link"; +import { useParams, useRouter } from "next/navigation"; +import { useEffect, useState } from "react"; +import { toast } from "sonner"; +import { connectorsAtom } from "@/atoms/connectors/connector-query.atoms"; +import { Button } from "@/components/ui/button"; +import { + Card, + CardContent, + CardDescription, + CardFooter, + CardHeader, + CardTitle, +} from "@/components/ui/card"; +import { EnumConnectorName } from "@/contracts/enums/connector"; +import { getConnectorIcon } from "@/contracts/enums/connectorIcons"; +import type { SearchSourceConnector } from "@/contracts/types/connector.types"; +import { authenticatedFetch } from "@/lib/auth-utils"; + +export default function GoogleDriveConnectorPage() { + const router = useRouter(); + const params = useParams(); + const searchSpaceId = params.search_space_id as string; + + const [isConnecting, setIsConnecting] = useState(false); + const [doesConnectorExist, setDoesConnectorExist] = useState(false); + + const { refetch: fetchConnectors } = useAtomValue(connectorsAtom); + + useEffect(() => { + fetchConnectors().then((data) => { + const connectors = data.data || []; + const connector = connectors.find( + (c: SearchSourceConnector) => c.connector_type === EnumConnectorName.GOOGLE_DRIVE_CONNECTOR + ); + if (connector) { + setDoesConnectorExist(true); + } + }); + }, []); + + const handleConnectGoogle = async () => { + try { + setIsConnecting(true); + const response = await authenticatedFetch( + `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/auth/google/drive/connector/add/?space_id=${searchSpaceId}`, + { method: "GET" } + ); + + if (!response.ok) { + throw new Error("Failed to initiate Google OAuth"); + } + + const data = await response.json(); + window.location.href = data.auth_url; + } catch (error) { + console.error("Error connecting to Google:", error); + toast.error("Failed to connect to Google Drive"); + } finally { + setIsConnecting(false); + } + }; + + return ( +
+ + {/* Header */} +
+ + + Back to connectors + +
+
+ {getConnectorIcon(EnumConnectorName.GOOGLE_DRIVE_CONNECTOR, "h-6 w-6")} +
+
+

Connect Google Drive

+

+ Securely connect your Google Drive account +

+
+
+
+ + {/* Connection Card */} + {!doesConnectorExist ? ( + + + Connect Your Google Account + + Authorize read-only access to your Google Drive. You'll select which folder to + index when you start indexing. + + + +
+ + Read-only access to your Drive files +
+
+ + Index documents, spreadsheets, presentations, PDFs & more +
+
+ + Automatic updates with change tracking +
+
+ + Secure OAuth 2.0 authentication +
+
+ + + + +
+ ) : ( + + + ✅ Already Connected + + Your Google Drive connector is already set up. Go to the connectors page to + start indexing. + + + + + + + )} + + {/* Information Card */} + + + How Google Drive Integration Works + + +
+

1️⃣ Connect Your Account

+

+ First, securely connect your Google Drive account using OAuth 2.0. We only + request read-only access. +

+
+
+

2️⃣ Select Folder to Index

+

+ When you're ready to index, go to the connectors page and click "Index". You'll + choose which folder to process. +

+
+
+

3️⃣ Automatic Change Detection

+

+ We use Google Drive's change tracking API to detect when files are modified, + added, or deleted. Only changed files are re-indexed. +

+
+
+

📄 Comprehensive File Support

+

+ Supports Google Workspace files (Docs, Sheets, Slides), Microsoft Office + documents, PDFs, text files, images (with OCR), and more. +

+
+
+
+
+
+ ); +} diff --git a/surfsense_web/components/connectors/google-drive-folder-tree.tsx b/surfsense_web/components/connectors/google-drive-folder-tree.tsx new file mode 100644 index 000000000..cec207b2a --- /dev/null +++ b/surfsense_web/components/connectors/google-drive-folder-tree.tsx @@ -0,0 +1,320 @@ +"use client"; + +import { + ChevronDown, + ChevronRight, + File, + FileText, + Folder, + FolderOpen, + HardDrive, + Image, + Loader2, + Sheet, + Presentation, +} from "lucide-react"; +import { useState } from "react"; +import { Button } from "@/components/ui/button"; +import { Checkbox } from "@/components/ui/checkbox"; +import { ScrollArea } from "@/components/ui/scroll-area"; +import { cn } from "@/lib/utils"; +import { useGoogleDriveFolders } from "@/hooks/use-google-drive-folders"; +import { connectorsApiService } from "@/lib/apis/connectors-api.service"; + +interface DriveItem { + id: string; + name: string; + mimeType: string; + isFolder: boolean; + parents?: string[]; + size?: number; + iconLink?: string; +} + +interface ItemTreeNode { + item: DriveItem; + children: DriveItem[] | null; // null = not loaded, [] = loaded but empty + isExpanded: boolean; + isLoading: boolean; +} + +interface SelectedFolder { + id: string; + name: string; +} + +interface GoogleDriveFolderTreeProps { + connectorId: number; + selectedFolders: SelectedFolder[]; + onSelectFolders: (folders: SelectedFolder[]) => void; +} + +// Helper to get appropriate icon for file type +function getFileIcon(mimeType: string, className: string = "h-4 w-4") { + if (mimeType.includes("spreadsheet") || mimeType.includes("excel")) { + return ; + } + if (mimeType.includes("presentation") || mimeType.includes("powerpoint")) { + return ; + } + if (mimeType.includes("document") || mimeType.includes("word") || mimeType.includes("text")) { + return ; + } + if (mimeType.includes("image")) { + return ; + } + return ; +} + +export function GoogleDriveFolderTree({ + connectorId, + selectedFolders, + onSelectFolders, +}: GoogleDriveFolderTreeProps) { + const [itemStates, setItemStates] = useState>(new Map()); + + const { data: rootData, isLoading: isLoadingRoot } = useGoogleDriveFolders({ + connectorId, + }); + + const rootItems = rootData?.items || []; + + const isFolderSelected = (folderId: string): boolean => { + return selectedFolders.some((f) => f.id === folderId); + }; + + const toggleFolderSelection = (folderId: string, folderName: string) => { + if (isFolderSelected(folderId)) { + onSelectFolders(selectedFolders.filter((f) => f.id !== folderId)); + } else { + onSelectFolders([...selectedFolders, { id: folderId, name: folderName }]); + } + }; + + /** + * Find an item by ID across all loaded items (root and nested). + */ + const findItem = (itemId: string): DriveItem | undefined => { + const state = itemStates.get(itemId); + if (state?.item) return state.item; + + const rootItem = rootItems.find((item) => item.id === itemId); + if (rootItem) return rootItem; + + for (const [, nodeState] of itemStates) { + if (nodeState.children) { + const found = nodeState.children.find((child) => child.id === itemId); + if (found) return found; + } + } + + return undefined; + }; + + /** + * Load and display contents of a specific folder. + */ + const loadFolderContents = async (folderId: string) => { + try { + setItemStates((prev) => { + const newMap = new Map(prev); + const existing = newMap.get(folderId); + if (existing) { + newMap.set(folderId, { ...existing, isLoading: true }); + } else { + const item = findItem(folderId); + if (item) { + newMap.set(folderId, { + item, + children: null, + isExpanded: false, + isLoading: true, + }); + } + } + return newMap; + }); + + const data = await connectorsApiService.listGoogleDriveFolders({ + connector_id: connectorId, + parent_id: folderId, + }); + const items = data.items || []; + + setItemStates((prev) => { + const newMap = new Map(prev); + const existing = newMap.get(folderId); + const item = existing?.item || findItem(folderId); + + if (item) { + newMap.set(folderId, { + item, + children: items, + isExpanded: true, + isLoading: false, + }); + } else { + console.error(`Could not find item for folderId: ${folderId}`); + } + return newMap; + }); + } catch (error) { + console.error("Error loading folder contents:", error); + setItemStates((prev) => { + const newMap = new Map(prev); + const existing = newMap.get(folderId); + if (existing) { + newMap.set(folderId, { ...existing, isLoading: false }); + } + return newMap; + }); + } + }; + + /** + * Toggle folder expand/collapse state. + */ + const toggleFolder = async (item: DriveItem) => { + if (!item.isFolder) return; + + const state = itemStates.get(item.id); + + if (!state || state.children === null) { + await loadFolderContents(item.id); + } else { + setItemStates((prev) => { + const newMap = new Map(prev); + newMap.set(item.id, { + ...state, + isExpanded: !state.isExpanded, + }); + return newMap; + }); + } + }; + + /** + * Render a single item (folder or file) with its children. + */ + const renderItem = (item: DriveItem, level: number = 0) => { + const state = itemStates.get(item.id); + const isExpanded = state?.isExpanded || false; + const isLoading = state?.isLoading || false; + const children = state?.children; + const isSelected = isFolderSelected(item.id); + const isFolder = item.isFolder; + + const childFolders = children?.filter((c) => c.isFolder) || []; + const childFiles = children?.filter((c) => !c.isFolder) || []; + + return ( +
+
+ {isFolder ? ( + { + e.stopPropagation(); + toggleFolder(item); + }} + > + {isLoading ? ( + + ) : isExpanded ? ( + + ) : ( + + )} + + ) : ( + + )} + + {isFolder && ( + toggleFolderSelection(item.id, item.name)} + className="shrink-0" + onClick={(e) => e.stopPropagation()} + /> + )} + +
+ {isFolder ? ( + isExpanded ? ( + + ) : ( + + ) + ) : ( + getFileIcon(item.mimeType, "h-4 w-4") + )} +
+ + isFolder && toggleFolder(item)} + > + {item.name} + +
+ + {isExpanded && isFolder && children && ( +
+ {childFolders.map((child) => renderItem(child, level + 1))} + {childFiles.map((child) => renderItem(child, level + 1))} + + {children.length === 0 && ( +
Empty folder
+ )} +
+ )} +
+ ); + }; + + return ( +
+ +
+
+
+ toggleFolderSelection("root", "My Drive")} + className="shrink-0" + /> + + toggleFolderSelection("root", "My Drive")}> + My Drive + +
+
+ + {isLoadingRoot && ( +
+ +
+ )} + +
+ {!isLoadingRoot && rootItems.map((item) => renderItem(item, 0))} +
+ + {!isLoadingRoot && rootItems.length === 0 && ( +
+ No files or folders found in your Google Drive +
+ )} +
+
+
+ ); +} diff --git a/surfsense_web/components/sources/connector-data.tsx b/surfsense_web/components/sources/connector-data.tsx index 338c3ae20..7fca3e6b9 100644 --- a/surfsense_web/components/sources/connector-data.tsx +++ b/surfsense_web/components/sources/connector-data.tsx @@ -183,6 +183,13 @@ export const connectorCategories: ConnectorCategory[] = [ icon: getConnectorIcon(EnumConnectorName.GOOGLE_GMAIL_CONNECTOR, "h-6 w-6"), status: "available", }, + { + id: "google-drive-connector", + title: "Google Drive", + description: "google_drive_desc", + icon: getConnectorIcon(EnumConnectorName.GOOGLE_DRIVE_CONNECTOR, "h-6 w-6"), + status: "available", + }, { id: "luma-connector", title: "Luma", diff --git a/surfsense_web/contracts/enums/connector.ts b/surfsense_web/contracts/enums/connector.ts index 6cdbc5656..eb2cf7ad8 100644 --- a/surfsense_web/contracts/enums/connector.ts +++ b/surfsense_web/contracts/enums/connector.ts @@ -14,6 +14,7 @@ export enum EnumConnectorName { CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR", GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR", GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR", + GOOGLE_DRIVE_CONNECTOR = "GOOGLE_DRIVE_CONNECTOR", AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR", LUMA_CONNECTOR = "LUMA_CONNECTOR", ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR", diff --git a/surfsense_web/contracts/enums/connectorIcons.tsx b/surfsense_web/contracts/enums/connectorIcons.tsx index 87840d7e4..661be5253 100644 --- a/surfsense_web/contracts/enums/connectorIcons.tsx +++ b/surfsense_web/contracts/enums/connectorIcons.tsx @@ -26,6 +26,7 @@ import { Sparkles, Telescope, Webhook, + HardDrive, } from "lucide-react"; import { EnumConnectorName } from "./connector"; @@ -57,6 +58,8 @@ export const getConnectorIcon = (connectorType: EnumConnectorName | string, clas return ; case EnumConnectorName.GOOGLE_GMAIL_CONNECTOR: return ; + case EnumConnectorName.GOOGLE_DRIVE_CONNECTOR: + return ; case EnumConnectorName.AIRTABLE_CONNECTOR: return ; case EnumConnectorName.CONFLUENCE_CONNECTOR: diff --git a/surfsense_web/contracts/types/connector.types.ts b/surfsense_web/contracts/types/connector.types.ts index 4e09ba067..c590f3941 100644 --- a/surfsense_web/contracts/types/connector.types.ts +++ b/surfsense_web/contracts/types/connector.types.ts @@ -17,6 +17,7 @@ export const searchSourceConnectorTypeEnum = z.enum([ "CLICKUP_CONNECTOR", "GOOGLE_CALENDAR_CONNECTOR", "GOOGLE_GMAIL_CONNECTOR", + "GOOGLE_DRIVE_CONNECTOR", "AIRTABLE_CONNECTOR", "LUMA_CONNECTOR", "ELASTICSEARCH_CONNECTOR", @@ -39,6 +40,19 @@ export const searchSourceConnector = z.object({ created_at: z.string(), }); +export const googleDriveItem = z.object({ + id: z.string(), + name: z.string(), + mimeType: z.string(), + isFolder: z.boolean(), + parents: z.array(z.string()).optional(), + size: z.number().optional(), + iconLink: z.string().optional(), + webViewLink: z.string().optional(), + createdTime: z.string().optional(), + modifiedTime: z.string().optional(), +}); + /** * Get connectors */ @@ -120,6 +134,9 @@ export const indexConnectorRequest = z.object({ search_space_id: z.number().or(z.string()), start_date: z.string().optional(), end_date: z.string().optional(), + // Google Drive only + folder_ids: z.string().optional(), + folder_names: z.string().optional(), }), }); @@ -140,6 +157,18 @@ export const listGitHubRepositoriesRequest = z.object({ export const listGitHubRepositoriesResponse = z.array(z.record(z.string(), z.any())); +/** + * List Google Drive folders + */ +export const listGoogleDriveFoldersRequest = z.object({ + connector_id: z.number(), + parent_id: z.string().optional(), +}); + +export const listGoogleDriveFoldersResponse = z.object({ + items: z.array(googleDriveItem), +}); + // Inferred types export type SearchSourceConnectorType = z.infer; export type SearchSourceConnector = z.infer; @@ -157,3 +186,6 @@ export type IndexConnectorRequest = z.infer; export type IndexConnectorResponse = z.infer; export type ListGitHubRepositoriesRequest = z.infer; export type ListGitHubRepositoriesResponse = z.infer; +export type ListGoogleDriveFoldersRequest = z.infer; +export type ListGoogleDriveFoldersResponse = z.infer; +export type GoogleDriveItem = z.infer; diff --git a/surfsense_web/contracts/types/document.types.ts b/surfsense_web/contracts/types/document.types.ts index 3ce5388dd..94ff27940 100644 --- a/surfsense_web/contracts/types/document.types.ts +++ b/surfsense_web/contracts/types/document.types.ts @@ -15,6 +15,7 @@ export const documentTypeEnum = z.enum([ "CLICKUP_CONNECTOR", "GOOGLE_CALENDAR_CONNECTOR", "GOOGLE_GMAIL_CONNECTOR", + "GOOGLE_DRIVE_FILE", "AIRTABLE_CONNECTOR", "LUMA_CONNECTOR", "ELASTICSEARCH_CONNECTOR", diff --git a/surfsense_web/hooks/use-google-drive-folders.ts b/surfsense_web/hooks/use-google-drive-folders.ts new file mode 100644 index 000000000..65555a6c9 --- /dev/null +++ b/surfsense_web/hooks/use-google-drive-folders.ts @@ -0,0 +1,29 @@ +import { useQuery } from "@tanstack/react-query"; +import { connectorsApiService } from "@/lib/apis/connectors-api.service"; +import { cacheKeys } from "@/lib/query-client/cache-keys"; + +interface UseGoogleDriveFoldersOptions { + connectorId: number; + parentId?: string; + enabled?: boolean; +} + +export function useGoogleDriveFolders({ + connectorId, + parentId, + enabled = true, +}: UseGoogleDriveFoldersOptions) { + return useQuery({ + queryKey: cacheKeys.connectors.googleDrive.folders(connectorId, parentId), + queryFn: async () => { + return connectorsApiService.listGoogleDriveFolders({ + connector_id: connectorId, + parent_id: parentId, + }); + }, + enabled: enabled && !!connectorId, + staleTime: 5 * 60 * 1000, // 5 minutes + retry: 2, + }); +} + diff --git a/surfsense_web/hooks/use-search-source-connectors.ts b/surfsense_web/hooks/use-search-source-connectors.ts index 2f77d7d82..14c21831b 100644 --- a/surfsense_web/hooks/use-search-source-connectors.ts +++ b/surfsense_web/hooks/use-search-source-connectors.ts @@ -267,7 +267,9 @@ export const useSearchSourceConnectors = (lazy: boolean = false, searchSpaceId?: connectorId: number, searchSpaceId: string | number, startDate?: string, - endDate?: string + endDate?: string, + folderIds?: string, + folderNames?: string ) => { try { // Build query parameters @@ -280,6 +282,12 @@ export const useSearchSourceConnectors = (lazy: boolean = false, searchSpaceId?: if (endDate) { params.append("end_date", endDate); } + if (folderIds) { + params.append("folder_ids", folderIds); + } + if (folderNames) { + params.append("folder_names", folderNames); + } const response = await authenticatedFetch( `${ diff --git a/surfsense_web/lib/apis/connectors-api.service.ts b/surfsense_web/lib/apis/connectors-api.service.ts index 4bf522606..f6929391a 100644 --- a/surfsense_web/lib/apis/connectors-api.service.ts +++ b/surfsense_web/lib/apis/connectors-api.service.ts @@ -17,6 +17,9 @@ import { type ListGitHubRepositoriesRequest, listGitHubRepositoriesRequest, listGitHubRepositoriesResponse, + type ListGoogleDriveFoldersRequest, + listGoogleDriveFoldersRequest, + listGoogleDriveFoldersResponse, type UpdateConnectorRequest, updateConnectorRequest, updateConnectorResponse, @@ -195,6 +198,29 @@ class ConnectorsApiService { body: parsedRequest.data, }); }; + + /** + * List Google Drive folders and files + */ + listGoogleDriveFolders = async (request: ListGoogleDriveFoldersRequest) => { + const parsedRequest = listGoogleDriveFoldersRequest.safeParse(request); + + if (!parsedRequest.success) { + console.error("Invalid request:", parsedRequest.error); + + const errorMessage = parsedRequest.error.issues.map((issue) => issue.message).join(", "); + throw new ValidationError(`Invalid request: ${errorMessage}`); + } + + const { connector_id, parent_id } = parsedRequest.data; + + const queryParams = parent_id ? `?parent_id=${encodeURIComponent(parent_id)}` : ""; + + return baseApiService.get( + `/api/v1/connectors/${connector_id}/google-drive/folders${queryParams}`, + listGoogleDriveFoldersResponse + ); + }; } export const connectorsApiService = new ConnectorsApiService(); diff --git a/surfsense_web/lib/query-client/cache-keys.ts b/surfsense_web/lib/query-client/cache-keys.ts index 7722ec01e..54f411ad1 100644 --- a/surfsense_web/lib/query-client/cache-keys.ts +++ b/surfsense_web/lib/query-client/cache-keys.ts @@ -67,5 +67,9 @@ export const cacheKeys = { ["connectors", ...(queries ? Object.values(queries) : [])] as const, byId: (connectorId: string) => ["connector", connectorId] as const, index: () => ["connector", "index"] as const, + googleDrive: { + folders: (connectorId: number, parentId?: string) => + ["connectors", "google-drive", connectorId, "folders", parentId] as const, + }, }, }; diff --git a/surfsense_web/messages/en.json b/surfsense_web/messages/en.json index 167a87dbc..0fa6e461b 100644 --- a/surfsense_web/messages/en.json +++ b/surfsense_web/messages/en.json @@ -307,6 +307,7 @@ "luma_desc": "Connect to Luma to search events, meetups and gatherings.", "calendar_desc": "Connect to Google Calendar to search events, meetings and schedules.", "gmail_desc": "Connect to your Gmail account to search through your emails.", + "google_drive_desc": "Connect to Google Drive to search and index your files and documents.", "zoom_desc": "Connect to Zoom to access meeting recordings and transcripts.", "webcrawler_desc": "Crawl and index content from any public web pages." }, diff --git a/surfsense_web/messages/zh.json b/surfsense_web/messages/zh.json index 3701a220d..625c8a31e 100644 --- a/surfsense_web/messages/zh.json +++ b/surfsense_web/messages/zh.json @@ -307,6 +307,7 @@ "luma_desc": "连接到 Luma 以搜索活动、聚会和集会。", "calendar_desc": "连接到 Google 日历以搜索活动、会议和日程。", "gmail_desc": "连接到您的 Gmail 账户以搜索您的电子邮件。", + "google_drive_desc": "连接到 Google 云端硬盘以搜索和索引您的文件和文档。", "zoom_desc": "连接到 Zoom 以访问会议录制和转录。", "webcrawler_desc": "爬取和索引任何公开网页的内容。" },