diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index dfb7db87e..5d4417f8c 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -9,6 +9,7 @@ AUTH_TYPE=GOOGLE or LOCAL GOOGLE_OAUTH_CLIENT_ID=924507538m GOOGLE_OAUTH_CLIENT_SECRET=GOCSV GOOGLE_CALENDAR_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/calendar/connector/callback +GOOGLE_GMAIL_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/gmail/connector/callback # Embedding Model EMBEDDING_MODEL=mixedbread-ai/mxbai-embed-large-v1 diff --git a/surfsense_backend/alembic/versions/18_add_google_gmail_connector_enums.py b/surfsense_backend/alembic/versions/18_add_google_gmail_connector_enums.py new file mode 100644 index 000000000..a2d77e506 --- /dev/null +++ b/surfsense_backend/alembic/versions/18_add_google_gmail_connector_enums.py @@ -0,0 +1,65 @@ +"""Add Google Gmail connector enums + +Revision ID: 18 +Revises: 17 +Create Date: 2024-02-01 12:00:00.000000 + +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. 
+revision: str = "18" +down_revision: str | None = "17" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Safely add 'GOOGLE_GMAIL_CONNECTOR' to enum types if missing.""" + + # Add to searchsourceconnectortype enum + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'GOOGLE_GMAIL_CONNECTOR' + ) THEN + ALTER TYPE searchsourceconnectortype ADD VALUE 'GOOGLE_GMAIL_CONNECTOR'; + END IF; + END + $$; + """ + ) + + # Add to documenttype enum + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'documenttype' AND e.enumlabel = 'GOOGLE_GMAIL_CONNECTOR' + ) THEN + ALTER TYPE documenttype ADD VALUE 'GOOGLE_GMAIL_CONNECTOR'; + END IF; + END + $$; + """ + ) + + +def downgrade() -> None: + """Remove 'GOOGLE_GMAIL_CONNECTOR' from enum types.""" + + # Note: PostgreSQL doesn't support removing enum values directly + # This would require recreating the enum type, which is complex + # For now, we'll leave the enum values in place + # In a production environment, you might want to implement a more sophisticated downgrade + pass diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py index 6e27aa3e7..9e5d6d7e1 100644 --- a/surfsense_backend/app/agents/researcher/nodes.py +++ b/surfsense_backend/app/agents/researcher/nodes.py @@ -988,6 +988,32 @@ async def fetch_relevant_documents( ) } ) + elif connector == "GOOGLE_GMAIL_CONNECTOR": + ( + source_object, + gmail_chunks, + ) = await connector_service.search_google_gmail( + user_query=reformulated_query, + user_id=user_id, + search_space_id=search_space_id, + top_k=top_k, + search_mode=search_mode, + ) + + # Add to sources and raw documents + if source_object: + 
all_sources.append(source_object) + all_raw_documents.extend(gmail_chunks) + + # Stream found document count + if streaming_service and writer: + writer( + { + "yield_value": streaming_service.format_terminal_info_delta( + f"📧 Found {len(gmail_chunks)} Gmail messages related to your query" + ) + } + ) elif connector == "CONFLUENCE_CONNECTOR": ( source_object, diff --git a/surfsense_backend/app/agents/researcher/qna_agent/prompts.py b/surfsense_backend/app/agents/researcher/qna_agent/prompts.py index a7554aa69..cd64d563f 100644 --- a/surfsense_backend/app/agents/researcher/qna_agent/prompts.py +++ b/surfsense_backend/app/agents/researcher/qna_agent/prompts.py @@ -19,6 +19,7 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel - CONFLUENCE_CONNECTOR: "Confluence pages and comments" (personal project documentation) - CLICKUP_CONNECTOR: "ClickUp tasks and project data" (personal task management) - GOOGLE_CALENDAR_CONNECTOR: "Google Calendar events, meetings, and schedules" (personal calendar and time management) +- GOOGLE_GMAIL_CONNECTOR: "Google Gmail emails and conversations" (personal emails and communications) - DISCORD_CONNECTOR: "Discord server conversations and shared content" (personal community communications) - TAVILY_API: "Tavily search API results" (personalized search results) - LINKUP_API: "Linkup search API results" (personalized search results) diff --git a/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py b/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py index 5080c1bdc..07aec91ea 100644 --- a/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py +++ b/surfsense_backend/app/agents/researcher/sub_section_writer/prompts.py @@ -19,6 +19,7 @@ You are SurfSense, an advanced AI research assistant that synthesizes informatio - CONFLUENCE_CONNECTOR: "Confluence pages and comments" (personal project documentation) - CLICKUP_CONNECTOR: "ClickUp tasks and project data" 
(personal task management) - GOOGLE_CALENDAR_CONNECTOR: "Google Calendar events, meetings, and schedules" (personal calendar and time management) +- GOOGLE_GMAIL_CONNECTOR: "Google Gmail emails and conversations" (personal emails and communications) - DISCORD_CONNECTOR: "Discord server messages and channels" (personal community interactions) - TAVILY_API: "Tavily search API results" (personalized search results) - LINKUP_API: "Linkup search API results" (personalized search results) diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index 38a1f3e36..38ae61652 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -51,6 +51,9 @@ class Config: # Google Calendar redirect URI GOOGLE_CALENDAR_REDIRECT_URI = os.getenv("GOOGLE_CALENDAR_REDIRECT_URI") + # Google Gmail redirect URI + GOOGLE_GMAIL_REDIRECT_URI = os.getenv("GOOGLE_GMAIL_REDIRECT_URI") + # LLM instances are now managed per-user through the LLMConfig system # Legacy environment variables removed in favor of user-specific configurations diff --git a/surfsense_backend/app/connectors/google_gmail_connector.py b/surfsense_backend/app/connectors/google_gmail_connector.py new file mode 100644 index 000000000..0e75080ac --- /dev/null +++ b/surfsense_backend/app/connectors/google_gmail_connector.py @@ -0,0 +1,337 @@ +""" +Google Gmail Connector Module | Google OAuth Credentials | Gmail API +A module for retrieving emails from Gmail using Google OAuth credentials. +Allows fetching emails from Gmail mailbox using Google OAuth credentials. 
+""" + +import base64 +import re +from typing import Any + +from google.auth.transport.requests import Request +from google.oauth2.credentials import Credentials +from googleapiclient.discovery import build + + +class GoogleGmailConnector: + """Class for retrieving emails from Gmail using Google OAuth credentials.""" + + def __init__( + self, + credentials: Credentials, + ): + """ + Initialize the GoogleGmailConnector class. + Args: + credentials: Google OAuth Credentials object + """ + self._credentials = credentials + self.service = None + + def _get_credentials(self) -> Credentials: + """ + Get valid Google OAuth credentials. + Returns: + Google OAuth credentials + Raises: + ValueError: If credentials have not been set + Exception: If credential refresh fails + """ + if not all( + [ + self._credentials.client_id, + self._credentials.client_secret, + self._credentials.refresh_token, + ] + ): + raise ValueError( + "Google OAuth credentials (client_id, client_secret, refresh_token) must be set" + ) + + if self._credentials and not self._credentials.expired: + return self._credentials + + # Create credentials from refresh token + self._credentials = Credentials( + token=self._credentials.token, + refresh_token=self._credentials.refresh_token, + token_uri=self._credentials.token_uri, + client_id=self._credentials.client_id, + client_secret=self._credentials.client_secret, + scopes=self._credentials.scopes, + ) + + # Refresh the token if needed + if self._credentials.expired or not self._credentials.valid: + try: + self._credentials.refresh(Request()) + except Exception as e: + raise Exception( + f"Failed to refresh Google OAuth credentials: {e!s}" + ) from e + + return self._credentials + + def _get_service(self): + """ + Get the Gmail service instance using Google OAuth credentials. 
+ Returns: + Gmail service instance + Raises: + ValueError: If credentials have not been set + Exception: If service creation fails + """ + if self.service: + return self.service + + try: + credentials = self._get_credentials() + self.service = build("gmail", "v1", credentials=credentials) + return self.service + except Exception as e: + raise Exception(f"Failed to create Gmail service: {e!s}") from e + + def get_user_profile(self) -> tuple[dict[str, Any], str | None]: + """ + Fetch user's Gmail profile information. + Returns: + Tuple containing (profile dict, error message or None) + """ + try: + service = self._get_service() + profile = service.users().getProfile(userId="me").execute() + + return { + "email_address": profile.get("emailAddress"), + "messages_total": profile.get("messagesTotal", 0), + "threads_total": profile.get("threadsTotal", 0), + "history_id": profile.get("historyId"), + }, None + + except Exception as e: + return {}, f"Error fetching user profile: {e!s}" + + def get_messages_list( + self, + max_results: int = 100, + query: str = "", + label_ids: list[str] | None = None, + include_spam_trash: bool = False, + ) -> tuple[list[dict[str, Any]], str | None]: + """ + Fetch list of messages from Gmail. 
+ Args: + max_results: Maximum number of messages to fetch (default: 100) + query: Gmail search query (e.g., "is:unread", "from:example@gmail.com") + label_ids: List of label IDs to filter by + include_spam_trash: Whether to include spam and trash + Returns: + Tuple containing (messages list, error message or None) + """ + try: + service = self._get_service() + + # Build request parameters + request_params = { + "userId": "me", + "maxResults": max_results, + "includeSpamTrash": include_spam_trash, + } + + if query: + request_params["q"] = query + if label_ids: + request_params["labelIds"] = label_ids + + # Get messages list + result = service.users().messages().list(**request_params).execute() + messages = result.get("messages", []) + + return messages, None + + except Exception as e: + return [], f"Error fetching messages list: {e!s}" + + def get_message_details(self, message_id: str) -> tuple[dict[str, Any], str | None]: + """ + Fetch detailed information for a specific message. + Args: + message_id: The ID of the message to fetch + Returns: + Tuple containing (message details dict, error message or None) + """ + try: + service = self._get_service() + + # Get full message details + message = ( + service.users() + .messages() + .get(userId="me", id=message_id, format="full") + .execute() + ) + + return message, None + + except Exception as e: + return {}, f"Error fetching message details: {e!s}" + + def get_recent_messages( + self, + max_results: int = 50, + days_back: int = 30, + ) -> tuple[list[dict[str, Any]], str | None]: + """ + Fetch recent messages from Gmail within specified days. 
+ Args: + max_results: Maximum number of messages to fetch (default: 50) + days_back: Number of days to look back (default: 30) + Returns: + Tuple containing (messages list with details, error message or None) + """ + try: + # Calculate date query + from datetime import datetime, timedelta + + cutoff_date = datetime.now() - timedelta(days=days_back) + date_query = cutoff_date.strftime("%Y/%m/%d") + query = f"after:{date_query}" + + # Get messages list + messages_list, error = self.get_messages_list( + max_results=max_results, query=query + ) + + if error: + return [], error + + # Get detailed information for each message + detailed_messages = [] + for msg in messages_list: + message_details, detail_error = self.get_message_details(msg["id"]) + if detail_error: + continue # Skip messages that can't be fetched + detailed_messages.append(message_details) + + return detailed_messages, None + + except Exception as e: + return [], f"Error fetching recent messages: {e!s}" + + def extract_message_text(self, message: dict[str, Any]) -> str: + """ + Extract text content from a Gmail message. 
+ Args: + message: Gmail message object + Returns: + Extracted text content + """ + + def get_message_parts(payload): + """Recursively extract message parts.""" + parts = [] + + if "parts" in payload: + for part in payload["parts"]: + parts.extend(get_message_parts(part)) + else: + parts.append(payload) + + return parts + + try: + payload = message.get("payload", {}) + parts = get_message_parts(payload) + + text_content = "" + + for part in parts: + mime_type = part.get("mimeType", "") + body = part.get("body", {}) + data = body.get("data", "") + + if mime_type == "text/plain" and data: + # Decode base64 content + decoded_data = base64.urlsafe_b64decode(data + "===").decode( + "utf-8", errors="ignore" + ) + text_content += decoded_data + "\n" + elif mime_type == "text/html" and data and not text_content: + # Use HTML as fallback if no plain text + decoded_data = base64.urlsafe_b64decode(data + "===").decode( + "utf-8", errors="ignore" + ) + # Basic HTML tag removal (you might want to use a proper HTML parser) + + text_content = re.sub(r"<[^>]+>", "", decoded_data) + + return text_content.strip() + + except Exception as e: + return f"Error extracting message text: {e!s}" + + def format_message_to_markdown(self, message: dict[str, Any]) -> str: + """ + Format a Gmail message to markdown. 
+ Args: + message: Message object from Gmail API + Returns: + Formatted markdown string + """ + try: + # Extract basic message information + message_id = message.get("id", "") + thread_id = message.get("threadId", "") + label_ids = message.get("labelIds", []) + + # Extract headers + payload = message.get("payload", {}) + headers = payload.get("headers", []) + + # Parse headers into a dict + header_dict = {} + for header in headers: + name = header.get("name", "").lower() + value = header.get("value", "") + header_dict[name] = value + + # Extract key information + subject = header_dict.get("subject", "No Subject") + from_email = header_dict.get("from", "Unknown Sender") + to_email = header_dict.get("to", "Unknown Recipient") + date_str = header_dict.get("date", "Unknown Date") + + # Extract message content + message_text = self.extract_message_text(message) + + # Build markdown content + markdown_content = f"# {subject}\n\n" + + # Add message details + markdown_content += f"**From:** {from_email}\n" + markdown_content += f"**To:** {to_email}\n" + markdown_content += f"**Date:** {date_str}\n" + + if label_ids: + markdown_content += f"**Labels:** {', '.join(label_ids)}\n" + + markdown_content += "\n" + + # Add message content + if message_text: + markdown_content += f"## Message Content\n\n{message_text}\n\n" + + # Add message metadata + markdown_content += "## Message Details\n\n" + markdown_content += f"- **Message ID:** {message_id}\n" + markdown_content += f"- **Thread ID:** {thread_id}\n" + + # Add snippet if available + snippet = message.get("snippet", "") + if snippet: + markdown_content += f"- **Snippet:** {snippet}\n" + + return markdown_content + + except Exception as e: + return f"Error formatting message to markdown: {e!s}" diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index f572438ba..49e227f94 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -47,6 +47,7 @@ class DocumentType(str, Enum): 
CONFLUENCE_CONNECTOR = "CONFLUENCE_CONNECTOR" CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR" GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR" + GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR" class SearchSourceConnectorType(str, Enum): @@ -62,6 +63,7 @@ class SearchSourceConnectorType(str, Enum): CONFLUENCE_CONNECTOR = "CONFLUENCE_CONNECTOR" CLICKUP_CONNECTOR = "CLICKUP_CONNECTOR" GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR" + GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR" class ChatType(str, Enum): diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py index 3e9c6bae5..e10db1e76 100644 --- a/surfsense_backend/app/routes/__init__.py +++ b/surfsense_backend/app/routes/__init__.py @@ -5,6 +5,9 @@ from .documents_routes import router as documents_router from .google_calendar_add_connector_route import ( router as google_calendar_add_connector_router, ) +from .google_gmail_add_connector_route import ( + router as google_gmail_add_connector_router, +) from .llm_config_routes import router as llm_config_router from .logs_routes import router as logs_router from .podcasts_routes import router as podcasts_router @@ -19,5 +22,6 @@ router.include_router(podcasts_router) router.include_router(chats_router) router.include_router(search_source_connectors_router) router.include_router(google_calendar_add_connector_router) +router.include_router(google_gmail_add_connector_router) router.include_router(llm_config_router) router.include_router(logs_router) diff --git a/surfsense_backend/app/routes/google_gmail_add_connector_route.py b/surfsense_backend/app/routes/google_gmail_add_connector_route.py new file mode 100644 index 000000000..01c6e7fb9 --- /dev/null +++ b/surfsense_backend/app/routes/google_gmail_add_connector_route.py @@ -0,0 +1,159 @@ +import os + +os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1" + +import base64 +import json +import logging +from uuid import UUID + +from fastapi import APIRouter, Depends, 
HTTPException, Request +from fastapi.responses import RedirectResponse +from google_auth_oauthlib.flow import Flow +from pydantic import ValidationError +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.config import config +from app.db import ( + SearchSourceConnector, + SearchSourceConnectorType, + User, + get_async_session, +) +from app.users import current_active_user + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +def get_google_flow(): + """Create and return a Google OAuth flow for Gmail API.""" + flow = Flow.from_client_config( + { + "web": { + "client_id": config.GOOGLE_OAUTH_CLIENT_ID, + "client_secret": config.GOOGLE_OAUTH_CLIENT_SECRET, + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "redirect_uris": [config.GOOGLE_GMAIL_REDIRECT_URI], + } + }, + scopes=[ + "https://www.googleapis.com/auth/gmail.readonly", + "https://www.googleapis.com/auth/userinfo.email", + "https://www.googleapis.com/auth/userinfo.profile", + "openid", + ], + ) + flow.redirect_uri = config.GOOGLE_GMAIL_REDIRECT_URI + return flow + + +@router.get("/auth/google/gmail/connector/add/") +async def connect_gmail(space_id: int, user: User = Depends(current_active_user)): + try: + if not space_id: + raise HTTPException(status_code=400, detail="space_id is required") + + flow = get_google_flow() + + # Encode space_id and user_id in state + state_payload = json.dumps( + { + "space_id": space_id, + "user_id": str(user.id), + } + ) + state_encoded = base64.urlsafe_b64encode(state_payload.encode()).decode() + + auth_url, _ = flow.authorization_url( + access_type="offline", + prompt="consent", + include_granted_scopes="true", + state=state_encoded, + ) + return {"auth_url": auth_url} + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Failed to initiate Google OAuth: {e!s}" + ) from e + + 
+@router.get("/auth/google/gmail/connector/callback/") +async def gmail_callback( + request: Request, + code: str, + state: str, + session: AsyncSession = Depends(get_async_session), +): + try: + # Decode and parse the state + decoded_state = base64.urlsafe_b64decode(state.encode()).decode() + data = json.loads(decoded_state) + + user_id = UUID(data["user_id"]) + space_id = data["space_id"] + + flow = get_google_flow() + flow.fetch_token(code=code) + + creds = flow.credentials + creds_dict = json.loads(creds.to_json()) + + try: + # Check if a connector with the same type already exists for this user + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, + ) + ) + existing_connector = result.scalars().first() + if existing_connector: + raise HTTPException( + status_code=409, + detail="A GOOGLE_GMAIL_CONNECTOR connector already exists. 
Each user can have only one connector of each type.", + ) + db_connector = SearchSourceConnector( + name="Google Gmail Connector", + connector_type=SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, + config=creds_dict, + user_id=user_id, + is_indexable=True, + ) + session.add(db_connector) + await session.commit() + await session.refresh(db_connector) + + logger.info( + f"Successfully created Gmail connector for user {user_id} with ID {db_connector.id}" + ) + + # Redirect to the frontend success page + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/add/google-gmail-connector?success=true" + ) + + except IntegrityError as e: + await session.rollback() + logger.error(f"Database integrity error: {e!s}") + raise HTTPException( + status_code=409, + detail="A connector with this configuration already exists.", + ) from e + except ValidationError as e: + await session.rollback() + logger.error(f"Validation error: {e!s}") + raise HTTPException( + status_code=400, detail=f"Invalid connector configuration: {e!s}" + ) from e + + except HTTPException: + # Re-raise HTTP exceptions as-is + raise + except Exception as e: + logger.error(f"Unexpected error in Gmail callback: {e!s}", exc_info=True) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 49f31289b..d1f610839 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -41,6 +41,7 @@ from app.tasks.connector_indexers import ( index_discord_messages, index_github_repos, index_google_calendar_events, + index_google_gmail_messages, index_jira_issues, index_linear_issues, index_notion_pages, @@ -507,6 +508,22 @@ async def index_connector_content( indexing_to, ) response_message = "Google Calendar indexing started in the background." 
async def run_google_gmail_indexing_with_new_session(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str,
    end_date: str,
):
    """Wrapper to run Google Gmail indexing with its own database session.

    Fix: the parameters formerly named ``max_messages: int`` / ``days_back: int``
    actually receive the ``indexing_from`` / ``indexing_to`` date strings from
    ``index_connector_content`` and are forwarded into the indexer's
    ``start_date`` / ``end_date`` slots; they are now named and typed to match,
    and the log message no longer mislabels dates as message counts.

    Args:
        connector_id: ID of the Gmail connector
        search_space_id: ID of the search space to index into
        user_id: ID of the owning user
        start_date: Start of the indexing window (YYYY-MM-DD)
        end_date: End of the indexing window (YYYY-MM-DD)
    """
    logger.info(
        f"Background task started: Indexing Google Gmail connector {connector_id} into space {search_space_id} from {start_date} to {end_date}"
    )
    async with async_session_maker() as session:
        await run_google_gmail_indexing(
            session, connector_id, search_space_id, user_id, start_date, end_date
        )
    logger.info(
        f"Background task finished: Indexing Google Gmail connector {connector_id}"
    )


async def run_google_gmail_indexing(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str,
    end_date: str,
):
    """Run the Google Gmail indexing task and update the last-indexed timestamp.

    Args:
        session: Database session owned by the caller
        connector_id: ID of the Gmail connector
        search_space_id: ID of the search space to index into
        user_id: ID of the owning user
        start_date: Start of the indexing window (YYYY-MM-DD)
        end_date: End of the indexing window (YYYY-MM-DD)
    """
    try:
        # update_last_indexed=False: the timestamp is only advanced below,
        # after the indexer reports success.
        indexed_count, error_message = await index_google_gmail_messages(
            session,
            connector_id,
            search_space_id,
            user_id,
            start_date,
            end_date,
            update_last_indexed=False,
        )
        if error_message:
            logger.error(
                f"Google Gmail indexing failed for connector {connector_id}: {error_message}"
            )
            # Optionally update status in DB to indicate failure
        else:
            logger.info(
                f"Google Gmail indexing successful for connector {connector_id}. Indexed {indexed_count} documents."
            )
            # Update the last indexed timestamp only on success
            await update_connector_last_indexed(session, connector_id)
            await session.commit()  # Commit timestamp update
    except Exception as e:
        logger.error(
            f"Critical error in run_google_gmail_indexing for connector {connector_id}: {e}",
            exc_info=True,
        )
        # Optionally update status in DB to indicate failure
information and langchain documents + + Args: + user_query: The user's query + user_id: The user's ID + search_space_id: The search space ID to search in + top_k: Maximum number of results to return + search_mode: Search mode (CHUNKS or DOCUMENTS) + + Returns: + tuple: (sources_info, langchain_documents) + """ + if search_mode == SearchMode.CHUNKS: + gmail_chunks = await self.chunk_retriever.hybrid_search( + query_text=user_query, + top_k=top_k, + user_id=user_id, + search_space_id=search_space_id, + document_type="GOOGLE_GMAIL_CONNECTOR", + ) + elif search_mode == SearchMode.DOCUMENTS: + gmail_chunks = await self.document_retriever.hybrid_search( + query_text=user_query, + top_k=top_k, + user_id=user_id, + search_space_id=search_space_id, + document_type="GOOGLE_GMAIL_CONNECTOR", + ) + # Transform document retriever results to match expected format + gmail_chunks = self._transform_document_results(gmail_chunks) + + # Early return if no results + if not gmail_chunks: + return { + "id": 32, + "name": "Gmail Messages", + "type": "GOOGLE_GMAIL_CONNECTOR", + "sources": [], + }, [] + + # Process each chunk and create sources directly without deduplication + sources_list = [] + async with self.counter_lock: + for _i, chunk in enumerate(gmail_chunks): + # Extract document metadata + document = chunk.get("document", {}) + metadata = document.get("metadata", {}) + + # Extract Gmail-specific metadata + message_id = metadata.get("message_id", "") + subject = metadata.get("subject", "No Subject") + sender = metadata.get("sender", "Unknown Sender") + date_str = metadata.get("date", "") + thread_id = metadata.get("thread_id", "") + + # Create a more descriptive title for Gmail messages + title = f"Email: {subject}" + if sender: + # Extract just the email address or name from sender + import re + + sender_match = re.search(r"<([^>]+)>", sender) + if sender_match: + sender_email = sender_match.group(1) + title += f" (from {sender_email})" + else: + title += f" (from {sender})" + + 
# Create a more descriptive description for Gmail messages + description = chunk.get("content", "")[:150] + if len(description) == 150: + description += "..." + + # Add message info to description + info_parts = [] + if date_str: + info_parts.append(f"Date: {date_str}") + if thread_id: + info_parts.append(f"Thread: {thread_id}") + + if info_parts: + if description: + description += f" | {' | '.join(info_parts)}" + else: + description = " | ".join(info_parts) + + # For URL, we could construct a URL to the Gmail message + url = "" + if message_id: + # Gmail message URL format + url = f"https://mail.google.com/mail/u/0/#inbox/{message_id}" + + source = { + "id": document.get("id", self.source_id_counter), + "title": title, + "description": description, + "url": url, + "message_id": message_id, + "subject": subject, + "sender": sender, + "date": date_str, + "thread_id": thread_id, + } + + self.source_id_counter += 1 + sources_list.append(source) + + # Create result object + result_object = { + "id": 32, # Assign a unique ID for the Gmail connector + "name": "Gmail Messages", + "type": "GOOGLE_GMAIL_CONNECTOR", + "sources": sources_list, + } + + return result_object, gmail_chunks + async def search_confluence( self, user_query: str, diff --git a/surfsense_backend/app/tasks/connector_indexers/__init__.py b/surfsense_backend/app/tasks/connector_indexers/__init__.py index 048a136f7..7befa5986 100644 --- a/surfsense_backend/app/tasks/connector_indexers/__init__.py +++ b/surfsense_backend/app/tasks/connector_indexers/__init__.py @@ -14,6 +14,7 @@ Available indexers: - Confluence: Index pages from Confluence spaces - Discord: Index messages from Discord servers - ClickUp: Index tasks from ClickUp workspaces +- Google Gmail: Index messages from Google Gmail - Google Calendar: Index events from Google Calendar """ @@ -27,6 +28,7 @@ from .github_indexer import index_github_repos # Calendar and scheduling from .google_calendar_indexer import index_google_calendar_events +from 
"""
Google Gmail connector indexer.

Fetches recent Gmail messages through the Gmail API, converts each message
to a markdown document, and stores it (with a summary embedding and content
chunks) in the given search space, skipping duplicates by content hash.
"""

from datetime import datetime

from google.oauth2.credentials import Credentials
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import config
from app.connectors.google_gmail_connector import GoogleGmailConnector
from app.db import (
    Document,
    DocumentType,
    SearchSourceConnectorType,
)
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import generate_content_hash

from .base import (
    check_duplicate_document_by_hash,
    create_document_chunks,
    get_connector_by_id,
    logger,
    update_connector_last_indexed,
)

# Look-back window (in days) used when start_date is missing or unparsable.
DEFAULT_DAYS_BACK = 30


async def index_google_gmail_messages(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str | None = None,
    end_date: str | None = None,
    update_last_indexed: bool = True,
    max_messages: int = 100,
) -> tuple[int, str | None]:
    """
    Index Gmail messages for a specific connector.

    Args:
        session: Database session
        connector_id: ID of the Gmail connector
        search_space_id: ID of the search space
        user_id: ID of the user
        start_date: Start date for filtering messages (YYYY-MM-DD format).
            Only used to derive how many days back to fetch.
        end_date: End date for filtering messages (YYYY-MM-DD format).
            NOTE(review): currently unused — fetching is driven solely by the
            days-back window derived from start_date; confirm before relying
            on it.
        update_last_indexed: Whether to update the last_indexed_at timestamp
            (default: True)
        max_messages: Maximum number of messages to fetch (default: 100)

    Returns:
        Tuple of (number_of_indexed_messages, error_message). The error
        message is None on success.
    """
    task_logger = TaskLoggingService(session, search_space_id)

    # Derive the look-back window from start_date.
    # BUG FIX: days_back was previously assigned only inside the
    # `if start_date:` branch, so any call without a start_date raised
    # NameError when days_back was used below (task metadata and
    # get_recent_messages). Default it unconditionally.
    days_back = DEFAULT_DAYS_BACK
    if start_date:
        try:
            start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
            # Clamp to at least 1 so a same-day or future start_date still
            # yields a valid (non-negative) window.
            days_back = max((datetime.now() - start_date_obj).days, 1)
        except ValueError:
            # Invalid start_date format — fall back to the default window.
            days_back = DEFAULT_DAYS_BACK

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="google_gmail_messages_indexing",
        source="connector_indexing_task",
        message=f"Starting Gmail messages indexing for connector {connector_id}",
        metadata={
            "connector_id": connector_id,
            "user_id": str(user_id),
            "max_messages": max_messages,
            "days_back": days_back,
        },
    )

    try:
        # Get connector by id, constrained to the Gmail connector type.
        connector = await get_connector_by_id(
            session, connector_id, SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR
        )

        if not connector:
            error_msg = f"Gmail connector with ID {connector_id} not found"
            # CONSISTENCY FIX: every other log_task_failure call in this
            # function passes (entry, message, error_details, metadata);
            # this call previously passed the metadata dict in the
            # error-details slot.
            await task_logger.log_task_failure(
                log_entry,
                error_msg,
                "Connector not found",
                {"error_type": "ConnectorNotFound"},
            )
            return 0, error_msg

        # Rebuild OAuth credentials from the connector's stored config.
        config_data = connector.config
        credentials = Credentials(
            token=config_data.get("token"),
            refresh_token=config_data.get("refresh_token"),
            token_uri=config_data.get("token_uri"),
            client_id=config_data.get("client_id"),
            client_secret=config_data.get("client_secret"),
            scopes=config_data.get("scopes", []),
        )

        # client_id/client_secret/refresh_token are the minimum needed for
        # the client to refresh an access token on its own.
        if (
            not credentials.client_id
            or not credentials.client_secret
            or not credentials.refresh_token
        ):
            await task_logger.log_task_failure(
                log_entry,
                f"Google gmail credentials not found in connector config for connector {connector_id}",
                "Missing Google gmail credentials",
                {"error_type": "MissingCredentials"},
            )
            return 0, "Google gmail credentials not found in connector config"

        # Initialize Google gmail client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing Google gmail client for connector {connector_id}",
            {"stage": "client_initialization"},
        )

        gmail_connector = GoogleGmailConnector(credentials)

        # Fetch recent Gmail messages within the derived window.
        logger.info(f"Fetching recent emails for connector {connector_id}")
        messages, error = gmail_connector.get_recent_messages(
            max_results=max_messages, days_back=days_back
        )

        if error:
            # CONSISTENCY FIX: supply error details and typed metadata
            # instead of an empty dict in the wrong positional slot.
            await task_logger.log_task_failure(
                log_entry,
                f"Failed to fetch messages: {error}",
                str(error),
                {"error_type": "MessageFetchError"},
            )
            return 0, f"Failed to fetch Gmail messages: {error}"

        if not messages:
            success_msg = "No Google gmail messages found in the specified date range"
            await task_logger.log_task_success(
                log_entry, success_msg, {"messages_count": 0}
            )
            return 0, success_msg

        logger.info(f"Found {len(messages)} Google gmail messages to index")

        documents_indexed = 0
        skipped_messages: list[str] = []
        documents_skipped = 0
        for message in messages:
            # Pre-bind so the except handler below can always reference
            # them, even if extraction fails before they are assigned.
            message_id = ""
            subject = "No Subject"
            try:
                # Extract message information
                message_id = message.get("id", "")
                thread_id = message.get("threadId", "")

                # Subject/sender/date live in the payload headers.
                payload = message.get("payload", {})
                headers = payload.get("headers", [])

                sender = "Unknown Sender"
                date_str = "Unknown Date"

                for header in headers:
                    name = header.get("name", "").lower()
                    value = header.get("value", "")
                    if name == "subject":
                        subject = value
                    elif name == "from":
                        sender = value
                    elif name == "date":
                        date_str = value

                if not message_id:
                    logger.warning(f"Skipping message with missing ID: {subject}")
                    skipped_messages.append(f"{subject} (missing ID)")
                    documents_skipped += 1
                    continue

                # Format message to markdown
                markdown_content = gmail_connector.format_message_to_markdown(message)

                if not markdown_content.strip():
                    logger.warning(f"Skipping message with no content: {subject}")
                    skipped_messages.append(f"{subject} (no content)")
                    documents_skipped += 1
                    continue

                # Short summary used for the document embedding.
                summary_content = f"Google Gmail Message: {subject}\n\n"
                summary_content += f"Sender: {sender}\n"
                summary_content += f"Date: {date_str}\n"

                # Content hash is scoped per search space for dedup.
                content_hash = generate_content_hash(markdown_content, search_space_id)

                # Skip messages whose content is already indexed.
                existing_document_by_hash = await check_duplicate_document_by_hash(
                    session, content_hash
                )

                if existing_document_by_hash:
                    logger.info(
                        f"Document with content hash {content_hash} already exists for message {message_id}. Skipping processing."
                    )
                    documents_skipped += 1
                    continue

                # Embed the summary (not the full body) for retrieval.
                summary_embedding = config.embedding_model_instance.embed(
                    summary_content
                )

                # Chunk the full markdown body for fine-grained search.
                chunks = await create_document_chunks(markdown_content)

                # Create and store new document
                logger.info(f"Creating new document for Gmail message: {subject}")
                document = Document(
                    search_space_id=search_space_id,
                    title=f"Gmail: {subject}",
                    document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
                    document_metadata={
                        "message_id": message_id,
                        "thread_id": thread_id,
                        "subject": subject,
                        "sender": sender,
                        "date": date_str,
                        "connector_id": connector_id,
                    },
                    content=markdown_content,
                    content_hash=content_hash,
                    embedding=summary_embedding,
                    chunks=chunks,
                )
                session.add(document)
                documents_indexed += 1
                logger.info(f"Successfully indexed new email {summary_content}")

            except Exception as e:
                logger.error(
                    f"Error processing the email {message_id}: {e!s}",
                    exc_info=True,
                )
                skipped_messages.append(f"{subject} (processing error)")
                documents_skipped += 1
                continue  # Skip this message and continue with others

        # Update the last_indexed_at timestamp for the connector only if
        # requested and only when at least one document was indexed.
        total_processed = documents_indexed
        if total_processed > 0:
            await update_connector_last_indexed(session, connector, update_last_indexed)

        # Commit all changes
        await session.commit()
        logger.info(
            "Successfully committed all Google gmail document changes to database"
        )

        # Log success
        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Google gmail indexing for connector {connector_id}",
            {
                "events_processed": total_processed,
                "documents_indexed": documents_indexed,
                "documents_skipped": documents_skipped,
                "skipped_messages_count": len(skipped_messages),
            },
        )

        logger.info(
            f"Google gmail indexing completed: {documents_indexed} new emails, {documents_skipped} skipped"
        )
        return (
            total_processed,
            None,
        )  # Return None as the error message to indicate success

    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Google gmail indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"},
        )
        logger.error(f"Database error: {db_error!s}", exc_info=True)
        return 0, f"Database error: {db_error!s}"
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Google gmail emails for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__},
        )
        logger.error(f"Failed to index Google gmail emails: {e!s}", exc_info=True)
        return 0, f"Failed to index Google gmail emails: {e!s}"
{ + const router = useRouter(); + const params = useParams(); + const searchSpaceId = params.search_space_id as string; + const [isConnecting, setIsConnecting] = useState(false); + const [doesConnectorExist, setDoesConnectorExist] = useState(false); + + const { fetchConnectors } = useSearchSourceConnectors(); + + useEffect(() => { + fetchConnectors().then((data) => { + const connector = data.find( + (c: SearchSourceConnector) => c.connector_type === "GOOGLE_GMAIL_CONNECTOR" + ); + if (connector) { + setDoesConnectorExist(true); + } + }); + }, []); + + // Handle Google OAuth connection + const handleConnectGoogle = async () => { + try { + setIsConnecting(true); + // Call backend to initiate authorization flow + const response = await fetch( + `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/auth/google/gmail/connector/add/?space_id=${searchSpaceId}`, + { + method: "GET", + headers: { + Authorization: `Bearer ${localStorage.getItem("surfsense_bearer_token")}`, + }, + } + ); + + if (!response.ok) { + throw new Error("Failed to initiate Google OAuth"); + } + + const data = await response.json(); + + // Redirect to Google for authentication + window.location.href = data.auth_url; + } catch (error) { + console.error("Error connecting to Google:", error); + toast.error("Failed to connect to Google Gmail"); + } finally { + setIsConnecting(false); + } + }; + + return ( +
+ + {/* Header */} +
+ + + Back to connectors + +
+
+ +
+
+

Connect Google Gmail

+

+ Connect your Gmail account to search through your emails +

+
+
+
+ + {/* Connection Card */} + {!doesConnectorExist ? ( + + + Connect Your Gmail Account + + Securely connect your Gmail account to enable email search within SurfSense. We'll + only access your emails with read-only permissions. + + + +
+ + Read-only access to your emails +
+
+ + Search through email content and metadata +
+
+ + Secure OAuth 2.0 authentication +
+
+ + You can disconnect anytime +
+
+ + + + +
+ ) : ( + /* Configuration Form Card */ + + + ✅ Your Gmail is successfully connected! + + + )} + + {/* Information Card */} + + + What data will be indexed? + + +
+

Email Content

+

+ We'll index the content of your emails including subject lines, sender information, + and message body text to make them searchable. +

+
+
+

Email Metadata

+

+ Information like sender, recipient, date, and labels will be indexed to provide + better search context and filtering options. +

+
+
+

Privacy & Security

+

+ Your emails are processed securely and stored with encryption. We only access emails + with read-only permissions and never modify or send emails on your behalf. +

+
+
+
+
+
+ ); +} diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx index c6ec629cb..2d4f2b9e5 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/page.tsx @@ -157,11 +157,11 @@ const connectorCategories: ConnectorCategory[] = [ status: "available", }, { - id: "gmail", + id: "google-gmail-connector", title: "Gmail", - description: "Connect to your Gmail account to access emails.", + description: "Connect to your Gmail account to search through your emails.", icon: , - status: "coming-soon", + status: "available", }, { id: "zoom", diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx index e0cc12b0a..5273fa57e 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx @@ -10,6 +10,7 @@ import { IconCalendar, IconChecklist, IconLayoutKanban, + IconMail, IconTicket, } from "@tabler/icons-react"; import { File, Globe, Webhook } from "lucide-react"; @@ -31,6 +32,7 @@ const documentTypeIcons: Record = { CONFLUENCE_CONNECTOR: IconBook, CLICKUP_CONNECTOR: IconChecklist, GOOGLE_CALENDAR_CONNECTOR: IconCalendar, + GOOGLE_GMAIL_CONNECTOR: IconMail, }; export function getDocumentTypeIcon(type: string): IconComponent { diff --git a/surfsense_web/components/chat/ConnectorComponents.tsx b/surfsense_web/components/chat/ConnectorComponents.tsx index 2a3cdb49c..d66227e7a 100644 --- a/surfsense_web/components/chat/ConnectorComponents.tsx +++ b/surfsense_web/components/chat/ConnectorComponents.tsx @@ -7,6 +7,7 @@ import { IconCalendar, IconLayoutKanban, IconLinkPlus, + IconMail, IconTicket, } 
from "@tabler/icons-react"; import { @@ -59,6 +60,8 @@ export const getConnectorIcon = (connectorType: string) => { return ; case "GOOGLE_CALENDAR_CONNECTOR": return ; + case "GOOGLE_GMAIL_CONNECTOR": + return ; case "DEEP": return ; case "DEEPER":