diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example
index b7e580a66..91a0cb42f 100644
--- a/surfsense_backend/.env.example
+++ b/surfsense_backend/.env.example
@@ -37,6 +37,7 @@ GOOGLE_OAUTH_CLIENT_SECRET=GOCSV
 # Connector Specific Configs
 GOOGLE_CALENDAR_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/calendar/connector/callback
 GOOGLE_GMAIL_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/gmail/connector/callback
+GOOGLE_DRIVE_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/drive/connector/callback
 
 # Airtable OAuth for Aitable Connector
 AIRTABLE_CLIENT_ID=your_airtable_client_id
diff --git a/surfsense_backend/alembic/versions/54_add_google_drive_connector_enums.py b/surfsense_backend/alembic/versions/54_add_google_drive_connector_enums.py
new file mode 100644
index 000000000..d802deb0d
--- /dev/null
+++ b/surfsense_backend/alembic/versions/54_add_google_drive_connector_enums.py
@@ -0,0 +1,73 @@
+"""Add Google Drive connector enums
+
+Revision ID: 54
+Revises: 53
+Create Date: 2025-12-28 12:00:00.000000
+
+"""
+
+from collections.abc import Sequence
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "54"
+down_revision: str | None = "53"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    """Safely add 'GOOGLE_DRIVE_CONNECTOR' to enum types if missing."""
+
+    # Add to searchsourceconnectortype enum
+    op.execute(
+        """
+        DO $$
+        BEGIN
+            IF NOT EXISTS (
+                SELECT 1 FROM pg_type t
+                JOIN pg_enum e ON t.oid = e.enumtypid
+                WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'GOOGLE_DRIVE_CONNECTOR'
+            ) THEN
+                ALTER TYPE searchsourceconnectortype ADD VALUE 'GOOGLE_DRIVE_CONNECTOR';
+            END IF;
+        END
+        $$;
+        """
+    )
+
+    # Add to documenttype enum
+    op.execute(
+        """
+        DO $$
+        BEGIN
+            IF NOT EXISTS (
+                SELECT 1 FROM pg_type t
+                JOIN pg_enum e ON t.oid = e.enumtypid
+                WHERE t.typname = 'documenttype' AND e.enumlabel = 'GOOGLE_DRIVE_CONNECTOR'
+            ) THEN
+                ALTER TYPE documenttype ADD VALUE 'GOOGLE_DRIVE_CONNECTOR';
+            END IF;
+        END
+        $$;
+        """
+    )
+
+
+def downgrade() -> None:
+    """Remove 'GOOGLE_DRIVE_CONNECTOR' from enum types.
+
+    Note: PostgreSQL doesn't support removing enum values directly.
+    This would require recreating the enum type, which is complex and risky.
+    For now, we'll leave the enum values in place.
+
+    In a production environment with strict downgrade requirements, you would need to:
+    1. Create new enum types without the value
+    2. Convert all columns to use the new type
+    3. Drop the old enum type
+    4. Rename the new type to the old name
+
+    This is left as pass to avoid accidental data loss.
+    """
+    pass
diff --git a/surfsense_backend/alembic/versions/55_rename_google_drive_connector_to_file.py b/surfsense_backend/alembic/versions/55_rename_google_drive_connector_to_file.py
new file mode 100644
index 000000000..9ce57d95f
--- /dev/null
+++ b/surfsense_backend/alembic/versions/55_rename_google_drive_connector_to_file.py
@@ -0,0 +1,73 @@
+"""Rename GOOGLE_DRIVE_CONNECTOR document type to GOOGLE_DRIVE_FILE
+
+Revision ID: 55
+Revises: 54
+Create Date: 2025-12-29 12:00:00.000000
+
+"""
+
+from collections.abc import Sequence
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "55" +down_revision: str | None = "54" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + from sqlalchemy import text + + connection = op.get_bind() + + connection.execute( + text( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'documenttype' AND e.enumlabel = 'GOOGLE_DRIVE_FILE' + ) THEN + ALTER TYPE documenttype ADD VALUE IF NOT EXISTS 'GOOGLE_DRIVE_FILE'; + END IF; + END + $$; + """ + ) + ) + + connection.commit() + + connection.execute( + text( + """ + UPDATE documents + SET document_type = 'GOOGLE_DRIVE_FILE' + WHERE document_type = 'GOOGLE_DRIVE_CONNECTOR'; + """ + ) + ) + + connection.commit() + + +def downgrade() -> None: + from sqlalchemy import text + + connection = op.get_bind() + + connection.execute( + text( + """ + UPDATE documents + SET document_type = 'GOOGLE_DRIVE_CONNECTOR' + WHERE document_type = 'GOOGLE_DRIVE_FILE'; + """ + ) + ) + + connection.commit() diff --git a/surfsense_backend/alembic/versions/56_add_circleback_connector_enums.py b/surfsense_backend/alembic/versions/56_add_circleback_connector_enums.py new file mode 100644 index 000000000..c4ca93efa --- /dev/null +++ b/surfsense_backend/alembic/versions/56_add_circleback_connector_enums.py @@ -0,0 +1,56 @@ +"""Add Circleback connector enums + +Revision ID: 56 +Revises: 55 +Create Date: 2025-12-30 12:00:00.000000 + +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "56" +down_revision: str | None = "55" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Safely add 'CIRCLEBACK' to documenttype and 'CIRCLEBACK_CONNECTOR' to searchsourceconnectortype enums if missing.""" + from sqlalchemy import text + + # Get connection and commit current transaction to allow ALTER TYPE + connection = op.get_bind() + connection.execute(text("COMMIT")) + + # Add to documenttype enum (must be outside transaction) + connection.execute( + text("ALTER TYPE documenttype ADD VALUE IF NOT EXISTS 'CIRCLEBACK'") + ) + + # Add to searchsourceconnectortype enum + connection.execute( + text( + "ALTER TYPE searchsourceconnectortype ADD VALUE IF NOT EXISTS 'CIRCLEBACK_CONNECTOR'" + ) + ) + + +def downgrade() -> None: + """Remove 'CIRCLEBACK' and 'CIRCLEBACK_CONNECTOR' from enum types. + + Note: PostgreSQL doesn't support removing enum values directly. + This would require recreating the enum type, which is complex and risky. + For now, we'll leave the enum values in place. + + In a production environment with strict downgrade requirements, you would need to: + 1. Create new enum types without the value + 2. Convert all columns to use the new type + 3. Drop the old enum type + 4. Rename the new type to the old name + + This is left as pass to avoid accidental data loss. 
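+
+    An illustrative sketch of that procedure for ``documenttype`` (the value
+    list is abbreviated; rows still using 'CIRCLEBACK' must be remapped or
+    removed first, and the same steps apply to ``searchsourceconnectortype``):
+
+        op.execute("ALTER TYPE documenttype RENAME TO documenttype_old")
+        op.execute("CREATE TYPE documenttype AS ENUM (...)")  # all values except 'CIRCLEBACK'
+        op.execute(
+            "ALTER TABLE documents ALTER COLUMN document_type "
+            "TYPE documenttype USING document_type::text::documenttype"
+        )
+        op.execute("DROP TYPE documenttype_old")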
+ """ + pass diff --git a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py index 6c3dfd34b..a3cdad359 100644 --- a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py +++ b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py @@ -36,6 +36,7 @@ _ALL_CONNECTORS: list[str] = [ "CLICKUP_CONNECTOR", "GOOGLE_CALENDAR_CONNECTOR", "GOOGLE_GMAIL_CONNECTOR", + "GOOGLE_DRIVE_FILE", "DISCORD_CONNECTOR", "AIRTABLE_CONNECTOR", "TAVILY_API", @@ -46,6 +47,7 @@ _ALL_CONNECTORS: list[str] = [ "NOTE", "BOOKSTACK_CONNECTOR", "CRAWLED_URL", + "CIRCLEBACK", ] @@ -425,6 +427,16 @@ async def search_knowledge_base_async( ) all_documents.extend(chunks) + elif connector == "GOOGLE_DRIVE_FILE": + _, chunks = await connector_service.search_google_drive( + user_query=query, + search_space_id=search_space_id, + top_k=top_k, + start_date=resolved_start_date, + end_date=resolved_end_date, + ) + all_documents.extend(chunks) + elif connector == "CONFLUENCE_CONNECTOR": _, chunks = await connector_service.search_confluence( user_query=query, @@ -485,6 +497,16 @@ async def search_knowledge_base_async( ) all_documents.extend(chunks) + elif connector == "CIRCLEBACK": + _, chunks = await connector_service.search_circleback( + user_query=query, + search_space_id=search_space_id, + top_k=top_k, + start_date=resolved_start_date, + end_date=resolved_end_date, + ) + all_documents.extend(chunks) + except Exception as e: print(f"Error searching connector {connector}: {e}") continue @@ -561,6 +583,7 @@ def create_search_knowledge_base_tool( - CLICKUP_CONNECTOR: "ClickUp tasks and project data" (personal task management) - GOOGLE_CALENDAR_CONNECTOR: "Google Calendar events, meetings, and schedules" (personal calendar and time management) - GOOGLE_GMAIL_CONNECTOR: "Google Gmail emails and conversations" (personal emails and communications) + - GOOGLE_DRIVE_FILE: "Google Drive files and documents" (personal cloud storage and file management) - DISCORD_CONNECTOR: "Discord server conversations and shared content" (personal community communications) - AIRTABLE_CONNECTOR: "Airtable records, tables, and database content" (personal data management and organization) - TAVILY_API: "Tavily search API results" (personalized search results) @@ -570,6 +593,7 @@ def create_search_knowledge_base_tool( - LUMA_CONNECTOR: "Luma events" - WEBCRAWLER_CONNECTOR: "Webpages indexed by SurfSense" (personally selected websites) - BOOKSTACK_CONNECTOR: "BookStack pages" (personal documentation) + - CIRCLEBACK: "Circleback meeting notes, transcripts, and action items" (personal meeting records) NOTE: `WEBCRAWLER_CONNECTOR` is mapped internally to the canonical document type `CRAWLED_URL`. 
diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index 08be26de1..9c503fb18 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -82,6 +82,9 @@ class Config: # Google Gmail redirect URI GOOGLE_GMAIL_REDIRECT_URI = os.getenv("GOOGLE_GMAIL_REDIRECT_URI") + # Google Drive redirect URI + GOOGLE_DRIVE_REDIRECT_URI = os.getenv("GOOGLE_DRIVE_REDIRECT_URI") + # Airtable OAuth AIRTABLE_CLIENT_ID = os.getenv("AIRTABLE_CLIENT_ID") AIRTABLE_CLIENT_SECRET = os.getenv("AIRTABLE_CLIENT_SECRET") diff --git a/surfsense_backend/app/connectors/google_drive/__init__.py b/surfsense_backend/app/connectors/google_drive/__init__.py new file mode 100644 index 000000000..47cc8598e --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/__init__.py @@ -0,0 +1,20 @@ +"""Google Drive Connector Module.""" + +from .change_tracker import categorize_change, fetch_all_changes, get_start_page_token +from .client import GoogleDriveClient +from .content_extractor import download_and_process_file +from .credentials import get_valid_credentials, validate_credentials +from .folder_manager import get_file_by_id, get_files_in_folder, list_folder_contents + +__all__ = [ + "GoogleDriveClient", + "categorize_change", + "download_and_process_file", + "fetch_all_changes", + "get_file_by_id", + "get_files_in_folder", + "get_start_page_token", + "get_valid_credentials", + "list_folder_contents", + "validate_credentials", +] diff --git a/surfsense_backend/app/connectors/google_drive/change_tracker.py b/surfsense_backend/app/connectors/google_drive/change_tracker.py new file mode 100644 index 000000000..dee828219 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/change_tracker.py @@ -0,0 +1,204 @@ +"""Change tracking for Google Drive delta sync.""" + +import logging +from datetime import datetime +from typing import Any + +from .client import GoogleDriveClient + +logger = logging.getLogger(__name__) + + +async def get_start_page_token( + client: GoogleDriveClient, +) -> tuple[str | None, str | None]: + """ + Get the starting page token for change tracking. + + This token represents the current state and is used for future delta syncs. + + Args: + client: GoogleDriveClient instance + + Returns: + Tuple of (start_page_token, error message) + """ + try: + service = await client.get_service() + response = service.changes().getStartPageToken(supportsAllDrives=True).execute() + token = response.get("startPageToken") + + logger.info(f"Got start page token: {token}") + return token, None + + except Exception as e: + logger.error(f"Error getting start page token: {e!s}", exc_info=True) + return None, f"Error getting start page token: {e!s}" + + +async def get_changes( + client: GoogleDriveClient, + page_token: str, + folder_id: str | None = None, +) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + Get list of changes since the given page token. 
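+
+    The Drive Changes API returns ``nextPageToken`` while more pages remain and
+    ``newStartPageToken`` on the final page; whichever is present is returned
+    here, so the caller can persist it and resume from it on the next sync.
+
+    Illustrative usage (assumes an initialised ``GoogleDriveClient``)::
+
+        changes, next_token, err = await get_changes(client, saved_token)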
+ + Args: + client: GoogleDriveClient instance + page_token: Page token from previous sync + folder_id: Optional folder ID to filter changes + + Returns: + Tuple of (changes list, new_page_token, error message) + """ + try: + service = await client.get_service() + + params = { + "pageToken": page_token, + "pageSize": 100, + "fields": "nextPageToken, newStartPageToken, changes(fileId, removed, file(id, name, mimeType, modifiedTime, size, webViewLink, parents, trashed))", + "supportsAllDrives": True, + "includeItemsFromAllDrives": True, + } + + response = service.changes().list(**params).execute() + + changes = response.get("changes", []) + next_token = response.get("nextPageToken") + new_start_token = response.get("newStartPageToken") + + # Use new start token if this is the last page + token_to_return = new_start_token if new_start_token else next_token + + # Filter changes by folder if specified + if folder_id: + changes = await _filter_changes_by_folder(client, changes, folder_id) + + logger.info(f"Got {len(changes)} changes, next token: {token_to_return}") + return changes, token_to_return, None + + except Exception as e: + logger.error(f"Error getting changes: {e!s}", exc_info=True) + return [], None, f"Error getting changes: {e!s}" + + +async def _filter_changes_by_folder( + client: GoogleDriveClient, + changes: list[dict[str, Any]], + folder_id: str, +) -> list[dict[str, Any]]: + """ + Filter changes to only include files within the specified folder. + + Args: + client: GoogleDriveClient instance + changes: List of changes from API + folder_id: Folder ID to filter by + + Returns: + Filtered list of changes + """ + filtered = [] + + for change in changes: + file = change.get("file") + if not file: + filtered.append(change) + continue + + # Check if file is in the folder (or subfolder) + parents = file.get("parents", []) + if folder_id in parents: + filtered.append(change) + else: + # Check if any parent is a descendant of folder_id + # This is a simplified check - full implementation would traverse hierarchy + # For now, we'll include it and let indexer validate + filtered.append(change) + + return filtered + + +def categorize_change(change: dict[str, Any]) -> str: + """ + Categorize a change event. + + Args: + change: Change event from Drive API + + Returns: + Category: 'removed', 'trashed', 'modified', 'new' + """ + if change.get("removed"): + return "removed" + + file = change.get("file") + if not file: + return "removed" + + if file.get("trashed"): + return "trashed" + + created_time = file.get("createdTime") + modified_time = file.get("modifiedTime") + + if created_time and modified_time: + try: + created = datetime.fromisoformat(created_time.replace("Z", "+00:00")) + modified = datetime.fromisoformat(modified_time.replace("Z", "+00:00")) + + # If created and modified times are very close, it's likely a new file + time_diff = abs((modified - created).total_seconds()) + if time_diff < 60: # Within 1 minute + return "new" + except Exception: + pass + + return "modified" + + +async def fetch_all_changes( + client: GoogleDriveClient, + start_token: str, + folder_id: str | None = None, +) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + Fetch all changes from start token, handling pagination. 
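+
+    Illustrative delta-sync usage (assumes the connector config already holds a
+    ``start_page_token`` from a previous run)::
+
+        changes, final_token, err = await fetch_all_changes(client, saved_token)
+        if err is None and final_token:
+            connector.config["start_page_token"] = final_token
+            # then flag_modified(connector, "config") and commit, as in the OAuth callback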
+ + Args: + client: GoogleDriveClient instance + start_token: Starting page token + folder_id: Optional folder ID to filter changes + + Returns: + Tuple of (all changes, final_page_token, error message) + """ + all_changes = [] + current_token = start_token + error = None + + try: + while current_token: + changes, next_token, err = await get_changes( + client, current_token, folder_id + ) + + if err: + error = err + break + + all_changes.extend(changes) + + if not next_token or next_token == current_token: + break + + current_token = next_token + + logger.info(f"Fetched total of {len(all_changes)} changes") + return all_changes, current_token, error + + except Exception as e: + logger.error(f"Error fetching all changes: {e!s}", exc_info=True) + return all_changes, current_token, f"Error fetching all changes: {e!s}" diff --git a/surfsense_backend/app/connectors/google_drive/client.py b/surfsense_backend/app/connectors/google_drive/client.py new file mode 100644 index 000000000..aec5704b8 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/client.py @@ -0,0 +1,181 @@ +"""Google Drive API client.""" + +from typing import Any + +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError +from sqlalchemy.ext.asyncio import AsyncSession + +from .credentials import get_valid_credentials + + +class GoogleDriveClient: + """Client for Google Drive API operations.""" + + def __init__(self, session: AsyncSession, connector_id: int): + """ + Initialize Google Drive client. + + Args: + session: Database session + connector_id: ID of the Drive connector + """ + self.session = session + self.connector_id = connector_id + self.service = None + + async def get_service(self): + """ + Get or create the Drive service instance. + + Returns: + Google Drive service instance + + Raises: + Exception: If service creation fails + """ + if self.service: + return self.service + + try: + credentials = await get_valid_credentials(self.session, self.connector_id) + self.service = build("drive", "v3", credentials=credentials) + return self.service + except Exception as e: + raise Exception(f"Failed to create Google Drive service: {e!s}") from e + + async def list_files( + self, + query: str = "", + fields: str = "nextPageToken, files(id, name, mimeType, modifiedTime, size, webViewLink, parents, owners, createdTime, description)", + page_size: int = 100, + page_token: str | None = None, + ) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + List files from Google Drive with pagination. 
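+
+        Illustrative usage (assumes valid stored OAuth credentials)::
+
+            files, next_token, err = await client.list_files(
+                query="mimeType != 'application/vnd.google-apps.folder' and trashed = false",
+                page_size=100,
+            )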
+ + Args: + query: Search query (e.g., "mimeType != 'application/vnd.google-apps.folder'") + fields: Fields to retrieve + page_size: Number of files per page (max 1000) + page_token: Token for next page + + Returns: + Tuple of (files list, next_page_token, error message) + """ + try: + service = await self.get_service() + + params = { + "pageSize": min(page_size, 1000), + "fields": fields, + "supportsAllDrives": True, + "includeItemsFromAllDrives": True, + } + + if query: + params["q"] = query + if page_token: + params["pageToken"] = page_token + + result = service.files().list(**params).execute() + + files = result.get("files", []) + next_token = result.get("nextPageToken") + + return files, next_token, None + + except HttpError as e: + error_msg = f"HTTP error listing files: {e.resp.status} - {e.error_details}" + return [], None, error_msg + except Exception as e: + return [], None, f"Error listing files: {e!s}" + + async def get_file_metadata( + self, file_id: str, fields: str = "*" + ) -> tuple[dict[str, Any] | None, str | None]: + """ + Get metadata for a specific file. + + Args: + file_id: ID of the file + fields: Fields to retrieve + + Returns: + Tuple of (file metadata, error message) + """ + try: + service = await self.get_service() + file = ( + service.files() + .get(fileId=file_id, fields=fields, supportsAllDrives=True) + .execute() + ) + return file, None + except HttpError as e: + return None, f"HTTP error getting file metadata: {e.resp.status}" + except Exception as e: + return None, f"Error getting file metadata: {e!s}" + + async def download_file(self, file_id: str) -> tuple[bytes | None, str | None]: + """ + Download binary file content. + + Args: + file_id: ID of the file to download + + Returns: + Tuple of (file content bytes, error message) + """ + try: + service = await self.get_service() + request = service.files().get_media(fileId=file_id) + + import io + + fh = io.BytesIO() + from googleapiclient.http import MediaIoBaseDownload + + downloader = MediaIoBaseDownload(fh, request) + + done = False + while not done: + _, done = downloader.next_chunk() + + return fh.getvalue(), None + + except HttpError as e: + return None, f"HTTP error downloading file: {e.resp.status}" + except Exception as e: + return None, f"Error downloading file: {e!s}" + + async def export_google_file( + self, file_id: str, mime_type: str + ) -> tuple[bytes | None, str | None]: + """ + Export Google Workspace file to specified format. 
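+
+        Export is only available for Google Workspace items (see ``EXPORT_FORMATS``
+        in ``file_types.py``); regular binary files should use ``download_file``.
+
+        Illustrative usage::
+
+            pdf_bytes, err = await client.export_google_file(file_id, "application/pdf")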
+ + Args: + file_id: ID of the Google file + mime_type: Target MIME type (e.g., 'application/pdf', 'text/plain') + + Returns: + Tuple of (exported content as bytes, error message) + """ + try: + service = await self.get_service() + content = ( + service.files().export(fileId=file_id, mimeType=mime_type).execute() + ) + + # Content is already bytes from the API + # Keep as bytes to support both text and binary formats (like PDF) + if not isinstance(content, bytes): + content = content.encode("utf-8") + + return content, None + + except HttpError as e: + return None, f"HTTP error exporting file: {e.resp.status}" + except Exception as e: + return None, f"Error exporting file: {e!s}" diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py new file mode 100644 index 000000000..28c14a757 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -0,0 +1,136 @@ +"""Content extraction for Google Drive files.""" + +import logging +import os +import tempfile +from pathlib import Path +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import Log +from app.services.task_logging_service import TaskLoggingService + +from .client import GoogleDriveClient +from .file_types import get_export_mime_type, is_google_workspace_file, should_skip_file + +logger = logging.getLogger(__name__) + + +async def download_and_process_file( + client: GoogleDriveClient, + file: dict[str, Any], + search_space_id: int, + user_id: str, + session: AsyncSession, + task_logger: TaskLoggingService, + log_entry: Log, +) -> tuple[Any, str | None, dict[str, Any] | None]: + """ + Download Google Drive file and process using Surfsense file processors. 
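+
+    Google Workspace files are exported (as PDF) and other files are downloaded
+    as-is; the content is written to a temporary file and handed to
+    ``process_file_in_background`` so it goes through the same processing
+    pipeline as a manually uploaded file, tagged with
+    ``DocumentType.GOOGLE_DRIVE_FILE`` and Drive-specific metadata.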
+ + Args: + client: GoogleDriveClient instance + file: File metadata from Drive API + search_space_id: ID of the search space + user_id: ID of the user + session: Database session + task_logger: Task logging service + log_entry: Log entry for tracking + + Returns: + Tuple of (Document object if successful, error message if failed, file metadata dict) + """ + file_id = file.get("id") + file_name = file.get("name", "Unknown") + mime_type = file.get("mimeType", "") + + # Skip folders and shortcuts + if should_skip_file(mime_type): + return None, f"Skipping {mime_type}", None + + logger.info(f"Downloading file: {file_name} ({mime_type})") + + temp_file_path = None + try: + # Step 1: Download or export the file + if is_google_workspace_file(mime_type): + # Google Workspace files need export (as PDF to preserve formatting & images) + export_mime = get_export_mime_type(mime_type) + if not export_mime: + return None, f"Cannot export Google Workspace type: {mime_type}" + + logger.info(f"Exporting Google Workspace file as {export_mime}") + content_bytes, error = await client.export_google_file(file_id, export_mime) + if error: + return None, error + + extension = ".pdf" if export_mime == "application/pdf" else ".txt" + else: + content_bytes, error = await client.download_file(file_id) + if error: + return None, error + + # Preserve original file extension + extension = Path(file_name).suffix or ".bin" + + with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp_file: + tmp_file.write(content_bytes) + temp_file_path = tmp_file.name + + from app.db import DocumentType + from app.tasks.document_processors.file_processors import ( + process_file_in_background, + ) + + connector_info = { + "type": DocumentType.GOOGLE_DRIVE_FILE, + "metadata": { + "google_drive_file_id": file_id, + "google_drive_file_name": file_name, + "google_drive_mime_type": mime_type, + "source_connector": "google_drive", + }, + } + + # Add additional Drive metadata if available + if "modifiedTime" in file: + connector_info["metadata"]["modified_time"] = file["modifiedTime"] + if "createdTime" in file: + connector_info["metadata"]["created_time"] = file["createdTime"] + if "size" in file: + connector_info["metadata"]["file_size"] = file["size"] + if "webViewLink" in file: + connector_info["metadata"]["web_view_link"] = file["webViewLink"] + + if is_google_workspace_file(mime_type): + connector_info["metadata"]["exported_as"] = "pdf" + connector_info["metadata"]["original_workspace_type"] = mime_type.split( + "." 
+ )[-1] + + logger.info(f"Processing {file_name} with Surfsense's file processor") + await process_file_in_background( + file_path=temp_file_path, + filename=file_name, + search_space_id=search_space_id, + user_id=user_id, + session=session, + task_logger=task_logger, + log_entry=log_entry, + connector=connector_info, + ) + + return None, None, connector_info["metadata"] + + except Exception as e: + logger.warning(f"Failed to process {file_name}: {e!s}") + return None, str(e), None + + finally: + # Cleanup temp file (if process_file_in_background didn't already delete it) + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except Exception as e: + logger.debug(f"Could not delete temp file {temp_file_path}: {e}") diff --git a/surfsense_backend/app/connectors/google_drive/credentials.py b/surfsense_backend/app/connectors/google_drive/credentials.py new file mode 100644 index 000000000..f88486468 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/credentials.py @@ -0,0 +1,95 @@ +"""Google Drive OAuth credential management.""" + +import json +from datetime import datetime + +from google.auth.transport.requests import Request +from google.oauth2.credentials import Credentials +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from sqlalchemy.orm.attributes import flag_modified + +from app.db import SearchSourceConnector + + +async def get_valid_credentials( + session: AsyncSession, + connector_id: int, +) -> Credentials: + """ + Get valid Google OAuth credentials, refreshing if needed. + + Args: + session: Database session + connector_id: Connector ID + + Returns: + Valid Google OAuth credentials + + Raises: + ValueError: If credentials are missing or invalid + Exception: If token refresh fails + """ + result = await session.execute( + select(SearchSourceConnector).filter(SearchSourceConnector.id == connector_id) + ) + connector = result.scalars().first() + + if not connector: + raise ValueError(f"Connector {connector_id} not found") + + config_data = connector.config + exp = config_data.get("expiry", "").replace("Z", "") + + if not all( + [ + config_data.get("client_id"), + config_data.get("client_secret"), + config_data.get("refresh_token"), + ] + ): + raise ValueError( + "Google OAuth credentials (client_id, client_secret, refresh_token) must be set" + ) + + credentials = Credentials( + token=config_data.get("token"), + refresh_token=config_data.get("refresh_token"), + token_uri=config_data.get("token_uri"), + client_id=config_data.get("client_id"), + client_secret=config_data.get("client_secret"), + scopes=config_data.get("scopes", []), + expiry=datetime.fromisoformat(exp) if exp else None, + ) + + if credentials.expired or not credentials.valid: + try: + credentials.refresh(Request()) + + connector.config = json.loads(credentials.to_json()) + flag_modified(connector, "config") + await session.commit() + + except Exception as e: + raise Exception(f"Failed to refresh Google OAuth credentials: {e!s}") from e + + return credentials + + +def validate_credentials(credentials: Credentials) -> bool: + """ + Validate that credentials have required fields. 
+ + Args: + credentials: Google OAuth credentials + + Returns: + True if valid, False otherwise + """ + return all( + [ + credentials.client_id, + credentials.client_secret, + credentials.refresh_token, + ] + ) diff --git a/surfsense_backend/app/connectors/google_drive/file_types.py b/surfsense_backend/app/connectors/google_drive/file_types.py new file mode 100644 index 000000000..a66463208 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/file_types.py @@ -0,0 +1,28 @@ +"""File type handlers for Google Drive.""" + +GOOGLE_DOC = "application/vnd.google-apps.document" +GOOGLE_SHEET = "application/vnd.google-apps.spreadsheet" +GOOGLE_SLIDE = "application/vnd.google-apps.presentation" +GOOGLE_FOLDER = "application/vnd.google-apps.folder" +GOOGLE_SHORTCUT = "application/vnd.google-apps.shortcut" + +EXPORT_FORMATS = { + GOOGLE_DOC: "application/pdf", + GOOGLE_SHEET: "application/pdf", + GOOGLE_SLIDE: "application/pdf", +} + + +def is_google_workspace_file(mime_type: str) -> bool: + """Check if file is a Google Workspace file that needs export.""" + return mime_type.startswith("application/vnd.google-apps") + + +def should_skip_file(mime_type: str) -> bool: + """Check if file should be skipped (folders, shortcuts, etc).""" + return mime_type in [GOOGLE_FOLDER, GOOGLE_SHORTCUT] + + +def get_export_mime_type(mime_type: str) -> str | None: + """Get export MIME type for Google Workspace files.""" + return EXPORT_FORMATS.get(mime_type) diff --git a/surfsense_backend/app/connectors/google_drive/folder_manager.py b/surfsense_backend/app/connectors/google_drive/folder_manager.py new file mode 100644 index 000000000..e28505f11 --- /dev/null +++ b/surfsense_backend/app/connectors/google_drive/folder_manager.py @@ -0,0 +1,263 @@ +"""Folder management for Google Drive.""" + +import logging +from typing import Any + +from .client import GoogleDriveClient + +logger = logging.getLogger(__name__) + + +async def list_folders( + client: GoogleDriveClient, + parent_id: str | None = None, +) -> tuple[list[dict[str, Any]], str | None]: + """ + List folders in Google Drive. + + Args: + client: GoogleDriveClient instance + parent_id: Parent folder ID (None for root) + + Returns: + Tuple of (folders list, error message) + """ + try: + # Build query to get only folders + query_parts = [ + "mimeType = 'application/vnd.google-apps.folder'", + "trashed = false", + ] + + if parent_id: + query_parts.append(f"'{parent_id}' in parents") + + query = " and ".join(query_parts) + + folders, _, error = await client.list_files( + query=query, + fields="files(id, name, parents, createdTime, modifiedTime)", + page_size=100, + ) + + if error: + return [], error + + return folders, None + + except Exception as e: + logger.error(f"Error listing folders: {e!s}", exc_info=True) + return [], f"Error listing folders: {e!s}" + + +async def get_folder_hierarchy( + client: GoogleDriveClient, + folder_id: str, +) -> tuple[list[dict[str, str]], str | None]: + """ + Get the full path hierarchy for a folder. 
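+
+    Walks ``parents`` links upward until a node without a parent (the Drive
+    root) is reached; the result is ordered root-first, e.g.
+    ``[{"id": "...", "name": "My Drive"}, {"id": "...", "name": "Projects"}]``
+    (names shown are illustrative).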
+ + Args: + client: GoogleDriveClient instance + folder_id: Folder ID to get hierarchy for + + Returns: + Tuple of (hierarchy list [{'id': ..., 'name': ...}], error message) + """ + try: + hierarchy = [] + current_id = folder_id + + # Traverse up to root + while current_id: + file, error = await client.get_file_metadata( + current_id, fields="id, name, parents, mimeType" + ) + + if error: + return [], error + + if not file: + break + + hierarchy.insert(0, {"id": file["id"], "name": file["name"]}) + + # Get parent + parents = file.get("parents", []) + current_id = parents[0] if parents else None + + return hierarchy, None + + except Exception as e: + logger.error(f"Error getting folder hierarchy: {e!s}", exc_info=True) + return [], f"Error getting folder hierarchy: {e!s}" + + +async def get_files_in_folder( + client: GoogleDriveClient, + folder_id: str, + include_subfolders: bool = True, + page_token: str | None = None, +) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + Get all indexable files in a folder. + + Args: + client: GoogleDriveClient instance + folder_id: Folder ID to search in + include_subfolders: Whether to include subfolders + page_token: Pagination token + + Returns: + Tuple of (files list, next_page_token, error message) + """ + try: + # Build query + query_parts = [ + f"'{folder_id}' in parents", + "trashed = false", + "mimeType != 'application/vnd.google-apps.shortcut'", # Skip shortcuts + ] + + if not include_subfolders: + query_parts.append("mimeType != 'application/vnd.google-apps.folder'") + + query = " and ".join(query_parts) + + files, next_token, error = await client.list_files( + query=query, + page_size=100, + page_token=page_token, + ) + + if error: + return [], None, error + + return files, next_token, None + + except Exception as e: + logger.error(f"Error getting files in folder: {e!s}", exc_info=True) + return [], None, f"Error getting files in folder: {e!s}" + + +async def get_file_by_id( + client: GoogleDriveClient, + file_id: str, +) -> tuple[dict[str, Any] | None, str | None]: + """ + Get file metadata by ID. + + Args: + client: GoogleDriveClient instance + file_id: File ID to fetch + + Returns: + Tuple of (file metadata dict, error message) + """ + try: + file, error = await client.get_file_metadata( + file_id, + fields="id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink", + ) + + if error: + return None, error + + if not file: + return None, f"File not found: {file_id}" + + return file, None + + except Exception as e: + logger.error(f"Error getting file by ID: {e!s}", exc_info=True) + return None, f"Error getting file by ID: {e!s}" + + +def format_folder_path(hierarchy: list[dict[str, str]]) -> str: + """ + Format folder hierarchy as a path string. + + Args: + hierarchy: List of folder dicts with 'id' and 'name' + + Returns: + Formatted path (e.g., "My Drive / Projects / Documents") + """ + if not hierarchy: + return "My Drive" + + folder_names = [folder["name"] for folder in hierarchy] + return " / ".join(folder_names) + + +async def list_folder_contents( + client: GoogleDriveClient, + parent_id: str | None = None, +) -> tuple[list[dict[str, Any]], str | None]: + """ + List folders and files in a Google Drive folder with pagination support. 
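+
+    Folders are sorted before files and every item carries an ``isFolder`` flag,
+    so callers (e.g. the folder picker on the manage connector page) can render
+    a tree without re-checking MIME types.
+
+    Illustrative usage::
+
+        items, err = await list_folder_contents(client)  # root (My Drive)
+        items, err = await list_folder_contents(client, parent_id="<folder-id>")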
+ + Args: + client: GoogleDriveClient instance + parent_id: Parent folder ID (None for root) + + Returns: + Tuple of (items list with folders and files, error message) + """ + try: + # Build query to get folders and files (exclude shortcuts) + query_parts = [ + "trashed = false", + "mimeType != 'application/vnd.google-apps.shortcut'", + ] + + # For root, we need to explicitly query for items in 'root' + # For subfolders, query for items with that parent + if parent_id: + query_parts.append(f"'{parent_id}' in parents") + else: + # Query for root-level items + query_parts.append("'root' in parents") + + query = " and ".join(query_parts) + + # Fetch all items with pagination (max 1000 per page) + all_items = [] + page_token = None + + while True: + items, next_token, error = await client.list_files( + query=query, + fields="files(id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink)", + page_size=1000, # Max allowed by Google Drive API + page_token=page_token, + ) + + if error: + return [], error + + all_items.extend(items) + + if not next_token: + break + + page_token = next_token + + for item in all_items: + item["isFolder"] = item["mimeType"] == "application/vnd.google-apps.folder" + + all_items.sort(key=lambda x: (not x["isFolder"], x["name"].lower())) + + folder_count = sum(1 for item in all_items if item["isFolder"]) + file_count = len(all_items) - folder_count + + logger.info( + f"Listed {len(all_items)} items ({folder_count} folders, {file_count} files) " + + (f"in folder {parent_id}" if parent_id else "in root (My Drive)") + ) + + return all_items, None + + except Exception as e: + logger.error(f"Error listing folder contents: {e!s}", exc_info=True) + return [], f"Error listing folder contents: {e!s}" diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index a2a424c26..fbd53bd06 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -46,10 +46,12 @@ class DocumentType(str, Enum): CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR" GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR" GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR" + GOOGLE_DRIVE_FILE = "GOOGLE_DRIVE_FILE" AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR" LUMA_CONNECTOR = "LUMA_CONNECTOR" ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR" BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR" + CIRCLEBACK = "CIRCLEBACK" NOTE = "NOTE" @@ -69,11 +71,13 @@ class SearchSourceConnectorType(str, Enum): CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR" GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR" GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR" + GOOGLE_DRIVE_CONNECTOR = "GOOGLE_DRIVE_CONNECTOR" AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR" LUMA_CONNECTOR = "LUMA_CONNECTOR" ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR" WEBCRAWLER_CONNECTOR = "WEBCRAWLER_CONNECTOR" BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR" + CIRCLEBACK_CONNECTOR = "CIRCLEBACK_CONNECTOR" class LiteLLMProvider(str, Enum): diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py index a055bf549..3c18650ae 100644 --- a/surfsense_backend/app/routes/__init__.py +++ b/surfsense_backend/app/routes/__init__.py @@ -3,11 +3,15 @@ from fastapi import APIRouter from .airtable_add_connector_route import ( router as airtable_add_connector_router, ) +from .circleback_webhook_route import router as circleback_webhook_router from .documents_routes import router as documents_router from .editor_routes import router as editor_router from .google_calendar_add_connector_route import ( 
router as google_calendar_add_connector_router, ) +from .google_drive_add_connector_route import ( + router as google_drive_add_connector_router, +) from .google_gmail_add_connector_route import ( router as google_gmail_add_connector_router, ) @@ -33,7 +37,9 @@ router.include_router(podcasts_router) # Podcast task status and audio router.include_router(search_source_connectors_router) router.include_router(google_calendar_add_connector_router) router.include_router(google_gmail_add_connector_router) +router.include_router(google_drive_add_connector_router) router.include_router(airtable_add_connector_router) router.include_router(luma_add_connector_router) router.include_router(new_llm_config_router) # LLM configs with prompt configuration router.include_router(logs_router) +router.include_router(circleback_webhook_router) # Circleback meeting webhooks diff --git a/surfsense_backend/app/routes/airtable_add_connector_route.py b/surfsense_backend/app/routes/airtable_add_connector_route.py index fa124f1c2..3bcbe4dc0 100644 --- a/surfsense_backend/app/routes/airtable_add_connector_route.py +++ b/surfsense_backend/app/routes/airtable_add_connector_route.py @@ -255,9 +255,10 @@ async def airtable_callback( await session.commit() logger.info(f"Successfully saved Airtable connector for user {user_id}") - # Redirect to the frontend success page + # Redirect to the frontend with success params for indexing config + # Using query params to auto-open the popup with config view on new-chat page return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/add/airtable-connector?success=true" + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector=airtable-connector" ) except ValidationError as e: diff --git a/surfsense_backend/app/routes/circleback_webhook_route.py b/surfsense_backend/app/routes/circleback_webhook_route.py new file mode 100644 index 000000000..1285aadeb --- /dev/null +++ b/surfsense_backend/app/routes/circleback_webhook_route.py @@ -0,0 +1,317 @@ +""" +Circleback Webhook Route + +This module provides a webhook endpoint for receiving meeting data from Circleback. +It processes the incoming webhook payload and saves it as a document in the specified search space. +""" + +import logging +from datetime import datetime +from typing import Any + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +# Pydantic models for Circleback webhook payload +class CirclebackAttendee(BaseModel): + """Attendee model for Circleback meeting.""" + + name: str | None = None + email: str | None = None + + +class CirclebackActionItemAssignee(BaseModel): + """Assignee model for action items.""" + + name: str | None = None + email: str | None = None + + +class CirclebackActionItem(BaseModel): + """Action item model for Circleback meeting.""" + + id: int + title: str + description: str = "" + assignee: CirclebackActionItemAssignee | None = None + status: str = "PENDING" + + +class CirclebackTranscriptSegment(BaseModel): + """Transcript segment model for Circleback meeting.""" + + speaker: str + text: str + timestamp: float + + +class CirclebackInsightItem(BaseModel): + """Individual insight item.""" + + insight: str | dict[str, Any] + speaker: str | None = None + timestamp: float | None = None + + +class CirclebackWebhookPayload(BaseModel): + """ + Circleback webhook payload model. 
+ + This model represents the data sent by Circleback when a meeting is processed. + """ + + model_config = {"populate_by_name": True} + + id: int = Field(..., description="Circleback meeting ID") + name: str = Field(..., description="Meeting name") + created_at: str = Field( + ..., alias="createdAt", description="Meeting creation date in ISO format" + ) + duration: float = Field(..., description="Meeting duration in seconds") + url: str | None = Field(None, description="URL of the virtual meeting") + recording_url: str | None = Field( + None, + alias="recordingUrl", + description="URL of the meeting recording (valid for 24 hours)", + ) + tags: list[str] = Field(default_factory=list, description="Meeting tags") + ical_uid: str | None = Field( + None, alias="icalUid", description="Unique identifier of the calendar event" + ) + attendees: list[CirclebackAttendee] = Field( + default_factory=list, description="Meeting attendees" + ) + notes: str = Field("", description="Meeting notes in Markdown format") + action_items: list[CirclebackActionItem] = Field( + default_factory=list, + alias="actionItems", + description="Action items from the meeting", + ) + transcript: list[CirclebackTranscriptSegment] = Field( + default_factory=list, description="Meeting transcript segments" + ) + insights: dict[str, list[CirclebackInsightItem]] = Field( + default_factory=dict, description="Custom insights from the meeting" + ) + + +def format_circleback_meeting_to_markdown(payload: CirclebackWebhookPayload) -> str: + """ + Convert Circleback webhook payload to a well-formatted Markdown document. + + Args: + payload: The Circleback webhook payload + + Returns: + Markdown string representation of the meeting + """ + lines = [] + + # Title + lines.append(f"# {payload.name}") + lines.append("") + + # Meeting metadata + lines.append("## Meeting Details") + lines.append("") + + # Parse and format date + try: + created_dt = datetime.fromisoformat(payload.created_at.replace("Z", "+00:00")) + formatted_date = created_dt.strftime("%Y-%m-%d %H:%M:%S UTC") + except (ValueError, AttributeError): + formatted_date = payload.created_at + + lines.append(f"- **Date:** {formatted_date}") + lines.append(f"- **Duration:** {int(payload.duration // 60)} minutes") + + if payload.url: + lines.append(f"- **Meeting URL:** {payload.url}") + + if payload.tags: + lines.append(f"- **Tags:** {', '.join(payload.tags)}") + + lines.append( + f"- **Circleback Link:** [View on Circleback](https://app.circleback.ai/meetings/{payload.id})" + ) + lines.append("") + + # Attendees + if payload.attendees: + lines.append("## Attendees") + lines.append("") + for attendee in payload.attendees: + name = attendee.name or "Unknown" + if attendee.email: + lines.append(f"- **{name}** ({attendee.email})") + else: + lines.append(f"- **{name}**") + lines.append("") + + # Notes (if provided) + if payload.notes: + lines.append("## Meeting Notes") + lines.append("") + lines.append(payload.notes) + lines.append("") + + # Action Items + if payload.action_items: + lines.append("## Action Items") + lines.append("") + for item in payload.action_items: + status_emoji = "✅" if item.status == "DONE" else "⬜" + assignee_text = "" + if item.assignee and item.assignee.name: + assignee_text = f" (Assigned to: {item.assignee.name})" + + lines.append(f"{status_emoji} **{item.title}**{assignee_text}") + if item.description: + lines.append(f" {item.description}") + lines.append("") + + # Insights + if payload.insights: + lines.append("## Insights") + lines.append("") + for 
insight_name, insight_items in payload.insights.items(): + lines.append(f"### {insight_name}") + lines.append("") + for insight_item in insight_items: + if isinstance(insight_item.insight, dict): + for key, value in insight_item.insight.items(): + lines.append(f"- **{key}:** {value}") + else: + speaker_info = ( + f" _{insight_item.speaker}_" if insight_item.speaker else "" + ) + lines.append(f"- {insight_item.insight}{speaker_info}") + lines.append("") + + # Transcript + if payload.transcript: + lines.append("## Transcript") + lines.append("") + for segment in payload.transcript: + # Format timestamp as MM:SS + minutes = int(segment.timestamp // 60) + seconds = int(segment.timestamp % 60) + timestamp_str = f"[{minutes:02d}:{seconds:02d}]" + lines.append(f"**{segment.speaker}** {timestamp_str}: {segment.text}") + lines.append("") + + return "\n".join(lines) + + +@router.post("/webhooks/circleback/{search_space_id}") +async def receive_circleback_webhook( + search_space_id: int, + payload: CirclebackWebhookPayload, +): + """ + Receive and process a Circleback webhook. + + This endpoint receives meeting data from Circleback and saves it as a document + in the specified search space. The meeting data is converted to Markdown format + and processed asynchronously. + + Args: + search_space_id: The ID of the search space to save the document to + payload: The Circleback webhook payload containing meeting data + + Returns: + Success message with document details + + Note: + This endpoint does not require authentication as it's designed to receive + webhooks from Circleback. Signature verification can be added later for security. + """ + try: + logger.info( + f"Received Circleback webhook for meeting {payload.id} in search space {search_space_id}" + ) + + # Convert to markdown + markdown_content = format_circleback_meeting_to_markdown(payload) + + # Trigger async document processing + from app.tasks.celery_tasks.document_tasks import ( + process_circleback_meeting_task, + ) + + # Prepare meeting metadata for the task + meeting_metadata = { + "circleback_meeting_id": payload.id, + "meeting_name": payload.name, + "meeting_date": payload.created_at, + "duration_seconds": payload.duration, + "meeting_url": payload.url, + "tags": payload.tags, + "attendees_count": len(payload.attendees), + "action_items_count": len(payload.action_items), + "has_transcript": len(payload.transcript) > 0, + } + + # Queue the processing task + process_circleback_meeting_task.delay( + meeting_id=payload.id, + meeting_name=payload.name, + markdown_content=markdown_content, + metadata=meeting_metadata, + search_space_id=search_space_id, + ) + + logger.info( + f"Queued Circleback meeting {payload.id} for processing in search space {search_space_id}" + ) + + return { + "status": "accepted", + "message": f"Meeting '{payload.name}' queued for processing", + "meeting_id": payload.id, + "search_space_id": search_space_id, + } + + except Exception as e: + logger.error(f"Error processing Circleback webhook: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to process Circleback webhook: {e!s}", + ) from e + + +@router.get("/webhooks/circleback/{search_space_id}/info") +async def get_circleback_webhook_info( + search_space_id: int, +): + """ + Get information about the Circleback webhook endpoint. + + This endpoint provides information about how to configure the Circleback + webhook integration. 
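+
+    The returned ``webhook_url`` is the value to paste into Circleback under
+    Settings → Automations → "Send webhook request"; Circleback will then POST
+    meeting payloads to that URL, which the endpoint above queues for processing.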
+ + Args: + search_space_id: The ID of the search space + + Returns: + Webhook configuration information + """ + from app.config import config + + # Construct the webhook URL + base_url = getattr(config, "API_BASE_URL", "http://localhost:8000") + webhook_url = f"{base_url}/api/v1/webhooks/circleback/{search_space_id}" + + return { + "webhook_url": webhook_url, + "search_space_id": search_space_id, + "method": "POST", + "content_type": "application/json", + "description": "Use this URL in your Circleback automation to send meeting data to SurfSense", + "note": "Configure this URL in Circleback Settings → Automations → Create automation → Send webhook request", + } diff --git a/surfsense_backend/app/routes/google_calendar_add_connector_route.py b/surfsense_backend/app/routes/google_calendar_add_connector_route.py index fa4ef5466..8bb685450 100644 --- a/surfsense_backend/app/routes/google_calendar_add_connector_route.py +++ b/surfsense_backend/app/routes/google_calendar_add_connector_route.py @@ -131,8 +131,10 @@ async def calendar_callback( session.add(db_connector) await session.commit() await session.refresh(db_connector) + # Redirect to the frontend with success params for indexing config + # Using query params to auto-open the popup with config view on new-chat page return RedirectResponse( - f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/add/google-calendar-connector?success=true" + f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector=google-calendar-connector" ) except ValidationError as e: await session.rollback() diff --git a/surfsense_backend/app/routes/google_drive_add_connector_route.py b/surfsense_backend/app/routes/google_drive_add_connector_route.py new file mode 100644 index 000000000..52461319b --- /dev/null +++ b/surfsense_backend/app/routes/google_drive_add_connector_route.py @@ -0,0 +1,318 @@ +""" +Google Drive Connector OAuth Routes. + +Handles OAuth 2.0 authentication flow for Google Drive connector. +Folder selection happens at index time on the manage connector page. 
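+
+High-level flow: the add endpoint returns a Google consent URL; the callback
+exchanges the authorization code for tokens, stores them (plus an initial
+changes ``start_page_token``) in the connector config, and redirects back to
+the frontend; folders and files are then browsed and selected when indexing is
+triggered.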
+ +Endpoints: +- GET /auth/google/drive/connector/add - Initiate OAuth +- GET /auth/google/drive/connector/callback - Handle OAuth callback +- GET /connectors/{connector_id}/google-drive/folders - List user's folders (for index-time selection) +""" + +import base64 +import json +import logging +import os +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import RedirectResponse +from google_auth_oauthlib.flow import Flow +from pydantic import ValidationError +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.config import config +from app.connectors.google_drive import ( + GoogleDriveClient, + get_start_page_token, + list_folder_contents, +) +from app.db import ( + SearchSourceConnector, + SearchSourceConnectorType, + User, + get_async_session, +) +from app.users import current_active_user + +# Relax token scope validation for Google OAuth +os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1" + +logger = logging.getLogger(__name__) +router = APIRouter() + +# Google Drive OAuth scopes +SCOPES = [ + "https://www.googleapis.com/auth/drive.readonly", # Read-only access to Drive + "https://www.googleapis.com/auth/userinfo.email", # User email + "https://www.googleapis.com/auth/userinfo.profile", # User profile + "openid", +] + + +def get_google_flow(): + """Create and return a Google OAuth flow for Drive API.""" + try: + return Flow.from_client_config( + { + "web": { + "client_id": config.GOOGLE_OAUTH_CLIENT_ID, + "client_secret": config.GOOGLE_OAUTH_CLIENT_SECRET, + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "redirect_uris": [config.GOOGLE_DRIVE_REDIRECT_URI], + } + }, + scopes=SCOPES, + redirect_uri=config.GOOGLE_DRIVE_REDIRECT_URI, + ) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Failed to create Google OAuth flow: {e!s}" + ) from e + + +@router.get("/auth/google/drive/connector/add") +async def connect_drive(space_id: int, user: User = Depends(current_active_user)): + """ + Initiate Google Drive OAuth flow. + + Query params: + space_id: Search space ID to add connector to + + Returns: + JSON with auth_url to redirect user to Google authorization + """ + try: + if not space_id: + raise HTTPException(status_code=400, detail="space_id is required") + + flow = get_google_flow() + + # Encode space_id and user_id in state parameter + state_payload = json.dumps( + { + "space_id": space_id, + "user_id": str(user.id), + } + ) + state_encoded = base64.urlsafe_b64encode(state_payload.encode()).decode() + + # Generate authorization URL + auth_url, _ = flow.authorization_url( + access_type="offline", # Get refresh token + prompt="consent", # Force consent screen to get refresh token + include_granted_scopes="true", + state=state_encoded, + ) + + logger.info( + f"Initiating Google Drive OAuth for user {user.id}, space {space_id}" + ) + return {"auth_url": auth_url} + + except Exception as e: + logger.error(f"Failed to initiate Google Drive OAuth: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, detail=f"Failed to initiate Google OAuth: {e!s}" + ) from e + + +@router.get("/auth/google/drive/connector/callback") +async def drive_callback( + request: Request, + code: str, + state: str, + session: AsyncSession = Depends(get_async_session), +): + """ + Handle Google Drive OAuth callback. 
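+
+    Exchanges the authorization code for tokens, stores the credentials on a new
+    GOOGLE_DRIVE_CONNECTOR (one per user per search space), records an initial
+    changes ``start_page_token`` for later delta syncs, and redirects back to the
+    frontend connectors modal.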
+ + Query params: + code: Authorization code from Google + state: Encoded state with space_id and user_id + + Returns: + Redirect to frontend success page + """ + try: + # Decode and parse state + decoded_state = base64.urlsafe_b64decode(state.encode()).decode() + data = json.loads(decoded_state) + + user_id = UUID(data["user_id"]) + space_id = data["space_id"] + + logger.info( + f"Processing Google Drive callback for user {user_id}, space {space_id}" + ) + + # Exchange authorization code for tokens + flow = get_google_flow() + flow.fetch_token(code=code) + + creds = flow.credentials + creds_dict = json.loads(creds.to_json()) + + # Check if connector already exists for this space/user + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + ) + ) + existing_connector = result.scalars().first() + + if existing_connector: + raise HTTPException( + status_code=409, + detail="A GOOGLE_DRIVE_CONNECTOR already exists in this search space. Each search space can have only one connector of each type per user.", + ) + + # Create new connector (NO folder selection here - happens at index time) + db_connector = SearchSourceConnector( + name="Google Drive Connector", + connector_type=SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + config={ + **creds_dict, + "start_page_token": None, # Will be set on first index + }, + search_space_id=space_id, + user_id=user_id, + is_indexable=True, + ) + + session.add(db_connector) + await session.commit() + await session.refresh(db_connector) + + # Get initial start page token for delta sync + try: + drive_client = GoogleDriveClient(session, db_connector.id) + start_token, token_error = await get_start_page_token(drive_client) + + if start_token and not token_error: + db_connector.config["start_page_token"] = start_token + from sqlalchemy.orm.attributes import flag_modified + + flag_modified(db_connector, "config") + await session.commit() + logger.info( + f"Set initial start page token for connector {db_connector.id}" + ) + except Exception as e: + logger.warning(f"Failed to get initial start page token: {e!s}") + + logger.info( + f"Successfully created Google Drive connector {db_connector.id} for user {user_id}" + ) + + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector=google-drive-connector" + ) + + except HTTPException: + await session.rollback() + raise + except ValidationError as e: + await session.rollback() + logger.error(f"Validation error: {e!s}", exc_info=True) + raise HTTPException( + status_code=400, detail=f"Invalid connector configuration: {e!s}" + ) from e + except IntegrityError as e: + await session.rollback() + logger.error(f"Database integrity error: {e!s}", exc_info=True) + raise HTTPException( + status_code=409, + detail="A connector with this configuration already exists.", + ) from e + except Exception as e: + await session.rollback() + logger.error(f"Unexpected error in Drive callback: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, detail=f"Failed to complete Google OAuth: {e!s}" + ) from e + + +@router.get("/connectors/{connector_id}/google-drive/folders") +async def list_google_drive_folders( + connector_id: int, + parent_id: str | None = None, + session: AsyncSession = Depends(get_async_session), + user: User = 
Depends(current_active_user), +): + """ + List folders AND files in user's Google Drive with hierarchical support. + + This is called at index time from the manage connector page to display + the complete file system (folders and files). Only folders are selectable. + + Args: + connector_id: ID of the Google Drive connector + parent_id: Optional parent folder ID to list contents (None for root) + + Returns: + JSON with list of items: { + "items": [ + {"id": str, "name": str, "mimeType": str, "isFolder": bool, ...}, + ... + ] + } + """ + try: + # Get connector and verify ownership + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == connector_id, + SearchSourceConnector.user_id == user.id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + ) + ) + connector = result.scalars().first() + + if not connector: + raise HTTPException( + status_code=404, + detail="Google Drive connector not found or access denied", + ) + + # Initialize Drive client (credentials will be loaded on first API call) + drive_client = GoogleDriveClient(session, connector_id) + + # List both folders and files (sorted: folders first) + items, error = await list_folder_contents(drive_client, parent_id=parent_id) + + if error: + raise HTTPException( + status_code=500, detail=f"Failed to list folder contents: {error}" + ) + + # Count folders and files for better logging + folder_count = sum(1 for item in items if item.get("isFolder", False)) + file_count = len(items) - folder_count + + logger.info( + f"✅ Listed {len(items)} total items ({folder_count} folders, {file_count} files) for connector {connector_id}" + + (f" in folder {parent_id}" if parent_id else " in ROOT") + ) + + # Log first few items for debugging + if items: + logger.info(f"First 3 items: {[item.get('name') for item in items[:3]]}") + + return {"items": items} + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error listing Drive contents: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, detail=f"Failed to list Drive contents: {e!s}" + ) from e diff --git a/surfsense_backend/app/routes/google_gmail_add_connector_route.py b/surfsense_backend/app/routes/google_gmail_add_connector_route.py index 6d37da244..21fcf2c38 100644 --- a/surfsense_backend/app/routes/google_gmail_add_connector_route.py +++ b/surfsense_backend/app/routes/google_gmail_add_connector_route.py @@ -135,9 +135,10 @@ async def gmail_callback( f"Successfully created Gmail connector for user {user_id} with ID {db_connector.id}" ) - # Redirect to the frontend success page + # Redirect to the frontend with success params for indexing config + # Using query params to auto-open the popup with config view on new-chat page return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/add/google-gmail-connector?success=true" + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector=google-gmail-connector" ) except IntegrityError as e: diff --git a/surfsense_backend/app/routes/logs_routes.py b/surfsense_backend/app/routes/logs_routes.py index e7e00280e..b82e02077 100644 --- a/surfsense_backend/app/routes/logs_routes.py +++ b/surfsense_backend/app/routes/logs_routes.py @@ -322,6 +322,9 @@ async def get_logs_summary( document_id = ( log.log_metadata.get("document_id") if log.log_metadata else None ) + connector_id = ( + log.log_metadata.get("connector_id") if log.log_metadata else None + ) 
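+            # Like document_id above, connector_id is read from the log metadata so
+            # each active-task entry can be traced back to the connector that started it.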
summary["active_tasks"].append( { "id": log.id, @@ -330,6 +333,7 @@ async def get_logs_summary( "started_at": log.created_at, "source": log.source, "document_id": document_id, + "connector_id": connector_id, } ) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 5a7db7f37..d6fdedd7c 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -14,7 +14,7 @@ import logging from datetime import UTC, datetime, timedelta from typing import Any -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Body, Depends, HTTPException, Query from pydantic import BaseModel, Field, ValidationError from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession @@ -30,6 +30,7 @@ from app.db import ( get_async_session, ) from app.schemas import ( + GoogleDriveIndexRequest, SearchSourceConnectorBase, SearchSourceConnectorCreate, SearchSourceConnectorRead, @@ -542,6 +543,10 @@ async def index_connector_content( None, description="End date for indexing (YYYY-MM-DD format). If not provided, uses today's date", ), + drive_items: GoogleDriveIndexRequest | None = Body( + None, + description="[Google Drive only] Structured request with folders and files to index", + ), session: AsyncSession = Depends(get_async_session), user: User = Depends(current_active_user), ): @@ -747,6 +752,33 @@ async def index_connector_content( ) response_message = "Google Gmail indexing started in the background." + elif ( + connector.connector_type == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR + ): + from app.tasks.celery_tasks.connector_tasks import ( + index_google_drive_files_task, + ) + + if not drive_items or not drive_items.has_items(): + raise HTTPException( + status_code=400, + detail="Google Drive indexing requires drive_items body parameter with folders or files", + ) + + logger.info( + f"Triggering Google Drive indexing for connector {connector_id} into search space {search_space_id}, " + f"folders: {len(drive_items.folders)}, files: {len(drive_items.files)}" + ) + + # Pass structured data to Celery task + index_google_drive_files_task.delay( + connector_id, + search_space_id, + str(user.id), + drive_items.model_dump(), # Convert to dict for JSON serialization + ) + response_message = "Google Drive indexing started in the background." 
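+        # drive_items carries the user's selection as {id, name} entries: each folder
+        # is expanded to its contained files by the indexer, while each entry in
+        # drive_items.files is indexed individually (see GoogleDriveIndexRequest).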
+ elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR: from app.tasks.celery_tasks.connector_tasks import ( index_discord_messages_task, @@ -1515,6 +1547,90 @@ async def run_google_gmail_indexing( # Optionally update status in DB to indicate failure +async def run_google_drive_indexing( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + items_dict: dict, # Dictionary with 'folders' and 'files' lists +): + """Runs the Google Drive indexing task for folders and files and updates the timestamp.""" + try: + from app.tasks.connector_indexers.google_drive_indexer import ( + index_google_drive_files, + index_google_drive_single_file, + ) + + # Parse the structured data + items = GoogleDriveIndexRequest(**items_dict) + total_indexed = 0 + errors = [] + + # Index each folder + for folder in items.folders: + try: + indexed_count, error_message = await index_google_drive_files( + session, + connector_id, + search_space_id, + user_id, + folder_id=folder.id, + folder_name=folder.name, + use_delta_sync=True, + update_last_indexed=False, + ) + if error_message: + errors.append(f"Folder '{folder.name}': {error_message}") + else: + total_indexed += indexed_count + except Exception as e: + errors.append(f"Folder '{folder.name}': {e!s}") + logger.error( + f"Error indexing folder {folder.name} ({folder.id}): {e}", + exc_info=True, + ) + + # Index each individual file + for file in items.files: + try: + indexed_count, error_message = await index_google_drive_single_file( + session, + connector_id, + search_space_id, + user_id, + file_id=file.id, + file_name=file.name, + ) + if error_message: + errors.append(f"File '{file.name}': {error_message}") + else: + total_indexed += indexed_count + except Exception as e: + errors.append(f"File '{file.name}': {e!s}") + logger.error( + f"Error indexing file {file.name} ({file.id}): {e}", + exc_info=True, + ) + + if errors: + logger.error( + f"Google Drive indexing completed with errors for connector {connector_id}: {'; '.join(errors)}" + ) + else: + logger.info( + f"Google Drive indexing successful for connector {connector_id}. Indexed {total_indexed} documents from {len(items.folders)} folder(s) and {len(items.files)} file(s)." 
+ ) + # Update the last indexed timestamp only on full success + await update_connector_last_indexed(session, connector_id) + await session.commit() # Commit timestamp update + except Exception as e: + logger.error( + f"Critical error in run_google_drive_indexing for connector {connector_id}: {e}", + exc_info=True, + ) + # Optionally update status in DB to indicate failure + + # Add new helper functions for luma indexing async def run_luma_indexing_with_new_session( connector_id: int, diff --git a/surfsense_backend/app/schemas/__init__.py b/surfsense_backend/app/schemas/__init__.py index f5ae65e9d..a8bde7ed9 100644 --- a/surfsense_backend/app/schemas/__init__.py +++ b/surfsense_backend/app/schemas/__init__.py @@ -10,6 +10,7 @@ from .documents import ( ExtensionDocumentMetadata, PaginatedResponse, ) +from .google_drive import DriveItem, GoogleDriveIndexRequest from .logs import LogBase, LogCreate, LogFilter, LogRead, LogUpdate from .new_chat import ( ChatMessage, @@ -83,9 +84,12 @@ __all__ = [ "DocumentUpdate", "DocumentWithChunksRead", "DocumentsCreate", + # Google Drive schemas + "DriveItem", "ExtensionDocumentContent", "ExtensionDocumentMetadata", "GlobalNewLLMConfigRead", + "GoogleDriveIndexRequest", # Base schemas "IDModel", # RBAC schemas diff --git a/surfsense_backend/app/schemas/google_drive.py b/surfsense_backend/app/schemas/google_drive.py new file mode 100644 index 000000000..3f57b92ca --- /dev/null +++ b/surfsense_backend/app/schemas/google_drive.py @@ -0,0 +1,41 @@ +"""Schemas for Google Drive connector.""" + +from pydantic import BaseModel, Field + + +class DriveItem(BaseModel): + """Represents a Google Drive file or folder.""" + + id: str = Field(..., description="Google Drive item ID") + name: str = Field(..., description="Item display name") + + +class GoogleDriveIndexRequest(BaseModel): + """Request body for indexing Google Drive content.""" + + folders: list[DriveItem] = Field( + default_factory=list, description="List of folders to index" + ) + files: list[DriveItem] = Field( + default_factory=list, description="List of specific files to index" + ) + + def has_items(self) -> bool: + """Check if any items are selected.""" + return len(self.folders) > 0 or len(self.files) > 0 + + def get_folder_ids(self) -> list[str]: + """Get list of folder IDs.""" + return [folder.id for folder in self.folders] + + def get_folder_names(self) -> list[str]: + """Get list of folder names.""" + return [folder.name for folder in self.folders] + + def get_file_ids(self) -> list[str]: + """Get list of file IDs.""" + return [file.id for file in self.files] + + def get_file_names(self) -> list[str]: + """Get list of file names.""" + return [file.name for file in self.files] diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 3a6dcc605..4e874729c 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -1808,6 +1808,106 @@ class ConnectorService: return result_object, gmail_docs + async def search_google_drive( + self, + user_query: str, + search_space_id: int, + top_k: int = 20, + start_date: datetime | None = None, + end_date: datetime | None = None, + ) -> tuple: + """ + Search for Google Drive files and return both the source information and langchain documents. + + Uses combined chunk-level and document-level hybrid search with RRF fusion. 
+ + Args: + user_query: The user's query + search_space_id: The search space ID to search in + top_k: Maximum number of results to return + start_date: Optional start date for filtering documents by updated_at + end_date: Optional end date for filtering documents by updated_at + + Returns: + tuple: (sources_info, langchain_documents) + """ + drive_docs = await self._combined_rrf_search( + query_text=user_query, + search_space_id=search_space_id, + document_type="GOOGLE_DRIVE_FILE", + top_k=top_k, + start_date=start_date, + end_date=end_date, + ) + + # Early return if no results + if not drive_docs: + return { + "id": 33, + "name": "Google Drive Files", + "type": "GOOGLE_DRIVE_FILE", + "sources": [], + }, [] + + def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + return ( + doc_info.get("title") + or metadata.get("google_drive_file_name") + or metadata.get("FILE_NAME") + or "Untitled File" + ) + + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + file_id = metadata.get("google_drive_file_id", "") + return f"https://drive.google.com/file/d/{file_id}/view" if file_id else "" + + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = self._chunk_preview(chunk.get("content", "")) + info_parts = [] + mime_type = metadata.get("google_drive_mime_type", "") + modified_time = metadata.get("modified_time", "") + if mime_type: + # Simplify mime type for display + if "google-apps" in mime_type: + file_type = mime_type.split(".")[-1].title() + else: + file_type = mime_type.split("/")[-1].upper() + info_parts.append(f"Type: {file_type}") + if modified_time: + info_parts.append(f"Modified: {modified_time}") + if info_parts: + description = (description + " | " + " | ".join(info_parts)).strip(" |") + return description + + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "google_drive_file_id": metadata.get("google_drive_file_id", ""), + "google_drive_mime_type": metadata.get("google_drive_mime_type", ""), + "modified_time": metadata.get("modified_time", ""), + } + + sources_list = self._build_chunk_sources_from_documents( + drive_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) + + # Create result object + result_object = { + "id": 33, # Assign a unique ID for the Google Drive connector + "name": "Google Drive Files", + "type": "GOOGLE_DRIVE_FILE", + "sources": sources_list, + } + + return result_object, drive_docs + async def search_confluence( self, user_query: str, @@ -2506,3 +2606,103 @@ class ConnectorService: } return result_object, bookstack_docs + + async def search_circleback( + self, + user_query: str, + search_space_id: int, + top_k: int = 20, + start_date: datetime | None = None, + end_date: datetime | None = None, + ) -> tuple: + """ + Search for Circleback meeting notes and return both the source information and langchain documents. + + Uses combined chunk-level and document-level hybrid search with RRF fusion. 
+ + Args: + user_query: The user's query + search_space_id: The search space ID to search in + top_k: Maximum number of results to return + start_date: Optional start date for filtering documents by updated_at + end_date: Optional end date for filtering documents by updated_at + + Returns: + tuple: (sources_info, langchain_documents) + """ + circleback_docs = await self._combined_rrf_search( + query_text=user_query, + search_space_id=search_space_id, + document_type="CIRCLEBACK", + top_k=top_k, + start_date=start_date, + end_date=end_date, + ) + + # Early return if no results + if not circleback_docs: + return { + "id": 52, + "name": "Circleback Meetings", + "type": "CIRCLEBACK", + "sources": [], + }, [] + + def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + meeting_name = metadata.get("meeting_name", "") + meeting_date = metadata.get("meeting_date", "") + title = doc_info.get("title") or meeting_name or "Circleback Meeting" + if meeting_date: + title += f" ({meeting_date})" + return title + + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + meeting_id = metadata.get("circleback_meeting_id", "") + return ( + f"https://app.circleback.ai/meetings/{meeting_id}" if meeting_id else "" + ) + + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = self._chunk_preview(chunk.get("content", ""), limit=200) + info_parts = [] + duration = metadata.get("duration_seconds") + attendee_count = metadata.get("attendee_count") + if duration: + minutes = int(duration) // 60 + info_parts.append(f"Duration: {minutes} min") + if attendee_count: + info_parts.append(f"Attendees: {attendee_count}") + if info_parts: + description = (description + " | " + " | ".join(info_parts)).strip(" |") + return description + + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "circleback_meeting_id": metadata.get("circleback_meeting_id", ""), + "meeting_name": metadata.get("meeting_name", ""), + "meeting_date": metadata.get("meeting_date", ""), + "duration_seconds": metadata.get("duration_seconds", 0), + "attendee_count": metadata.get("attendee_count", 0), + } + + sources_list = self._build_chunk_sources_from_documents( + circleback_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) + + # Create result object + result_object = { + "id": 52, + "name": "Circleback Meetings", + "type": "CIRCLEBACK", + "sources": sources_list, + } + + return result_object, circleback_docs diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index 6cd557dc4..3cae1bbdb 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -473,6 +473,54 @@ async def _index_google_gmail_messages( ) +@celery_app.task(name="index_google_drive_files", bind=True) +def index_google_drive_files_task( + self, + connector_id: int, + search_space_id: int, + user_id: str, + items_dict: dict, # Dictionary with 'folders' and 'files' lists +): + """Celery task to index Google Drive folders and files.""" + import asyncio + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete( + _index_google_drive_files( + connector_id, + search_space_id, + user_id, + items_dict, + ) + ) + finally: + loop.close() 
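+
+# The synchronous Celery task above runs its async work on a private event loop that
+# is created and closed per invocation, so no loop state is reused between runs on the
+# same worker; _index_google_drive_files below opens its own DB session for the run.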
+ + +async def _index_google_drive_files( + connector_id: int, + search_space_id: int, + user_id: str, + items_dict: dict, # Dictionary with 'folders' and 'files' lists +): + """Index Google Drive folders and files with new session.""" + from app.routes.search_source_connectors_routes import ( + run_google_drive_indexing, + ) + + async with get_celery_session_maker()() as session: + await run_google_drive_indexing( + session, + connector_id, + search_space_id, + user_id, + items_dict, + ) + + @celery_app.task(name="index_discord_messages", bind=True) def index_discord_messages_task( self, diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index 5b7f9ce13..bb53fd042 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -268,3 +268,105 @@ async def _process_file_upload( ) logger.error(error_message) raise + + +@celery_app.task(name="process_circleback_meeting", bind=True) +def process_circleback_meeting_task( + self, + meeting_id: int, + meeting_name: str, + markdown_content: str, + metadata: dict, + search_space_id: int, +): + """ + Celery task to process Circleback meeting webhook data. + + Args: + meeting_id: Circleback meeting ID + meeting_name: Name of the meeting + markdown_content: Meeting content formatted as markdown + metadata: Meeting metadata dictionary + search_space_id: ID of the search space + """ + import asyncio + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete( + _process_circleback_meeting( + meeting_id, + meeting_name, + markdown_content, + metadata, + search_space_id, + ) + ) + finally: + loop.close() + + +async def _process_circleback_meeting( + meeting_id: int, + meeting_name: str, + markdown_content: str, + metadata: dict, + search_space_id: int, +): + """Process Circleback meeting with new session.""" + from app.tasks.document_processors.circleback_processor import ( + add_circleback_meeting_document, + ) + + async with get_celery_session_maker()() as session: + task_logger = TaskLoggingService(session, search_space_id) + + log_entry = await task_logger.log_task_start( + task_name="process_circleback_meeting", + source="circleback_webhook", + message=f"Starting Circleback meeting processing: {meeting_name}", + metadata={ + "document_type": "CIRCLEBACK", + "meeting_id": meeting_id, + "meeting_name": meeting_name, + **metadata, + }, + ) + + try: + result = await add_circleback_meeting_document( + session=session, + meeting_id=meeting_id, + meeting_name=meeting_name, + markdown_content=markdown_content, + metadata=metadata, + search_space_id=search_space_id, + ) + + if result: + await task_logger.log_task_success( + log_entry, + f"Successfully processed Circleback meeting: {meeting_name}", + { + "document_id": result.id, + "meeting_id": meeting_id, + "content_hash": result.content_hash, + }, + ) + else: + await task_logger.log_task_success( + log_entry, + f"Circleback meeting document already exists (duplicate): {meeting_name}", + {"duplicate_detected": True, "meeting_id": meeting_id}, + ) + except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to process Circleback meeting: {meeting_name}", + str(e), + {"error_type": type(e).__name__, "meeting_id": meeting_id}, + ) + logger.error(f"Error processing Circleback meeting: {e!s}") + raise diff --git a/surfsense_backend/app/tasks/connector_indexers/__init__.py 
b/surfsense_backend/app/tasks/connector_indexers/__init__.py index dcfca33c3..95e57ddf2 100644 --- a/surfsense_backend/app/tasks/connector_indexers/__init__.py +++ b/surfsense_backend/app/tasks/connector_indexers/__init__.py @@ -34,6 +34,7 @@ from .discord_indexer import index_discord_messages from .elasticsearch_indexer import index_elasticsearch_documents from .github_indexer import index_github_repos from .google_calendar_indexer import index_google_calendar_events +from .google_drive_indexer import index_google_drive_files from .google_gmail_indexer import index_google_gmail_messages from .jira_indexer import index_jira_issues @@ -57,6 +58,7 @@ __all__ = [ # noqa: RUF022 "index_github_repos", # Calendar and scheduling "index_google_calendar_events", + "index_google_drive_files", "index_luma_events", "index_jira_issues", # Issue tracking and project management diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py new file mode 100644 index 000000000..343d44072 --- /dev/null +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -0,0 +1,537 @@ +"""Google Drive indexer using Surfsense file processors.""" + +import logging + +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.connectors.google_drive import ( + GoogleDriveClient, + categorize_change, + download_and_process_file, + fetch_all_changes, + get_file_by_id, + get_files_in_folder, + get_start_page_token, +) +from app.db import DocumentType, SearchSourceConnectorType +from app.services.task_logging_service import TaskLoggingService +from app.tasks.connector_indexers.base import ( + check_document_by_unique_identifier, + get_connector_by_id, + update_connector_last_indexed, +) +from app.utils.document_converters import generate_unique_identifier_hash + +logger = logging.getLogger(__name__) + + +async def index_google_drive_files( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + folder_id: str | None = None, + folder_name: str | None = None, + use_delta_sync: bool = True, + update_last_indexed: bool = True, + max_files: int = 500, +) -> tuple[int, str | None]: + """ + Index Google Drive files for a specific connector. 
+ + Args: + session: Database session + connector_id: ID of the Drive connector + search_space_id: ID of the search space + user_id: ID of the user + folder_id: Specific folder to index (from UI/request, takes precedence) + folder_name: Folder name for display (from UI/request) + use_delta_sync: Whether to use change tracking for incremental sync + update_last_indexed: Whether to update last_indexed_at timestamp + max_files: Maximum number of files to index + + Returns: + Tuple of (number_of_indexed_files, error_message) + """ + task_logger = TaskLoggingService(session, search_space_id) + + log_entry = await task_logger.log_task_start( + task_name="google_drive_files_indexing", + source="connector_indexing_task", + message=f"Starting Google Drive indexing for connector {connector_id}", + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "folder_id": folder_id, + "use_delta_sync": use_delta_sync, + "max_files": max_files, + }, + ) + + try: + connector = await get_connector_by_id( + session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR + ) + + if not connector: + error_msg = f"Google Drive connector with ID {connector_id} not found" + await task_logger.log_task_failure( + log_entry, error_msg, {"error_type": "ConnectorNotFound"} + ) + return 0, error_msg + + await task_logger.log_task_progress( + log_entry, + f"Initializing Google Drive client for connector {connector_id}", + {"stage": "client_initialization"}, + ) + + drive_client = GoogleDriveClient(session, connector_id) + + if not folder_id: + error_msg = "folder_id is required for Google Drive indexing" + await task_logger.log_task_failure( + log_entry, error_msg, {"error_type": "MissingParameter"} + ) + return 0, error_msg + + target_folder_id = folder_id + target_folder_name = folder_name or "Selected Folder" + + logger.info( + f"Indexing Google Drive folder: {target_folder_name} ({target_folder_id})" + ) + + folder_tokens = connector.config.get("folder_tokens", {}) + start_page_token = folder_tokens.get(target_folder_id) + can_use_delta_sync = ( + use_delta_sync and start_page_token and connector.last_indexed_at + ) + + if can_use_delta_sync: + logger.info(f"Using delta sync for connector {connector_id}") + result = await _index_with_delta_sync( + drive_client=drive_client, + session=session, + connector=connector, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + folder_id=target_folder_id, + start_page_token=start_page_token, + task_logger=task_logger, + log_entry=log_entry, + max_files=max_files, + ) + else: + logger.info(f"Using full scan for connector {connector_id}") + result = await _index_full_scan( + drive_client=drive_client, + session=session, + connector=connector, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + folder_id=target_folder_id, + folder_name=target_folder_name, + task_logger=task_logger, + log_entry=log_entry, + max_files=max_files, + ) + + documents_indexed, documents_skipped = result + + if documents_indexed > 0 or can_use_delta_sync: + new_token, token_error = await get_start_page_token(drive_client) + if new_token and not token_error: + from sqlalchemy.orm.attributes import flag_modified + + if "folder_tokens" not in connector.config: + connector.config["folder_tokens"] = {} + connector.config["folder_tokens"][target_folder_id] = new_token + flag_modified(connector, "config") + + await update_connector_last_indexed(session, connector, update_last_indexed) + + await session.commit() + 
logger.info("Successfully committed Google Drive indexing changes to database") + + await task_logger.log_task_success( + log_entry, + f"Successfully completed Google Drive indexing for connector {connector_id}", + { + "files_processed": documents_indexed, + "files_skipped": documents_skipped, + "sync_type": "delta" if can_use_delta_sync else "full", + "folder": target_folder_name, + }, + ) + + logger.info( + f"Google Drive indexing completed: {documents_indexed} files indexed, {documents_skipped} skipped" + ) + return documents_indexed, None + + except SQLAlchemyError as db_error: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during Google Drive indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"}, + ) + logger.error(f"Database error: {db_error!s}", exc_info=True) + return 0, f"Database error: {db_error!s}" + except Exception as e: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index Google Drive files for connector {connector_id}", + str(e), + {"error_type": type(e).__name__}, + ) + logger.error(f"Failed to index Google Drive files: {e!s}", exc_info=True) + return 0, f"Failed to index Google Drive files: {e!s}" + + +async def index_google_drive_single_file( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + file_id: str, + file_name: str | None = None, +) -> tuple[int, str | None]: + """ + Index a single Google Drive file by its ID. + + Args: + session: Database session + connector_id: ID of the Drive connector + search_space_id: ID of the search space + user_id: ID of the user + file_id: Specific file ID to index + file_name: File name for display (optional) + + Returns: + Tuple of (number_of_indexed_files, error_message) + """ + task_logger = TaskLoggingService(session, search_space_id) + + log_entry = await task_logger.log_task_start( + task_name="google_drive_single_file_indexing", + source="connector_indexing_task", + message=f"Starting Google Drive single file indexing for file {file_id}", + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "file_id": file_id, + "file_name": file_name, + }, + ) + + try: + connector = await get_connector_by_id( + session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR + ) + + if not connector: + error_msg = f"Google Drive connector with ID {connector_id} not found" + await task_logger.log_task_failure( + log_entry, error_msg, {"error_type": "ConnectorNotFound"} + ) + return 0, error_msg + + await task_logger.log_task_progress( + log_entry, + f"Initializing Google Drive client for connector {connector_id}", + {"stage": "client_initialization"}, + ) + + drive_client = GoogleDriveClient(session, connector_id) + + # Fetch the file metadata + file, error = await get_file_by_id(drive_client, file_id) + + if error or not file: + error_msg = f"Failed to fetch file {file_id}: {error or 'File not found'}" + await task_logger.log_task_failure( + log_entry, error_msg, {"error_type": "FileNotFound"} + ) + return 0, error_msg + + display_name = file_name or file.get("name", "Unknown") + logger.info(f"Indexing Google Drive file: {display_name} ({file_id})") + + # Process the file + indexed, skipped = await _process_single_file( + drive_client=drive_client, + session=session, + file=file, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + task_logger=task_logger, + log_entry=log_entry, + ) + + await session.commit() + 
logger.info( + "Successfully committed Google Drive file indexing changes to database" + ) + + if indexed > 0: + await task_logger.log_task_success( + log_entry, + f"Successfully indexed file {display_name}", + { + "file_name": display_name, + "file_id": file_id, + }, + ) + logger.info(f"Google Drive file indexing completed: {display_name}") + return 1, None + else: + await task_logger.log_task_progress( + log_entry, + f"File {display_name} was skipped", + {"status": "skipped"}, + ) + return 0, None + + except SQLAlchemyError as db_error: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + "Database error during file indexing", + str(db_error), + {"error_type": "SQLAlchemyError"}, + ) + logger.error(f"Database error: {db_error!s}", exc_info=True) + return 0, f"Database error: {db_error!s}" + except Exception as e: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + "Failed to index Google Drive file", + str(e), + {"error_type": type(e).__name__}, + ) + logger.error(f"Failed to index Google Drive file: {e!s}", exc_info=True) + return 0, f"Failed to index Google Drive file: {e!s}" + + +async def _index_full_scan( + drive_client: GoogleDriveClient, + session: AsyncSession, + connector: any, + connector_id: int, + search_space_id: int, + user_id: str, + folder_id: str | None, + folder_name: str, + task_logger: TaskLoggingService, + log_entry: any, + max_files: int, +) -> tuple[int, int]: + """Perform full scan indexing of a folder.""" + await task_logger.log_task_progress( + log_entry, + f"Starting full scan of folder: {folder_name}", + {"stage": "full_scan", "folder_id": folder_id}, + ) + + documents_indexed = 0 + documents_skipped = 0 + page_token = None + files_processed = 0 + + while files_processed < max_files: + files, next_token, error = await get_files_in_folder( + drive_client, folder_id, include_subfolders=False, page_token=page_token + ) + + if error: + logger.error(f"Error listing files: {error}") + break + + if not files: + break + + for file in files: + if files_processed >= max_files: + break + + files_processed += 1 + + indexed, skipped = await _process_single_file( + drive_client=drive_client, + session=session, + file=file, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + task_logger=task_logger, + log_entry=log_entry, + ) + + documents_indexed += indexed + documents_skipped += skipped + + if documents_indexed % 10 == 0 and documents_indexed > 0: + await session.commit() + logger.info( + f"Committed batch: {documents_indexed} files indexed so far" + ) + + page_token = next_token + if not page_token: + break + + logger.info( + f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped" + ) + return documents_indexed, documents_skipped + + +async def _index_with_delta_sync( + drive_client: GoogleDriveClient, + session: AsyncSession, + connector: any, + connector_id: int, + search_space_id: int, + user_id: str, + folder_id: str | None, + start_page_token: str, + task_logger: TaskLoggingService, + log_entry: any, + max_files: int, +) -> tuple[int, int]: + """Perform delta sync indexing using change tracking.""" + await task_logger.log_task_progress( + log_entry, + f"Starting delta sync from token: {start_page_token[:20]}...", + {"stage": "delta_sync", "start_token": start_page_token}, + ) + + changes, final_token, error = await fetch_all_changes( + drive_client, start_page_token, folder_id + ) + + if error: + logger.error(f"Error fetching changes: {error}") + return 0, 
0 + + if not changes: + logger.info("No changes detected since last sync") + return 0, 0 + + logger.info(f"Processing {len(changes)} changes") + + documents_indexed = 0 + documents_skipped = 0 + files_processed = 0 + + for change in changes: + if files_processed >= max_files: + break + + files_processed += 1 + change_type = categorize_change(change) + + if change_type in ["removed", "trashed"]: + file_id = change.get("fileId") + if file_id: + await _remove_document(session, file_id, search_space_id) + continue + + file = change.get("file") + if not file: + continue + + indexed, skipped = await _process_single_file( + drive_client=drive_client, + session=session, + file=file, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + task_logger=task_logger, + log_entry=log_entry, + ) + + documents_indexed += indexed + documents_skipped += skipped + + if documents_indexed % 10 == 0 and documents_indexed > 0: + await session.commit() + logger.info(f"Committed batch: {documents_indexed} changes processed") + + logger.info( + f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped" + ) + return documents_indexed, documents_skipped + + +async def _process_single_file( + drive_client: GoogleDriveClient, + session: AsyncSession, + file: dict, + connector_id: int, + search_space_id: int, + user_id: str, + task_logger: TaskLoggingService, + log_entry: any, +) -> tuple[int, int]: + """ + Process a single file by downloading and using Surfsense's file processor. + + Returns: + Tuple of (indexed_count, skipped_count) + """ + file_name = file.get("name", "Unknown") + mime_type = file.get("mimeType", "") + + try: + logger.info(f"Processing file: {file_name} ({mime_type})") + + _, error, _ = await download_and_process_file( + client=drive_client, + file=file, + search_space_id=search_space_id, + user_id=user_id, + session=session, + task_logger=task_logger, + log_entry=log_entry, + ) + + if error: + await task_logger.log_task_progress( + log_entry, + f"Skipped {file_name}: {error}", + {"status": "skipped", "reason": error}, + ) + return 0, 1 + + logger.info(f"Successfully indexed Google Drive file: {file_name}") + return 1, 0 + + except Exception as e: + logger.error(f"Error processing file {file_name}: {e!s}", exc_info=True) + return 0, 1 + + +async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int): + """Remove a document that was deleted in Drive.""" + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id + ) + + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_document: + await session.delete(existing_document) + logger.info(f"Removed deleted file document: {file_id}") diff --git a/surfsense_backend/app/tasks/document_processors/circleback_processor.py b/surfsense_backend/app/tasks/document_processors/circleback_processor.py new file mode 100644 index 000000000..0a1d91784 --- /dev/null +++ b/surfsense_backend/app/tasks/document_processors/circleback_processor.py @@ -0,0 +1,183 @@ +""" +Circleback meeting document processor. + +This module processes meeting data received from Circleback webhooks +and stores it as searchable documents in the database. 
+""" + +import logging +from typing import Any + +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import Document, DocumentType +from app.services.llm_service import get_document_summary_llm +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, + generate_unique_identifier_hash, +) + +from .base import ( + check_document_by_unique_identifier, + get_current_timestamp, +) + +logger = logging.getLogger(__name__) + + +async def add_circleback_meeting_document( + session: AsyncSession, + meeting_id: int, + meeting_name: str, + markdown_content: str, + metadata: dict[str, Any], + search_space_id: int, +) -> Document | None: + """ + Process and store a Circleback meeting document. + + Args: + session: Database session + meeting_id: Circleback meeting ID + meeting_name: Name of the meeting + markdown_content: Meeting content formatted as markdown + metadata: Meeting metadata dictionary + search_space_id: ID of the search space + + Returns: + Document object if successful, None if failed or duplicate + """ + try: + # Generate unique identifier hash using Circleback meeting ID + unique_identifier = f"circleback_{meeting_id}" + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.CIRCLEBACK, unique_identifier, search_space_id + ) + + # Generate content hash + content_hash = generate_content_hash(markdown_content, search_space_id) + + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info(f"Circleback meeting {meeting_id} unchanged. Skipping.") + return existing_document + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for Circleback meeting {meeting_id}. Updating document." + ) + + # Get LLM for generating summary + llm = await get_document_summary_llm(session, search_space_id) + if not llm: + logger.warning( + f"No LLM configured for search space {search_space_id}. Using content as summary." + ) + # Use first 1000 chars as summary if no LLM available + summary_content = ( + markdown_content[:1000] + "..." 
+ if len(markdown_content) > 1000 + else markdown_content + ) + summary_embedding = None + else: + # Generate summary with metadata + document_metadata = { + "meeting_name": meeting_name, + "meeting_id": meeting_id, + "document_type": "Circleback Meeting", + **{ + k: v + for k, v in metadata.items() + if isinstance(v, str | int | float | bool) + }, + } + summary_content, summary_embedding = await generate_document_summary( + markdown_content, llm, document_metadata + ) + + # Process chunks + chunks = await create_document_chunks(markdown_content) + + # Convert to BlockNote JSON for editing capability + from app.utils.blocknote_converter import convert_markdown_to_blocknote + + blocknote_json = await convert_markdown_to_blocknote(markdown_content) + if not blocknote_json: + logger.warning( + f"Failed to convert Circleback meeting {meeting_id} to BlockNote JSON, document will not be editable" + ) + + # Prepare document metadata + document_metadata = { + "CIRCLEBACK_MEETING_ID": meeting_id, + "MEETING_NAME": meeting_name, + "SOURCE": "CIRCLEBACK_WEBHOOK", + **metadata, + } + + # Update or create document + if existing_document: + # Update existing document + existing_document.title = meeting_name + existing_document.content = summary_content + existing_document.content_hash = content_hash + if summary_embedding is not None: + existing_document.embedding = summary_embedding + existing_document.document_metadata = document_metadata + existing_document.chunks = chunks + existing_document.blocknote_document = blocknote_json + existing_document.content_needs_reindexing = False + existing_document.updated_at = get_current_timestamp() + + await session.commit() + await session.refresh(existing_document) + document = existing_document + logger.info( + f"Updated Circleback meeting document {meeting_id} in search space {search_space_id}" + ) + else: + # Create new document + document = Document( + search_space_id=search_space_id, + title=meeting_name, + document_type=DocumentType.CIRCLEBACK, + document_metadata=document_metadata, + content=summary_content, + embedding=summary_embedding, + chunks=chunks, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + blocknote_document=blocknote_json, + content_needs_reindexing=False, + updated_at=get_current_timestamp(), + ) + + session.add(document) + await session.commit() + await session.refresh(document) + logger.info( + f"Created new Circleback meeting document {meeting_id} in search space {search_space_id}" + ) + + return document + + except SQLAlchemyError as db_error: + await session.rollback() + logger.error( + f"Database error processing Circleback meeting {meeting_id}: {db_error}" + ) + raise db_error + except Exception as e: + await session.rollback() + logger.error(f"Failed to process Circleback meeting {meeting_id}: {e!s}") + raise RuntimeError(f"Failed to process Circleback meeting: {e!s}") from e diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index a32e75a32..596cd9830 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -447,6 +447,24 @@ async def add_received_file_document_using_docling( ) from e +async def _update_document_from_connector( + document: Document | None, connector: dict | None, session: AsyncSession +) -> None: + """Helper to update document type and metadata from connector info.""" + if document and 
connector: + if "type" in connector: + document.document_type = connector["type"] + if "metadata" in connector: + # Merge with existing document_metadata (the actual column name) + if not document.document_metadata: + document.document_metadata = connector["metadata"] + else: + # Expand existing metadata with connector metadata + merged = {**document.document_metadata, **connector["metadata"]} + document.document_metadata = merged + await session.commit() + + async def process_file_in_background( file_path: str, filename: str, @@ -455,6 +473,8 @@ async def process_file_in_background( session: AsyncSession, task_logger: TaskLoggingService, log_entry: Log, + connector: dict + | None = None, # Optional: {"type": "GOOGLE_DRIVE_FILE", "metadata": {...}} ): try: # Check if the file is a markdown or text file @@ -492,6 +512,9 @@ async def process_file_in_background( session, filename, markdown_content, search_space_id, user_id ) + if connector: + await _update_document_from_connector(result, connector, session) + if result: await task_logger.log_task_success( log_entry, @@ -608,6 +631,9 @@ async def process_file_in_background( session, filename, transcribed_text, search_space_id, user_id ) + if connector: + await _update_document_from_connector(result, connector, session) + if result: await task_logger.log_task_success( log_entry, @@ -753,6 +779,9 @@ async def process_file_in_background( session, filename, docs, search_space_id, user_id ) + if connector: + await _update_document_from_connector(result, connector, session) + if result: # Update page usage after successful processing # allow_exceed=True because document was already created after passing initial check @@ -897,6 +926,11 @@ async def process_file_in_background( user_id, final_page_count, allow_exceed=True ) + if connector: + await _update_document_from_connector( + last_created_doc, connector, session + ) + await task_logger.log_task_success( log_entry, f"Successfully processed file with LlamaCloud: {filename}", @@ -1021,6 +1055,11 @@ async def process_file_in_background( user_id, final_page_count, allow_exceed=True ) + if connector: + await _update_document_from_connector( + doc_result, connector, session + ) + await task_logger.log_task_success( log_entry, f"Successfully processed file with Docling: {filename}", diff --git a/surfsense_backend/pyproject.toml b/surfsense_backend/pyproject.toml index 2cbdd85f1..ba1d69939 100644 --- a/surfsense_backend/pyproject.toml +++ b/surfsense_backend/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "surf-new-backend" -version = "0.0.9" +version = "0.0.10" description = "SurfSense Backend" readme = "README.md" requires-python = ">=3.12" diff --git a/surfsense_backend/uv.lock b/surfsense_backend/uv.lock index d4af0123d..a6ef20cca 100644 --- a/surfsense_backend/uv.lock +++ b/surfsense_backend/uv.lock @@ -6409,7 +6409,7 @@ wheels = [ [[package]] name = "surf-new-backend" -version = "0.0.9" +version = "0.0.10" source = { virtual = "." 
} dependencies = [ { name = "alembic" }, diff --git a/surfsense_browser_extension/package.json b/surfsense_browser_extension/package.json index 636fea94b..d7edcc95b 100644 --- a/surfsense_browser_extension/package.json +++ b/surfsense_browser_extension/package.json @@ -1,7 +1,7 @@ { "name": "surfsense_browser_extension", "displayName": "Surfsense Browser Extension", - "version": "0.0.9", + "version": "0.0.10", "description": "Extension to collect Browsing History for SurfSense.", "author": "https://github.com/MODSetter", "engines": { diff --git a/surfsense_web/app/dashboard/[search_space_id]/client-layout.tsx b/surfsense_web/app/dashboard/[search_space_id]/client-layout.tsx index dd524a198..6e61ff7ac 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/client-layout.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/client-layout.tsx @@ -14,6 +14,7 @@ import { llmPreferencesAtom, } from "@/atoms/new-llm-config/new-llm-config-query.atoms"; import { activeSearchSpaceIdAtom } from "@/atoms/search-spaces/search-space-query.atoms"; +import { DocumentUploadDialogProvider } from "@/components/assistant-ui/document-upload-popup"; import { DashboardBreadcrumb } from "@/components/dashboard-breadcrumb"; import { LanguageSwitcher } from "@/components/LanguageSwitcher"; import { AppSidebarProvider } from "@/components/sidebar/AppSidebarProvider"; @@ -240,36 +241,34 @@ export function DashboardClientLayout({ } return ( - - {/* Use AppSidebarProvider which fetches user, search space, and recent chats */} - - -
-
-
-
- -
- - + + + {/* Use AppSidebarProvider which fetches user, search space, and recent chats */} + + +
+
+
+
+ +
+ + +
+
+
+
-
- -
-
-
-
{children}
-
-
-
+ +
{children}
+ + + + ); } diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx deleted file mode 100644 index d10a2338c..000000000 --- a/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx +++ /dev/null @@ -1,715 +0,0 @@ -"use client"; - -import { format } from "date-fns"; -import { useAtomValue } from "jotai"; -import { - Calendar as CalendarIcon, - Clock, - Edit, - Loader2, - Plus, - RefreshCw, - Trash2, -} from "lucide-react"; -import { motion } from "motion/react"; -import { useParams, useRouter } from "next/navigation"; -import { useTranslations } from "next-intl"; -import { useEffect, useState } from "react"; -import { toast } from "sonner"; -import { - deleteConnectorMutationAtom, - indexConnectorMutationAtom, - updateConnectorMutationAtom, -} from "@/atoms/connectors/connector-mutation.atoms"; -import { connectorsAtom } from "@/atoms/connectors/connector-query.atoms"; -import { - AlertDialog, - AlertDialogAction, - AlertDialogCancel, - AlertDialogContent, - AlertDialogDescription, - AlertDialogFooter, - AlertDialogHeader, - AlertDialogTitle, - AlertDialogTrigger, -} from "@/components/ui/alert-dialog"; -import { Button } from "@/components/ui/button"; -import { Calendar } from "@/components/ui/calendar"; -import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"; -import { - Dialog, - DialogContent, - DialogDescription, - DialogFooter, - DialogHeader, - DialogTitle, -} from "@/components/ui/dialog"; -import { Input } from "@/components/ui/input"; -import { Label } from "@/components/ui/label"; -import { Popover, PopoverContent, PopoverTrigger } from "@/components/ui/popover"; -import { - Select, - SelectContent, - SelectItem, - SelectTrigger, - SelectValue, -} from "@/components/ui/select"; -import { Switch } from "@/components/ui/switch"; -import { - Table, - TableBody, - TableCell, - TableHead, - TableHeader, - TableRow, -} from "@/components/ui/table"; -import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from "@/components/ui/tooltip"; -import { EnumConnectorName } from "@/contracts/enums/connector"; -import { getConnectorIcon } from "@/contracts/enums/connectorIcons"; -import { cn } from "@/lib/utils"; - -export default function ConnectorsPage() { - const t = useTranslations("connectors"); - const tCommon = useTranslations("common"); - - // Helper function to format date with time - const formatDateTime = (dateString: string | null): string => { - if (!dateString) return t("never"); - - const date = new Date(dateString); - return new Intl.DateTimeFormat("en-US", { - year: "numeric", - month: "short", - day: "numeric", - hour: "2-digit", - minute: "2-digit", - }).format(date); - }; - const router = useRouter(); - const params = useParams(); - const searchSpaceId = params.search_space_id as string; - const today = new Date(); - - const { data: connectors = [], isLoading, error } = useAtomValue(connectorsAtom); - - const { mutateAsync: deleteConnector } = useAtomValue(deleteConnectorMutationAtom); - const { mutateAsync: indexConnector } = useAtomValue(indexConnectorMutationAtom); - const { mutateAsync: updateConnector } = useAtomValue(updateConnectorMutationAtom); - - const [connectorToDelete, setConnectorToDelete] = useState(null); - const [indexingConnectorId, setIndexingConnectorId] = useState(null); - const [datePickerOpen, setDatePickerOpen] = useState(false); - const [selectedConnectorForIndexing, 
setSelectedConnectorForIndexing] = useState( - null - ); - const [startDate, setStartDate] = useState(undefined); - const [endDate, setEndDate] = useState(undefined); - - // Periodic indexing state - const [periodicDialogOpen, setPeriodicDialogOpen] = useState(false); - const [selectedConnectorForPeriodic, setSelectedConnectorForPeriodic] = useState( - null - ); - const [periodicEnabled, setPeriodicEnabled] = useState(false); - const [frequencyMinutes, setFrequencyMinutes] = useState("1440"); - const [customFrequency, setCustomFrequency] = useState(""); - const [isSavingPeriodic, setIsSavingPeriodic] = useState(false); - - useEffect(() => { - if (error) { - toast.error(t("failed_load")); - console.error("Error fetching connectors:", error); - } - }, [error, t]); - - // Handle connector deletion - const handleDeleteConnector = async () => { - if (connectorToDelete === null) return; - - try { - await deleteConnector({ id: connectorToDelete }); - } catch (error) { - console.error("Error deleting connector:", error); - } finally { - setConnectorToDelete(null); - } - }; - - // Handle opening date picker for indexing - const handleOpenDatePicker = (connectorId: number) => { - setSelectedConnectorForIndexing(connectorId); - setDatePickerOpen(true); - }; - - // Handle connector indexing with dates - const handleIndexConnector = async () => { - if (selectedConnectorForIndexing === null) return; - - setDatePickerOpen(false); - - try { - setIndexingConnectorId(selectedConnectorForIndexing); - const startDateStr = startDate ? format(startDate, "yyyy-MM-dd") : undefined; - const endDateStr = endDate ? format(endDate, "yyyy-MM-dd") : undefined; - - await indexConnector({ - connector_id: selectedConnectorForIndexing, - queryParams: { - search_space_id: searchSpaceId, - start_date: startDateStr, - end_date: endDateStr, - }, - }); - toast.success(t("indexing_started")); - } catch (error) { - console.error("Error indexing connector content:", error); - toast.error(error instanceof Error ? error.message : t("indexing_failed")); - } finally { - setIndexingConnectorId(null); - setSelectedConnectorForIndexing(null); - setStartDate(undefined); - setEndDate(undefined); - } - }; - - // Handle indexing without date picker (for quick indexing) - const handleQuickIndexConnector = async (connectorId: number) => { - setIndexingConnectorId(connectorId); - try { - await indexConnector({ - connector_id: connectorId, - queryParams: { - search_space_id: searchSpaceId, - }, - }); - toast.success(t("indexing_started")); - } catch (error) { - console.error("Error indexing connector content:", error); - toast.error(error instanceof Error ? 
error.message : t("indexing_failed")); - } finally { - setIndexingConnectorId(null); - } - }; - - // Handle opening periodic indexing dialog - const handleOpenPeriodicDialog = (connectorId: number) => { - const connector = connectors.find((c) => c.id === connectorId); - if (!connector) return; - - setSelectedConnectorForPeriodic(connectorId); - setPeriodicEnabled(connector.periodic_indexing_enabled); - - if (connector.indexing_frequency_minutes) { - // Check if it's a preset value - const presetValues = ["15", "60", "360", "720", "1440", "10080"]; - if (presetValues.includes(connector.indexing_frequency_minutes.toString())) { - setFrequencyMinutes(connector.indexing_frequency_minutes.toString()); - setCustomFrequency(""); - } else { - setFrequencyMinutes("custom"); - setCustomFrequency(connector.indexing_frequency_minutes.toString()); - } - } else { - setFrequencyMinutes("1440"); - setCustomFrequency(""); - } - - setPeriodicDialogOpen(true); - }; - - // Handle saving periodic indexing configuration - const handleSavePeriodicIndexing = async () => { - if (selectedConnectorForPeriodic === null) return; - - const connector = connectors.find((c) => c.id === selectedConnectorForPeriodic); - if (!connector) return; - - setIsSavingPeriodic(true); - try { - // Determine the frequency value - let frequency: number | null = null; - if (periodicEnabled) { - if (frequencyMinutes === "custom") { - frequency = parseInt(customFrequency, 10); - if (isNaN(frequency) || frequency <= 0) { - toast.error("Please enter a valid frequency in minutes"); - setIsSavingPeriodic(false); - return; - } - } else { - frequency = parseInt(frequencyMinutes, 10); - } - } - - await updateConnector({ - id: selectedConnectorForPeriodic, - data: { - periodic_indexing_enabled: periodicEnabled, - indexing_frequency_minutes: frequency, - }, - }); - - toast.success( - periodicEnabled - ? "Periodic indexing enabled successfully" - : "Periodic indexing disabled successfully" - ); - setPeriodicDialogOpen(false); - } catch (error) { - console.error("Error updating periodic indexing:", error); - toast.error(error instanceof Error ? error.message : "Failed to update periodic indexing"); - } finally { - setIsSavingPeriodic(false); - setSelectedConnectorForPeriodic(null); - } - }; - - // Format frequency for display - const formatFrequency = (minutes: number): string => { - if (minutes < 60) return `${minutes}m`; - if (minutes < 1440) return `${Math.floor(minutes / 60)}h`; - if (minutes < 10080) return `${Math.floor(minutes / 1440)}d`; - return `${Math.floor(minutes / 10080)}w`; - }; - - return ( -
[Removed JSX for the connectors page; the markup itself was lost in extraction. The render tree contained: a page header with {t("title")} and {t("subtitle")}; a {t("your_connectors")} card ({t("view_manage")}) with a loading skeleton, an empty state ({t("no_connectors")}, {t("no_connectors_desc")}), and a connectors table with columns {t("name")}, {t("type")}, {t("last_indexed")}, {t("periodic")}, {t("actions")}; rows built from connectors.map(...) showing connector.name, getConnectorIcon(connector.connector_type), last_indexed_at (or {t("not_indexable")}), and a periodic-indexing badge using formatFrequency(...) with a tooltip ("Runs every {connector.indexing_frequency_minutes} minutes" and "Next: {formatDateTime(connector.next_scheduled_at)}"); per-row actions for {t("index_date_range")}, {t("quick_index_auto")}, "Configure Periodic Indexing", and a delete confirmation dialog ({t("delete_connector")}, {t("delete_confirm")}); a date picker dialog ({t("select_date_range")}, {t("select_date_range_desc")}) with start/end calendars; and a "Configure Periodic Indexing" dialog with an enable switch ("Automatically index this connector at regular intervals"), frequency presets (15 minutes, hourly, 6 hours, 12 hours, daily, weekly), a custom-minutes input ("Enter the number of minutes between each indexing run"), and a schedule preview.]
-	);
-}
diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx
deleted file mode 100644
index 4c7bb3a3a..000000000
--- a/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx
+++ /dev/null
@@ -1,336 +0,0 @@
-"use client";
-
-import { ArrowLeft, Check, Loader2 } from "lucide-react";
-import { motion } from "motion/react";
-import { useParams, useRouter } from "next/navigation";
-import { useEffect } from "react";
-import { toast } from "sonner";
-import { EditConnectorLoadingSkeleton } from "@/components/editConnector/EditConnectorLoadingSkeleton";
-import { EditConnectorNameForm } from "@/components/editConnector/EditConnectorNameForm";
-import { EditGitHubConnectorConfig } from "@/components/editConnector/EditGitHubConnectorConfig";
-import { EditSimpleTokenForm } from "@/components/editConnector/EditSimpleTokenForm";
-import { Button } from "@/components/ui/button";
-import {
-	Card,
-	CardContent,
-	CardDescription,
-	CardFooter,
-	CardHeader,
-	CardTitle,
-} from "@/components/ui/card";
-import {
-	Form,
-	FormControl,
-	FormDescription,
-	FormField,
-	FormItem,
-	FormLabel,
-	FormMessage,
-} from "@/components/ui/form";
-import { Textarea } from "@/components/ui/textarea";
-import { getConnectorIcon } from "@/contracts/enums/connectorIcons";
-import { useConnectorEditPage } from "@/hooks/use-connector-edit-page";
-// Import Utils, Types, Hook, and Components
-import { getConnectorTypeDisplay } from "@/lib/connectors/utils";
-
-export default function EditConnectorPage() {
-	const router = useRouter();
-	const params = useParams();
-	const searchSpaceId = params.search_space_id as string;
-	// Ensure connectorId is parsed safely
-	const connectorIdParam = params.connector_id as string;
-	const connectorId = connectorIdParam ? parseInt(connectorIdParam, 10) : NaN;
-
-	// Use the custom hook to manage state and logic
-	const {
-		connectorsLoading,
-		connector,
-		isSaving,
-		editForm,
-		patForm, // Needed for GitHub child component
-		handleSaveChanges,
-		// GitHub specific props for the child component
-		editMode,
-		setEditMode, // Pass down if needed by GitHub component
-		originalPat,
-		currentSelectedRepos,
-		fetchedRepos,
-		setFetchedRepos,
-		newSelectedRepos,
-		setNewSelectedRepos,
-		isFetchingRepos,
-		handleFetchRepositories,
-		handleRepoSelectionChange,
-	} = useConnectorEditPage(connectorId, searchSpaceId);
-
-	// Redirect if connectorId is not a valid number after parsing
-	useEffect(() => {
-		if (Number.isNaN(connectorId)) {
-			toast.error("Invalid Connector ID.");
-			router.push(`/dashboard/${searchSpaceId}/connectors`);
-		}
-	}, [connectorId, router, searchSpaceId]);
-
-	// Loading State
-	if (connectorsLoading || !connector) {
-		// Handle NaN case before showing skeleton
-		if (Number.isNaN(connectorId)) return null;
-		return <EditConnectorLoadingSkeleton />;
-	}
-
-	// Main Render using data/handlers from the hook
-	return (
[Removed JSX for the edit connector page; the markup itself was lost in extraction. The render tree contained: a card header with getConnectorIcon(connector.connector_type), the title "Edit {getConnectorTypeDisplay(connector.connector_type)} Connector", and the description "Modify connector name and configuration."; a form bound to the hook's handleSaveChanges containing the connector name editor and a "Configuration" section with per-type configuration blocks for GITHUB_CONNECTOR, SLACK_CONNECTOR, NOTION_CONNECTOR, TAVILY_API, LINEAR_CONNECTOR, JIRA_CONNECTOR, CONFLUENCE_CONNECTOR, CLICKUP_CONNECTOR, LINKUP_API, DISCORD_CONNECTOR, LUMA_CONNECTOR, ELASTICSEARCH_CONNECTOR, and WEBCRAWLER_CONNECTOR (the last including a "URLs to Crawl" form field).]