From 64512c604d846cdf5db8396d95e60c24b3a75393 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Thu, 4 Jun 2026 20:09:37 +0200 Subject: [PATCH] refactor(agents): colocate gmail + calendar connector tools into subagent slices Gmail and Calendar are handled together because both Google connectors share the _build_credentials helper that lived in shared/tools/gmail. - relocate the gmail helpers (_get_token_encryption, _build_credentials, _gmail_headers, _format_gmail_summary) into the gmail subagent slice (tools/_helpers.py); repoint gmail search_emails/read_email to it. - calendar search_events now imports _build_credentials from the gmail slice (preserving the existing cross-connector Google-auth dependency). - repoint both dead tools/__init__ shims at the live local impls. - fix tests/e2e native_google fake: it patched the dead shared google_calendar.*.build paths; point it at the live subagent calendar modules (which actually import googleapiclient build). - delete dead shared/tools/{gmail,google_calendar} twins. shared/tools now has zero connector dirs. agents unit suite green (942). --- .../connectors/calendar/tools/__init__.py | 16 +- .../calendar/tools/search_events.py | 4 +- .../connectors/gmail/tools/__init__.py | 24 +- .../connectors/gmail/tools/_helpers.py | 81 +++ .../connectors/gmail/tools/read_email.py | 9 +- .../connectors/gmail/tools/search_emails.py | 9 +- .../app/agents/shared/tools/gmail/__init__.py | 27 - .../shared/tools/gmail/composio_helpers.py | 41 -- .../agents/shared/tools/gmail/create_draft.py | 361 ------------- .../agents/shared/tools/gmail/read_email.py | 172 ------ .../shared/tools/gmail/search_emails.py | 260 --------- .../agents/shared/tools/gmail/send_email.py | 363 ------------- .../agents/shared/tools/gmail/trash_email.py | 344 ------------ .../agents/shared/tools/gmail/update_draft.py | 495 ------------------ .../shared/tools/google_calendar/__init__.py | 19 - .../tools/google_calendar/create_event.py | 382 -------------- .../tools/google_calendar/delete_event.py | 340 ------------ .../tools/google_calendar/search_events.py | 187 ------- .../tools/google_calendar/update_event.py | 419 --------------- .../tests/e2e/fakes/native_google.py | 15 +- 20 files changed, 112 insertions(+), 3456 deletions(-) create mode 100644 surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/_helpers.py delete mode 100644 surfsense_backend/app/agents/shared/tools/gmail/__init__.py delete mode 100644 surfsense_backend/app/agents/shared/tools/gmail/composio_helpers.py delete mode 100644 surfsense_backend/app/agents/shared/tools/gmail/create_draft.py delete mode 100644 surfsense_backend/app/agents/shared/tools/gmail/read_email.py delete mode 100644 surfsense_backend/app/agents/shared/tools/gmail/search_emails.py delete mode 100644 surfsense_backend/app/agents/shared/tools/gmail/send_email.py delete mode 100644 surfsense_backend/app/agents/shared/tools/gmail/trash_email.py delete mode 100644 surfsense_backend/app/agents/shared/tools/gmail/update_draft.py delete mode 100644 surfsense_backend/app/agents/shared/tools/google_calendar/__init__.py delete mode 100644 surfsense_backend/app/agents/shared/tools/google_calendar/create_event.py delete mode 100644 surfsense_backend/app/agents/shared/tools/google_calendar/delete_event.py delete mode 100644 surfsense_backend/app/agents/shared/tools/google_calendar/search_events.py delete mode 100644 surfsense_backend/app/agents/shared/tools/google_calendar/update_event.py diff --git a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/calendar/tools/__init__.py b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/calendar/tools/__init__.py index 362cf4127..717199fef 100644 --- a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/calendar/tools/__init__.py +++ b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/calendar/tools/__init__.py @@ -1,15 +1,7 @@ -from app.agents.shared.tools.google_calendar.create_event import ( - create_create_calendar_event_tool, -) -from app.agents.shared.tools.google_calendar.delete_event import ( - create_delete_calendar_event_tool, -) -from app.agents.shared.tools.google_calendar.search_events import ( - create_search_calendar_events_tool, -) -from app.agents.shared.tools.google_calendar.update_event import ( - create_update_calendar_event_tool, -) +from .create_event import create_create_calendar_event_tool +from .delete_event import create_delete_calendar_event_tool +from .search_events import create_search_calendar_events_tool +from .update_event import create_update_calendar_event_tool __all__ = [ "create_create_calendar_event_tool", diff --git a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/calendar/tools/search_events.py b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/calendar/tools/search_events.py index 2768563f4..e0cb8c789 100644 --- a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/calendar/tools/search_events.py +++ b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/calendar/tools/search_events.py @@ -5,7 +5,9 @@ from langchain_core.tools import tool from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select -from app.agents.shared.tools.gmail.search_emails import _build_credentials +from app.agents.multi_agent_chat.subagents.connectors.gmail.tools._helpers import ( + _build_credentials, +) from app.db import SearchSourceConnector, SearchSourceConnectorType logger = logging.getLogger(__name__) diff --git a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/__init__.py b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/__init__.py index f32312fe6..1f0839c44 100644 --- a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/__init__.py +++ b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/__init__.py @@ -1,21 +1,9 @@ -from app.agents.shared.tools.gmail.create_draft import ( - create_create_gmail_draft_tool, -) -from app.agents.shared.tools.gmail.read_email import ( - create_read_gmail_email_tool, -) -from app.agents.shared.tools.gmail.search_emails import ( - create_search_gmail_tool, -) -from app.agents.shared.tools.gmail.send_email import ( - create_send_gmail_email_tool, -) -from app.agents.shared.tools.gmail.trash_email import ( - create_trash_gmail_email_tool, -) -from app.agents.shared.tools.gmail.update_draft import ( - create_update_gmail_draft_tool, -) +from .create_draft import create_create_gmail_draft_tool +from .read_email import create_read_gmail_email_tool +from .search_emails import create_search_gmail_tool +from .send_email import create_send_gmail_email_tool +from .trash_email import create_trash_gmail_email_tool +from .update_draft import create_update_gmail_draft_tool __all__ = [ "create_create_gmail_draft_tool", diff --git a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/_helpers.py b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/_helpers.py new file mode 100644 index 000000000..5a467e328 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/_helpers.py @@ -0,0 +1,81 @@ +"""Shared helpers for Gmail connector tools. + +Credential construction (``_build_credentials``) is also reused by the +Calendar connector tools, since both are Google OAuth backed. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from app.db import SearchSourceConnector + +_token_encryption_cache: object | None = None + + +def _get_token_encryption(): + global _token_encryption_cache + if _token_encryption_cache is None: + from app.config import config + from app.utils.oauth_security import TokenEncryption + + if not config.SECRET_KEY: + raise RuntimeError("SECRET_KEY not configured for token decryption.") + _token_encryption_cache = TokenEncryption(config.SECRET_KEY) + return _token_encryption_cache + + +def _build_credentials(connector: SearchSourceConnector): + """Build Google OAuth Credentials from a connector's stored config. + + Handles both native OAuth connectors (with encrypted tokens) and + Composio-backed connectors. Shared by Gmail and Calendar tools. + """ + from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES + + if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: + raise ValueError("Composio connectors must use Composio tool execution.") + + from google.oauth2.credentials import Credentials + + cfg = dict(connector.config) + if cfg.get("_token_encrypted"): + enc = _get_token_encryption() + for key in ("token", "refresh_token", "client_secret"): + if cfg.get(key): + cfg[key] = enc.decrypt_token(cfg[key]) + + exp = (cfg.get("expiry") or "").replace("Z", "") + return Credentials( + token=cfg.get("token"), + refresh_token=cfg.get("refresh_token"), + token_uri=cfg.get("token_uri"), + client_id=cfg.get("client_id"), + client_secret=cfg.get("client_secret"), + scopes=cfg.get("scopes", []), + expiry=datetime.fromisoformat(exp) if exp else None, + ) + + +def _gmail_headers(message: dict[str, Any]) -> dict[str, str]: + headers = message.get("payload", {}).get("headers", []) + return { + header.get("name", "").lower(): header.get("value", "") + for header in headers + if isinstance(header, dict) + } + + +def _format_gmail_summary(message: dict[str, Any]) -> dict[str, Any]: + headers = _gmail_headers(message) + return { + "message_id": message.get("id") or message.get("messageId"), + "thread_id": message.get("threadId"), + "subject": message.get("subject") or headers.get("subject", "No Subject"), + "from": message.get("sender") or headers.get("from", "Unknown"), + "to": message.get("to") or headers.get("to", ""), + "date": message.get("messageTimestamp") or headers.get("date", ""), + "snippet": message.get("snippet") or message.get("messageText", "")[:300], + "labels": message.get("labelIds", []), + } diff --git a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/read_email.py b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/read_email.py index 0636bf3d9..10c64c6c5 100644 --- a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/read_email.py +++ b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/read_email.py @@ -61,11 +61,10 @@ def create_read_gmail_email_tool( "message": "Composio connected account ID not found for this Gmail connector.", } - from app.agents.shared.tools.gmail.search_emails import ( - _format_gmail_summary, - ) from app.services.composio_service import ComposioService + from ._helpers import _format_gmail_summary + detail, error = await ComposioService().get_gmail_message_detail( connected_account_id=cca_id, entity_id=f"surfsense_{user_id}", @@ -97,9 +96,7 @@ def create_read_gmail_email_tool( "content": content, } - from app.agents.shared.tools.gmail.search_emails import ( - _build_credentials, - ) + from ._helpers import _build_credentials creds = _build_credentials(connector) diff --git a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/search_emails.py b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/search_emails.py index a3466cfa5..2c633d629 100644 --- a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/search_emails.py +++ b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/gmail/tools/search_emails.py @@ -69,11 +69,10 @@ def create_search_gmail_tool( "message": "Composio connected account ID not found for this Gmail connector.", } - from app.agents.shared.tools.gmail.search_emails import ( - _format_gmail_summary, - ) from app.services.composio_service import ComposioService + from ._helpers import _format_gmail_summary + ( messages, _next, @@ -98,9 +97,7 @@ def create_search_gmail_tool( } return {"status": "success", "emails": emails, "total": len(emails)} - from app.agents.shared.tools.gmail.search_emails import ( - _build_credentials, - ) + from ._helpers import _build_credentials creds = _build_credentials(connector) diff --git a/surfsense_backend/app/agents/shared/tools/gmail/__init__.py b/surfsense_backend/app/agents/shared/tools/gmail/__init__.py deleted file mode 100644 index f32312fe6..000000000 --- a/surfsense_backend/app/agents/shared/tools/gmail/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -from app.agents.shared.tools.gmail.create_draft import ( - create_create_gmail_draft_tool, -) -from app.agents.shared.tools.gmail.read_email import ( - create_read_gmail_email_tool, -) -from app.agents.shared.tools.gmail.search_emails import ( - create_search_gmail_tool, -) -from app.agents.shared.tools.gmail.send_email import ( - create_send_gmail_email_tool, -) -from app.agents.shared.tools.gmail.trash_email import ( - create_trash_gmail_email_tool, -) -from app.agents.shared.tools.gmail.update_draft import ( - create_update_gmail_draft_tool, -) - -__all__ = [ - "create_create_gmail_draft_tool", - "create_read_gmail_email_tool", - "create_search_gmail_tool", - "create_send_gmail_email_tool", - "create_trash_gmail_email_tool", - "create_update_gmail_draft_tool", -] diff --git a/surfsense_backend/app/agents/shared/tools/gmail/composio_helpers.py b/surfsense_backend/app/agents/shared/tools/gmail/composio_helpers.py deleted file mode 100644 index 0ca1191a4..000000000 --- a/surfsense_backend/app/agents/shared/tools/gmail/composio_helpers.py +++ /dev/null @@ -1,41 +0,0 @@ -from typing import Any - -from app.db import SearchSourceConnector -from app.services.composio_service import ComposioService - - -def split_recipients(value: str | None) -> list[str]: - if not value: - return [] - return [recipient.strip() for recipient in value.split(",") if recipient.strip()] - - -def unwrap_composio_data(data: Any) -> Any: - if isinstance(data, dict): - inner = data.get("data", data) - if isinstance(inner, dict): - return inner.get("response_data", inner) - return inner - return data - - -async def execute_composio_gmail_tool( - connector: SearchSourceConnector, - user_id: str, - tool_name: str, - params: dict[str, Any], -) -> tuple[Any, str | None]: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return None, "Composio connected account ID not found for this Gmail connector." - - result = await ComposioService().execute_tool( - connected_account_id=cca_id, - tool_name=tool_name, - params=params, - entity_id=f"surfsense_{user_id}", - ) - if not result.get("success"): - return None, result.get("error", "Unknown Composio Gmail error") - - return unwrap_composio_data(result.get("data")), None diff --git a/surfsense_backend/app/agents/shared/tools/gmail/create_draft.py b/surfsense_backend/app/agents/shared/tools/gmail/create_draft.py deleted file mode 100644 index e44fa33a2..000000000 --- a/surfsense_backend/app/agents/shared/tools/gmail/create_draft.py +++ /dev/null @@ -1,361 +0,0 @@ -import asyncio -import base64 -import logging -from datetime import datetime -from email.mime.text import MIMEText -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.db import async_session_maker -from app.services.gmail import GmailToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_create_gmail_draft_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the create_gmail_draft tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured create_gmail_draft tool - """ - del db_session # per-call session — see docstring - - @tool - async def create_gmail_draft( - to: str, - subject: str, - body: str, - cc: str | None = None, - bcc: str | None = None, - ) -> dict[str, Any]: - """Create a draft email in Gmail. - - Use when the user asks to draft, compose, or prepare an email without - sending it. - - Args: - to: Recipient email address. - subject: Email subject line. - body: Email body content. - cc: Optional CC recipient(s), comma-separated. - bcc: Optional BCC recipient(s), comma-separated. - - Returns: - Dictionary with: - - status: "success", "rejected", or "error" - - draft_id: Gmail draft ID (if success) - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment and do NOT retry or suggest alternatives. - - If status is "insufficient_permissions", the connector lacks the required OAuth scope. - Inform the user they need to re-authenticate and do NOT retry the action. - - Examples: - - "Draft an email to alice@example.com about the meeting" - - "Compose a reply to Bob about the project update" - """ - logger.info(f"create_gmail_draft called: to='{to}', subject='{subject}'") - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Gmail tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = GmailToolMetadataService(db_session) - context = await metadata_service.get_creation_context( - search_space_id, user_id - ) - - if "error" in context: - logger.error( - f"Failed to fetch creation context: {context['error']}" - ) - return {"status": "error", "message": context["error"]} - - accounts = context.get("accounts", []) - if accounts and all(a.get("auth_expired") for a in accounts): - logger.warning("All Gmail accounts have expired authentication") - return { - "status": "auth_error", - "message": "All connected Gmail accounts need re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "gmail", - } - - logger.info( - f"Requesting approval for creating Gmail draft: to='{to}', subject='{subject}'" - ) - result = request_approval( - action_type="gmail_draft_creation", - tool_name="create_gmail_draft", - params={ - "to": to, - "subject": subject, - "body": body, - "cc": cc, - "bcc": bcc, - "connector_id": None, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. The draft was not created. Do not ask again or suggest alternatives.", - } - - final_to = result.params.get("to", to) - final_subject = result.params.get("subject", subject) - final_body = result.params.get("body", body) - final_cc = result.params.get("cc", cc) - final_bcc = result.params.get("bcc", bcc) - final_connector_id = result.params.get("connector_id") - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - _gmail_types = [ - SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, - ] - - if final_connector_id is not None: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_gmail_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Gmail connector is invalid or has been disconnected.", - } - actual_connector_id = connector.id - else: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_gmail_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "No Gmail connector found. Please connect Gmail in your workspace settings.", - } - actual_connector_id = connector.id - - logger.info( - f"Creating Gmail draft: to='{final_to}', subject='{final_subject}', connector={actual_connector_id}" - ) - - is_composio_gmail = ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR - ) - if is_composio_gmail: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found for this Gmail connector.", - } - else: - from google.oauth2.credentials import Credentials - - from app.config import config - from app.utils.oauth_security import TokenEncryption - - config_data = dict(connector.config) - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and config.SECRET_KEY: - token_encryption = TokenEncryption(config.SECRET_KEY) - if config_data.get("token"): - config_data["token"] = token_encryption.decrypt_token( - config_data["token"] - ) - if config_data.get("refresh_token"): - config_data["refresh_token"] = ( - token_encryption.decrypt_token( - config_data["refresh_token"] - ) - ) - if config_data.get("client_secret"): - config_data["client_secret"] = ( - token_encryption.decrypt_token( - config_data["client_secret"] - ) - ) - - exp = config_data.get("expiry", "") - if exp: - exp = exp.replace("Z", "") - - creds = Credentials( - token=config_data.get("token"), - refresh_token=config_data.get("refresh_token"), - token_uri=config_data.get("token_uri"), - client_id=config_data.get("client_id"), - client_secret=config_data.get("client_secret"), - scopes=config_data.get("scopes", []), - expiry=datetime.fromisoformat(exp) if exp else None, - ) - - message = MIMEText(final_body) - message["to"] = final_to - message["subject"] = final_subject - if final_cc: - message["cc"] = final_cc - if final_bcc: - message["bcc"] = final_bcc - raw = base64.urlsafe_b64encode(message.as_bytes()).decode() - - try: - if is_composio_gmail: - from app.agents.shared.tools.gmail.composio_helpers import ( - execute_composio_gmail_tool, - split_recipients, - ) - - created, error = await execute_composio_gmail_tool( - connector, - user_id, - "GMAIL_CREATE_EMAIL_DRAFT", - { - "user_id": "me", - "recipient_email": final_to, - "subject": final_subject, - "body": final_body, - "cc": split_recipients(final_cc), - "bcc": split_recipients(final_bcc), - "is_html": False, - }, - ) - if error: - raise RuntimeError(error) - if not isinstance(created, dict): - created = {} - else: - from googleapiclient.discovery import build - - gmail_service = build("gmail", "v1", credentials=creds) - created = await asyncio.get_event_loop().run_in_executor( - None, - lambda: ( - gmail_service.users() - .drafts() - .create(userId="me", body={"message": {"raw": raw}}) - .execute() - ), - ) - except Exception as api_err: - from googleapiclient.errors import HttpError - - if isinstance(api_err, HttpError) and api_err.resp.status == 403: - logger.warning( - f"Insufficient permissions for connector {actual_connector_id}: {api_err}" - ) - try: - from sqlalchemy.orm.attributes import flag_modified - - _res = await db_session.execute( - select(SearchSourceConnector).where( - SearchSourceConnector.id == actual_connector_id - ) - ) - _conn = _res.scalar_one_or_none() - if _conn and not _conn.config.get("auth_expired"): - _conn.config = {**_conn.config, "auth_expired": True} - flag_modified(_conn, "config") - await db_session.commit() - except Exception: - logger.warning( - "Failed to persist auth_expired for connector %s", - actual_connector_id, - exc_info=True, - ) - return { - "status": "insufficient_permissions", - "connector_id": actual_connector_id, - "message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - logger.info(f"Gmail draft created: id={created.get('id')}") - - kb_message_suffix = "" - try: - from app.services.gmail import GmailKBSyncService - - kb_service = GmailKBSyncService(db_session) - draft_message = created.get("message", {}) - kb_result = await kb_service.sync_after_create( - message_id=draft_message.get("id", ""), - thread_id=draft_message.get("threadId", ""), - subject=final_subject, - sender="me", - date_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - body_text=final_body, - connector_id=actual_connector_id, - search_space_id=search_space_id, - user_id=user_id, - draft_id=created.get("id"), - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = " This draft will be added to your knowledge base in the next scheduled sync." - except Exception as kb_err: - logger.warning(f"KB sync after create failed: {kb_err}") - kb_message_suffix = " This draft will be added to your knowledge base in the next scheduled sync." - - return { - "status": "success", - "draft_id": created.get("id"), - "message": f"Successfully created Gmail draft with subject '{final_subject}'.{kb_message_suffix}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error creating Gmail draft: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while creating the draft. Please try again.", - } - - return create_gmail_draft diff --git a/surfsense_backend/app/agents/shared/tools/gmail/read_email.py b/surfsense_backend/app/agents/shared/tools/gmail/read_email.py deleted file mode 100644 index 684379a09..000000000 --- a/surfsense_backend/app/agents/shared/tools/gmail/read_email.py +++ /dev/null @@ -1,172 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select - -from app.db import SearchSourceConnector, SearchSourceConnectorType, async_session_maker - -logger = logging.getLogger(__name__) - -_GMAIL_TYPES = [ - SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, -] - - -def create_read_gmail_email_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the read_gmail_email tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured read_gmail_email tool - """ - del db_session # per-call session — see docstring - - @tool - async def read_gmail_email(message_id: str) -> dict[str, Any]: - """Read the full content of a specific Gmail email by its message ID. - - Use after search_gmail to get the complete body of an email. - - Args: - message_id: The Gmail message ID (from search_gmail results). - - Returns: - Dictionary with status and the full email content formatted as markdown. - """ - if search_space_id is None or user_id is None: - return {"status": "error", "message": "Gmail tool not properly configured."} - - try: - async with async_session_maker() as db_session: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_GMAIL_TYPES), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "No Gmail connector found. Please connect Gmail in your workspace settings.", - } - - if ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR - ): - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found.", - } - - from app.agents.shared.tools.gmail.search_emails import ( - _format_gmail_summary, - ) - from app.services.composio_service import ComposioService - - service = ComposioService() - detail, error = await service.get_gmail_message_detail( - connected_account_id=cca_id, - entity_id=f"surfsense_{user_id}", - message_id=message_id, - ) - if error: - return {"status": "error", "message": error} - if not detail: - return { - "status": "not_found", - "message": f"Email with ID '{message_id}' not found.", - } - - summary = _format_gmail_summary(detail) - content = ( - f"# {summary['subject']}\n\n" - f"**From:** {summary['from']}\n" - f"**To:** {summary['to']}\n" - f"**Date:** {summary['date']}\n\n" - f"## Message Content\n\n" - f"{detail.get('messageText') or detail.get('snippet') or ''}\n\n" - f"## Message Details\n\n" - f"- **Message ID:** {summary['message_id']}\n" - f"- **Thread ID:** {summary['thread_id']}\n" - ) - return { - "status": "success", - "message_id": summary["message_id"] or message_id, - "content": content, - } - - from app.agents.shared.tools.gmail.search_emails import ( - _build_credentials, - ) - - creds = _build_credentials(connector) - - from app.connectors.google_gmail_connector import GoogleGmailConnector - - gmail = GoogleGmailConnector( - credentials=creds, - session=db_session, - user_id=user_id, - connector_id=connector.id, - ) - - detail, error = await gmail.get_message_details(message_id) - if error: - if ( - "re-authenticate" in error.lower() - or "authentication failed" in error.lower() - ): - return { - "status": "auth_error", - "message": error, - "connector_type": "gmail", - } - return {"status": "error", "message": error} - - if not detail: - return { - "status": "not_found", - "message": f"Email with ID '{message_id}' not found.", - } - - content = gmail.format_message_to_markdown(detail) - - return { - "status": "success", - "message_id": message_id, - "content": content, - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - logger.error("Error reading Gmail email: %s", e, exc_info=True) - return { - "status": "error", - "message": "Failed to read email. Please try again.", - } - - return read_gmail_email diff --git a/surfsense_backend/app/agents/shared/tools/gmail/search_emails.py b/surfsense_backend/app/agents/shared/tools/gmail/search_emails.py deleted file mode 100644 index 3ce154c53..000000000 --- a/surfsense_backend/app/agents/shared/tools/gmail/search_emails.py +++ /dev/null @@ -1,260 +0,0 @@ -import logging -from datetime import datetime -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select - -from app.db import SearchSourceConnector, SearchSourceConnectorType, async_session_maker - -logger = logging.getLogger(__name__) - -_GMAIL_TYPES = [ - SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, -] - -_token_encryption_cache: object | None = None - - -def _get_token_encryption(): - global _token_encryption_cache - if _token_encryption_cache is None: - from app.config import config - from app.utils.oauth_security import TokenEncryption - - if not config.SECRET_KEY: - raise RuntimeError("SECRET_KEY not configured for token decryption.") - _token_encryption_cache = TokenEncryption(config.SECRET_KEY) - return _token_encryption_cache - - -def _build_credentials(connector: SearchSourceConnector): - """Build Google OAuth Credentials from a connector's stored config. - - Handles both native OAuth connectors (with encrypted tokens) and - Composio-backed connectors. Shared by Gmail and Calendar tools. - """ - from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES - - if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: - raise ValueError("Composio connectors must use Composio tool execution.") - - from google.oauth2.credentials import Credentials - - cfg = dict(connector.config) - if cfg.get("_token_encrypted"): - enc = _get_token_encryption() - for key in ("token", "refresh_token", "client_secret"): - if cfg.get(key): - cfg[key] = enc.decrypt_token(cfg[key]) - - exp = (cfg.get("expiry") or "").replace("Z", "") - return Credentials( - token=cfg.get("token"), - refresh_token=cfg.get("refresh_token"), - token_uri=cfg.get("token_uri"), - client_id=cfg.get("client_id"), - client_secret=cfg.get("client_secret"), - scopes=cfg.get("scopes", []), - expiry=datetime.fromisoformat(exp) if exp else None, - ) - - -def _gmail_headers(message: dict[str, Any]) -> dict[str, str]: - headers = message.get("payload", {}).get("headers", []) - return { - header.get("name", "").lower(): header.get("value", "") - for header in headers - if isinstance(header, dict) - } - - -def _format_gmail_summary(message: dict[str, Any]) -> dict[str, Any]: - headers = _gmail_headers(message) - return { - "message_id": message.get("id") or message.get("messageId"), - "thread_id": message.get("threadId"), - "subject": message.get("subject") or headers.get("subject", "No Subject"), - "from": message.get("sender") or headers.get("from", "Unknown"), - "to": message.get("to") or headers.get("to", ""), - "date": message.get("messageTimestamp") or headers.get("date", ""), - "snippet": message.get("snippet") or message.get("messageText", "")[:300], - "labels": message.get("labelIds", []), - } - - -async def _search_composio_gmail( - connector: SearchSourceConnector, - user_id: str, - query: str, - max_results: int, -) -> dict[str, Any]: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found.", - } - - from app.services.composio_service import ComposioService - - service = ComposioService() - messages, _next_token, _estimate, error = await service.get_gmail_messages( - connected_account_id=cca_id, - entity_id=f"surfsense_{user_id}", - query=query, - max_results=max_results, - ) - if error: - return {"status": "error", "message": error} - - emails = [_format_gmail_summary(message) for message in messages] - return { - "status": "success", - "emails": emails, - "total": len(emails), - "message": "No emails found." if not emails else None, - } - - -def create_search_gmail_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the search_gmail tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured search_gmail tool - """ - del db_session # per-call session — see docstring - - @tool - async def search_gmail( - query: str, - max_results: int = 10, - ) -> dict[str, Any]: - """Search emails in the user's Gmail inbox using Gmail search syntax. - - Args: - query: Gmail search query, same syntax as the Gmail search bar. - Examples: "from:alice@example.com", "subject:meeting", - "is:unread", "after:2024/01/01 before:2024/02/01", - "has:attachment", "in:sent". - max_results: Number of emails to return (default 10, max 20). - - Returns: - Dictionary with status and a list of email summaries including - message_id, subject, from, date, snippet. - """ - if search_space_id is None or user_id is None: - return {"status": "error", "message": "Gmail tool not properly configured."} - - max_results = min(max_results, 20) - - try: - async with async_session_maker() as db_session: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_GMAIL_TYPES), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "No Gmail connector found. Please connect Gmail in your workspace settings.", - } - - if ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR - ): - return await _search_composio_gmail( - connector, str(user_id), query, max_results - ) - - creds = _build_credentials(connector) - - from app.connectors.google_gmail_connector import GoogleGmailConnector - - gmail = GoogleGmailConnector( - credentials=creds, - session=db_session, - user_id=user_id, - connector_id=connector.id, - ) - - messages_list, error = await gmail.get_messages_list( - max_results=max_results, query=query - ) - if error: - if ( - "re-authenticate" in error.lower() - or "authentication failed" in error.lower() - ): - return { - "status": "auth_error", - "message": error, - "connector_type": "gmail", - } - return {"status": "error", "message": error} - - if not messages_list: - return { - "status": "success", - "emails": [], - "total": 0, - "message": "No emails found.", - } - - emails = [] - for msg in messages_list: - detail, err = await gmail.get_message_details(msg["id"]) - if err: - continue - headers = { - h["name"].lower(): h["value"] - for h in detail.get("payload", {}).get("headers", []) - } - emails.append( - { - "message_id": detail.get("id"), - "thread_id": detail.get("threadId"), - "subject": headers.get("subject", "No Subject"), - "from": headers.get("from", "Unknown"), - "to": headers.get("to", ""), - "date": headers.get("date", ""), - "snippet": detail.get("snippet", ""), - "labels": detail.get("labelIds", []), - } - ) - - return {"status": "success", "emails": emails, "total": len(emails)} - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - logger.error("Error searching Gmail: %s", e, exc_info=True) - return { - "status": "error", - "message": "Failed to search Gmail. Please try again.", - } - - return search_gmail diff --git a/surfsense_backend/app/agents/shared/tools/gmail/send_email.py b/surfsense_backend/app/agents/shared/tools/gmail/send_email.py deleted file mode 100644 index 0f10e8082..000000000 --- a/surfsense_backend/app/agents/shared/tools/gmail/send_email.py +++ /dev/null @@ -1,363 +0,0 @@ -import asyncio -import base64 -import logging -from datetime import datetime -from email.mime.text import MIMEText -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.db import async_session_maker -from app.services.gmail import GmailToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_send_gmail_email_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the send_gmail_email tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured send_gmail_email tool - """ - del db_session # per-call session — see docstring - - @tool - async def send_gmail_email( - to: str, - subject: str, - body: str, - cc: str | None = None, - bcc: str | None = None, - ) -> dict[str, Any]: - """Send an email via Gmail. - - Use when the user explicitly asks to send an email. This sends the - email immediately - it cannot be unsent. - - Args: - to: Recipient email address. - subject: Email subject line. - body: Email body content. - cc: Optional CC recipient(s), comma-separated. - bcc: Optional BCC recipient(s), comma-separated. - - Returns: - Dictionary with: - - status: "success", "rejected", or "error" - - message_id: Gmail message ID (if success) - - thread_id: Gmail thread ID (if success) - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment and do NOT retry or suggest alternatives. - - If status is "insufficient_permissions", the connector lacks the required OAuth scope. - Inform the user they need to re-authenticate and do NOT retry the action. - - Examples: - - "Send an email to alice@example.com about the meeting" - - "Email Bob the project update" - """ - logger.info(f"send_gmail_email called: to='{to}', subject='{subject}'") - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Gmail tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = GmailToolMetadataService(db_session) - context = await metadata_service.get_creation_context( - search_space_id, user_id - ) - - if "error" in context: - logger.error( - f"Failed to fetch creation context: {context['error']}" - ) - return {"status": "error", "message": context["error"]} - - accounts = context.get("accounts", []) - if accounts and all(a.get("auth_expired") for a in accounts): - logger.warning("All Gmail accounts have expired authentication") - return { - "status": "auth_error", - "message": "All connected Gmail accounts need re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "gmail", - } - - logger.info( - f"Requesting approval for sending Gmail email: to='{to}', subject='{subject}'" - ) - result = request_approval( - action_type="gmail_email_send", - tool_name="send_gmail_email", - params={ - "to": to, - "subject": subject, - "body": body, - "cc": cc, - "bcc": bcc, - "connector_id": None, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. The email was not sent. Do not ask again or suggest alternatives.", - } - - final_to = result.params.get("to", to) - final_subject = result.params.get("subject", subject) - final_body = result.params.get("body", body) - final_cc = result.params.get("cc", cc) - final_bcc = result.params.get("bcc", bcc) - final_connector_id = result.params.get("connector_id") - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - _gmail_types = [ - SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, - ] - - if final_connector_id is not None: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_gmail_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Gmail connector is invalid or has been disconnected.", - } - actual_connector_id = connector.id - else: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_gmail_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "No Gmail connector found. Please connect Gmail in your workspace settings.", - } - actual_connector_id = connector.id - - logger.info( - f"Sending Gmail email: to='{final_to}', subject='{final_subject}', connector={actual_connector_id}" - ) - - is_composio_gmail = ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR - ) - if is_composio_gmail: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found for this Gmail connector.", - } - else: - from google.oauth2.credentials import Credentials - - from app.config import config - from app.utils.oauth_security import TokenEncryption - - config_data = dict(connector.config) - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and config.SECRET_KEY: - token_encryption = TokenEncryption(config.SECRET_KEY) - if config_data.get("token"): - config_data["token"] = token_encryption.decrypt_token( - config_data["token"] - ) - if config_data.get("refresh_token"): - config_data["refresh_token"] = ( - token_encryption.decrypt_token( - config_data["refresh_token"] - ) - ) - if config_data.get("client_secret"): - config_data["client_secret"] = ( - token_encryption.decrypt_token( - config_data["client_secret"] - ) - ) - - exp = config_data.get("expiry", "") - if exp: - exp = exp.replace("Z", "") - - creds = Credentials( - token=config_data.get("token"), - refresh_token=config_data.get("refresh_token"), - token_uri=config_data.get("token_uri"), - client_id=config_data.get("client_id"), - client_secret=config_data.get("client_secret"), - scopes=config_data.get("scopes", []), - expiry=datetime.fromisoformat(exp) if exp else None, - ) - - message = MIMEText(final_body) - message["to"] = final_to - message["subject"] = final_subject - if final_cc: - message["cc"] = final_cc - if final_bcc: - message["bcc"] = final_bcc - raw = base64.urlsafe_b64encode(message.as_bytes()).decode() - - try: - if is_composio_gmail: - from app.agents.shared.tools.gmail.composio_helpers import ( - execute_composio_gmail_tool, - split_recipients, - ) - - sent, error = await execute_composio_gmail_tool( - connector, - user_id, - "GMAIL_SEND_EMAIL", - { - "user_id": "me", - "recipient_email": final_to, - "subject": final_subject, - "body": final_body, - "cc": split_recipients(final_cc), - "bcc": split_recipients(final_bcc), - "is_html": False, - }, - ) - if error: - raise RuntimeError(error) - if not isinstance(sent, dict): - sent = {} - else: - from googleapiclient.discovery import build - - gmail_service = build("gmail", "v1", credentials=creds) - sent = await asyncio.get_event_loop().run_in_executor( - None, - lambda: ( - gmail_service.users() - .messages() - .send(userId="me", body={"raw": raw}) - .execute() - ), - ) - except Exception as api_err: - from googleapiclient.errors import HttpError - - if isinstance(api_err, HttpError) and api_err.resp.status == 403: - logger.warning( - f"Insufficient permissions for connector {actual_connector_id}: {api_err}" - ) - try: - from sqlalchemy.orm.attributes import flag_modified - - _res = await db_session.execute( - select(SearchSourceConnector).where( - SearchSourceConnector.id == actual_connector_id - ) - ) - _conn = _res.scalar_one_or_none() - if _conn and not _conn.config.get("auth_expired"): - _conn.config = {**_conn.config, "auth_expired": True} - flag_modified(_conn, "config") - await db_session.commit() - except Exception: - logger.warning( - "Failed to persist auth_expired for connector %s", - actual_connector_id, - exc_info=True, - ) - return { - "status": "insufficient_permissions", - "connector_id": actual_connector_id, - "message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - logger.info( - f"Gmail email sent: id={sent.get('id')}, threadId={sent.get('threadId')}" - ) - - kb_message_suffix = "" - try: - from app.services.gmail import GmailKBSyncService - - kb_service = GmailKBSyncService(db_session) - kb_result = await kb_service.sync_after_create( - message_id=sent.get("id", ""), - thread_id=sent.get("threadId", ""), - subject=final_subject, - sender="me", - date_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - body_text=final_body, - connector_id=actual_connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = " This email will be added to your knowledge base in the next scheduled sync." - except Exception as kb_err: - logger.warning(f"KB sync after send failed: {kb_err}") - kb_message_suffix = " This email will be added to your knowledge base in the next scheduled sync." - - return { - "status": "success", - "message_id": sent.get("id"), - "thread_id": sent.get("threadId"), - "message": f"Successfully sent email to '{final_to}' with subject '{final_subject}'.{kb_message_suffix}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error sending Gmail email: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while sending the email. Please try again.", - } - - return send_gmail_email diff --git a/surfsense_backend/app/agents/shared/tools/gmail/trash_email.py b/surfsense_backend/app/agents/shared/tools/gmail/trash_email.py deleted file mode 100644 index fa6e015d1..000000000 --- a/surfsense_backend/app/agents/shared/tools/gmail/trash_email.py +++ /dev/null @@ -1,344 +0,0 @@ -import asyncio -import logging -from datetime import datetime -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.db import async_session_maker -from app.services.gmail import GmailToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_trash_gmail_email_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the trash_gmail_email tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured trash_gmail_email tool - """ - del db_session # per-call session — see docstring - - @tool - async def trash_gmail_email( - email_subject_or_id: str, - delete_from_kb: bool = False, - ) -> dict[str, Any]: - """Move an email or draft to trash in Gmail. - - Use when the user asks to delete, remove, or trash an email or draft. - - Args: - email_subject_or_id: The exact subject line or message ID of the - email to trash (as it appears in the inbox). - delete_from_kb: Whether to also remove the email from the knowledge base. - Default is False. - Set to True to remove from both Gmail and knowledge base. - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", or "error" - - message_id: Gmail message ID (if success) - - deleted_from_kb: whether the document was removed from the knowledge base - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined. Respond with a brief - acknowledgment and do NOT retry or suggest alternatives. - - If status is "not_found", relay the exact message to the user and ask them - to verify the email subject or check if it has been indexed. - - If status is "insufficient_permissions", the connector lacks the required OAuth scope. - Inform the user they need to re-authenticate and do NOT retry this tool. - Examples: - - "Delete the email about 'Meeting Cancelled'" - - "Trash the email from Bob about the project" - """ - logger.info( - f"trash_gmail_email called: email_subject_or_id='{email_subject_or_id}', delete_from_kb={delete_from_kb}" - ) - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Gmail tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = GmailToolMetadataService(db_session) - context = await metadata_service.get_trash_context( - search_space_id, user_id, email_subject_or_id - ) - - if "error" in context: - error_msg = context["error"] - if "not found" in error_msg.lower(): - logger.warning(f"Email not found: {error_msg}") - return {"status": "not_found", "message": error_msg} - logger.error(f"Failed to fetch trash context: {error_msg}") - return {"status": "error", "message": error_msg} - - account = context.get("account", {}) - if account.get("auth_expired"): - logger.warning( - "Gmail account %s has expired authentication", - account.get("id"), - ) - return { - "status": "auth_error", - "message": "The Gmail account for this email needs re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "gmail", - } - - email = context["email"] - message_id = email["message_id"] - document_id = email.get("document_id") - connector_id_from_context = context["account"]["id"] - - if not message_id: - return { - "status": "error", - "message": "Message ID is missing from the indexed document. Please re-index the email and try again.", - } - - logger.info( - f"Requesting approval for trashing Gmail email: '{email_subject_or_id}' (message_id={message_id}, delete_from_kb={delete_from_kb})" - ) - result = request_approval( - action_type="gmail_email_trash", - tool_name="trash_gmail_email", - params={ - "message_id": message_id, - "connector_id": connector_id_from_context, - "delete_from_kb": delete_from_kb, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. The email was not trashed. Do not ask again or suggest alternatives.", - } - - final_message_id = result.params.get("message_id", message_id) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_delete_from_kb = result.params.get( - "delete_from_kb", delete_from_kb - ) - - if not final_connector_id: - return { - "status": "error", - "message": "No connector found for this email.", - } - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - _gmail_types = [ - SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, - ] - - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_gmail_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Gmail connector is invalid or has been disconnected.", - } - - logger.info( - f"Trashing Gmail email: message_id='{final_message_id}', connector={final_connector_id}" - ) - - is_composio_gmail = ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR - ) - if is_composio_gmail: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found for this Gmail connector.", - } - else: - from google.oauth2.credentials import Credentials - - from app.config import config - from app.utils.oauth_security import TokenEncryption - - config_data = dict(connector.config) - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and config.SECRET_KEY: - token_encryption = TokenEncryption(config.SECRET_KEY) - if config_data.get("token"): - config_data["token"] = token_encryption.decrypt_token( - config_data["token"] - ) - if config_data.get("refresh_token"): - config_data["refresh_token"] = ( - token_encryption.decrypt_token( - config_data["refresh_token"] - ) - ) - if config_data.get("client_secret"): - config_data["client_secret"] = ( - token_encryption.decrypt_token( - config_data["client_secret"] - ) - ) - - exp = config_data.get("expiry", "") - if exp: - exp = exp.replace("Z", "") - - creds = Credentials( - token=config_data.get("token"), - refresh_token=config_data.get("refresh_token"), - token_uri=config_data.get("token_uri"), - client_id=config_data.get("client_id"), - client_secret=config_data.get("client_secret"), - scopes=config_data.get("scopes", []), - expiry=datetime.fromisoformat(exp) if exp else None, - ) - - try: - if is_composio_gmail: - from app.agents.shared.tools.gmail.composio_helpers import ( - execute_composio_gmail_tool, - ) - - _trashed, error = await execute_composio_gmail_tool( - connector, - user_id, - "GMAIL_MOVE_TO_TRASH", - {"user_id": "me", "message_id": final_message_id}, - ) - if error: - raise RuntimeError(error) - else: - from googleapiclient.discovery import build - - gmail_service = build("gmail", "v1", credentials=creds) - await asyncio.get_event_loop().run_in_executor( - None, - lambda: ( - gmail_service.users() - .messages() - .trash(userId="me", id=final_message_id) - .execute() - ), - ) - except Exception as api_err: - from googleapiclient.errors import HttpError - - if isinstance(api_err, HttpError) and api_err.resp.status == 403: - logger.warning( - f"Insufficient permissions for connector {connector.id}: {api_err}" - ) - try: - from sqlalchemy.orm.attributes import flag_modified - - if not connector.config.get("auth_expired"): - connector.config = { - **connector.config, - "auth_expired": True, - } - flag_modified(connector, "config") - await db_session.commit() - except Exception: - logger.warning( - "Failed to persist auth_expired for connector %s", - connector.id, - exc_info=True, - ) - return { - "status": "insufficient_permissions", - "connector_id": connector.id, - "message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - logger.info(f"Gmail email trashed: message_id={final_message_id}") - - trash_result: dict[str, Any] = { - "status": "success", - "message_id": final_message_id, - "message": f"Successfully moved email '{email.get('subject', email_subject_or_id)}' to trash.", - } - - deleted_from_kb = False - if final_delete_from_kb and document_id: - try: - from app.db import Document - - doc_result = await db_session.execute( - select(Document).filter(Document.id == document_id) - ) - document = doc_result.scalars().first() - if document: - await db_session.delete(document) - await db_session.commit() - deleted_from_kb = True - logger.info( - f"Deleted document {document_id} from knowledge base" - ) - else: - logger.warning(f"Document {document_id} not found in KB") - except Exception as e: - logger.error(f"Failed to delete document from KB: {e}") - await db_session.rollback() - trash_result["warning"] = ( - f"Email trashed, but failed to remove from knowledge base: {e!s}" - ) - - trash_result["deleted_from_kb"] = deleted_from_kb - if deleted_from_kb: - trash_result["message"] = ( - f"{trash_result.get('message', '')} (also removed from knowledge base)" - ) - - return trash_result - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error trashing Gmail email: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while trashing the email. Please try again.", - } - - return trash_gmail_email diff --git a/surfsense_backend/app/agents/shared/tools/gmail/update_draft.py b/surfsense_backend/app/agents/shared/tools/gmail/update_draft.py deleted file mode 100644 index 965b42675..000000000 --- a/surfsense_backend/app/agents/shared/tools/gmail/update_draft.py +++ /dev/null @@ -1,495 +0,0 @@ -import asyncio -import base64 -import logging -from datetime import datetime -from email.mime.text import MIMEText -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.db import async_session_maker -from app.services.gmail import GmailToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_update_gmail_draft_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the update_gmail_draft tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured update_gmail_draft tool - """ - del db_session # per-call session — see docstring - - @tool - async def update_gmail_draft( - draft_subject_or_id: str, - body: str, - to: str | None = None, - subject: str | None = None, - cc: str | None = None, - bcc: str | None = None, - ) -> dict[str, Any]: - """Update an existing Gmail draft. - - Use when the user asks to modify, edit, or add content to an existing - email draft. This replaces the draft content with the new version. - The user will be able to review and edit the content before it is applied. - - If the user simply wants to "edit" a draft without specifying exact changes, - generate the body yourself using your best understanding of the conversation - context. The user will review and can freely edit the content in the approval - card before confirming. - - IMPORTANT: This tool is ONLY for modifying Gmail draft content, NOT for - deleting/trashing drafts (use trash_gmail_email instead), Notion pages, - calendar events, or any other content type. - - Args: - draft_subject_or_id: The exact subject line of the draft to update - (as it appears in Gmail drafts). - body: The full updated body content for the draft. Generate this - yourself based on the user's request and conversation context. - to: Optional new recipient email address (keeps original if omitted). - subject: Optional new subject line (keeps original if omitted). - cc: Optional CC recipient(s), comma-separated. - bcc: Optional BCC recipient(s), comma-separated. - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", or "error" - - draft_id: Gmail draft ID (if success) - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment and do NOT retry or suggest alternatives. - - If status is "not_found", relay the exact message to the user and ask them - to verify the draft subject or check if it has been indexed. - - If status is "insufficient_permissions", the connector lacks the required OAuth scope. - Inform the user they need to re-authenticate and do NOT retry the action. - - Examples: - - "Update the Kurseong Plan draft with the new itinerary details" - - "Edit my draft about the project proposal and change the recipient" - - "Let me edit the meeting notes draft" (call with current body content so user can edit in the approval card) - """ - logger.info( - f"update_gmail_draft called: draft_subject_or_id='{draft_subject_or_id}'" - ) - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Gmail tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = GmailToolMetadataService(db_session) - context = await metadata_service.get_update_context( - search_space_id, user_id, draft_subject_or_id - ) - - if "error" in context: - error_msg = context["error"] - if "not found" in error_msg.lower(): - logger.warning(f"Draft not found: {error_msg}") - return {"status": "not_found", "message": error_msg} - logger.error(f"Failed to fetch update context: {error_msg}") - return {"status": "error", "message": error_msg} - - account = context.get("account", {}) - if account.get("auth_expired"): - logger.warning( - "Gmail account %s has expired authentication", - account.get("id"), - ) - return { - "status": "auth_error", - "message": "The Gmail account for this draft needs re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "gmail", - } - - email = context["email"] - message_id = email["message_id"] - document_id = email.get("document_id") - connector_id_from_context = account["id"] - draft_id_from_context = context.get("draft_id") - - original_subject = email.get("subject", draft_subject_or_id) - final_subject_default = subject if subject else original_subject - final_to_default = to if to else "" - - logger.info( - f"Requesting approval for updating Gmail draft: '{original_subject}' " - f"(message_id={message_id}, draft_id={draft_id_from_context})" - ) - result = request_approval( - action_type="gmail_draft_update", - tool_name="update_gmail_draft", - params={ - "message_id": message_id, - "draft_id": draft_id_from_context, - "to": final_to_default, - "subject": final_subject_default, - "body": body, - "cc": cc, - "bcc": bcc, - "connector_id": connector_id_from_context, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. The draft was not updated. Do not ask again or suggest alternatives.", - } - - final_to = result.params.get("to", final_to_default) - final_subject = result.params.get("subject", final_subject_default) - final_body = result.params.get("body", body) - final_cc = result.params.get("cc", cc) - final_bcc = result.params.get("bcc", bcc) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_draft_id = result.params.get("draft_id", draft_id_from_context) - - if not final_connector_id: - return { - "status": "error", - "message": "No connector found for this draft.", - } - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - _gmail_types = [ - SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, - ] - - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_gmail_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Gmail connector is invalid or has been disconnected.", - } - - logger.info( - f"Updating Gmail draft: subject='{final_subject}', connector={final_connector_id}" - ) - - is_composio_gmail = ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR - ) - if is_composio_gmail: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found for this Gmail connector.", - } - else: - from google.oauth2.credentials import Credentials - - from app.config import config - from app.utils.oauth_security import TokenEncryption - - config_data = dict(connector.config) - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and config.SECRET_KEY: - token_encryption = TokenEncryption(config.SECRET_KEY) - if config_data.get("token"): - config_data["token"] = token_encryption.decrypt_token( - config_data["token"] - ) - if config_data.get("refresh_token"): - config_data["refresh_token"] = ( - token_encryption.decrypt_token( - config_data["refresh_token"] - ) - ) - if config_data.get("client_secret"): - config_data["client_secret"] = ( - token_encryption.decrypt_token( - config_data["client_secret"] - ) - ) - - exp = config_data.get("expiry", "") - if exp: - exp = exp.replace("Z", "") - - creds = Credentials( - token=config_data.get("token"), - refresh_token=config_data.get("refresh_token"), - token_uri=config_data.get("token_uri"), - client_id=config_data.get("client_id"), - client_secret=config_data.get("client_secret"), - scopes=config_data.get("scopes", []), - expiry=datetime.fromisoformat(exp) if exp else None, - ) - - # Resolve draft_id if not already available - if not final_draft_id: - logger.info( - f"draft_id not in metadata, looking up via drafts.list for message_id={message_id}" - ) - if is_composio_gmail: - final_draft_id = await _find_composio_draft_id_by_message( - connector, user_id, message_id - ) - else: - from googleapiclient.discovery import build - - gmail_service = build("gmail", "v1", credentials=creds) - final_draft_id = await _find_draft_id_by_message( - gmail_service, message_id - ) - - if not final_draft_id: - return { - "status": "error", - "message": ( - "Could not find this draft in Gmail. " - "It may have already been sent or deleted." - ), - } - - message = MIMEText(final_body) - if final_to: - message["to"] = final_to - message["subject"] = final_subject - if final_cc: - message["cc"] = final_cc - if final_bcc: - message["bcc"] = final_bcc - raw = base64.urlsafe_b64encode(message.as_bytes()).decode() - - try: - if is_composio_gmail: - from app.agents.shared.tools.gmail.composio_helpers import ( - execute_composio_gmail_tool, - split_recipients, - ) - - updated, error = await execute_composio_gmail_tool( - connector, - user_id, - "GMAIL_UPDATE_DRAFT", - { - "user_id": "me", - "draft_id": final_draft_id, - "recipient_email": final_to, - "subject": final_subject, - "body": final_body, - "cc": split_recipients(final_cc), - "bcc": split_recipients(final_bcc), - "is_html": False, - }, - ) - if error: - raise RuntimeError(error) - if not isinstance(updated, dict): - updated = {} - else: - from googleapiclient.discovery import build - - gmail_service = build("gmail", "v1", credentials=creds) - updated = await asyncio.get_event_loop().run_in_executor( - None, - lambda: ( - gmail_service.users() - .drafts() - .update( - userId="me", - id=final_draft_id, - body={"message": {"raw": raw}}, - ) - .execute() - ), - ) - except Exception as api_err: - from googleapiclient.errors import HttpError - - if isinstance(api_err, HttpError) and api_err.resp.status == 403: - logger.warning( - f"Insufficient permissions for connector {connector.id}: {api_err}" - ) - try: - from sqlalchemy.orm.attributes import flag_modified - - if not connector.config.get("auth_expired"): - connector.config = { - **connector.config, - "auth_expired": True, - } - flag_modified(connector, "config") - await db_session.commit() - except Exception: - logger.warning( - "Failed to persist auth_expired for connector %s", - connector.id, - exc_info=True, - ) - return { - "status": "insufficient_permissions", - "connector_id": connector.id, - "message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.", - } - if isinstance(api_err, HttpError) and api_err.resp.status == 404: - return { - "status": "error", - "message": "Draft no longer exists in Gmail. It may have been sent or deleted.", - } - raise - - logger.info(f"Gmail draft updated: id={updated.get('id')}") - - kb_message_suffix = "" - if document_id: - try: - from sqlalchemy.future import select as sa_select - from sqlalchemy.orm.attributes import flag_modified - - from app.db import Document - - doc_result = await db_session.execute( - sa_select(Document).filter(Document.id == document_id) - ) - document = doc_result.scalars().first() - if document: - document.source_markdown = final_body - document.title = final_subject - meta = dict(document.document_metadata or {}) - meta["subject"] = final_subject - meta["draft_id"] = updated.get("id", final_draft_id) - updated_msg = updated.get("message", {}) - if updated_msg.get("id"): - meta["message_id"] = updated_msg["id"] - document.document_metadata = meta - flag_modified(document, "document_metadata") - await db_session.commit() - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - logger.info( - f"KB document {document_id} updated for draft {final_draft_id}" - ) - else: - kb_message_suffix = " This draft will be fully updated in your knowledge base in the next scheduled sync." - except Exception as kb_err: - logger.warning(f"KB update after draft edit failed: {kb_err}") - await db_session.rollback() - kb_message_suffix = " This draft will be fully updated in your knowledge base in the next scheduled sync." - - return { - "status": "success", - "draft_id": updated.get("id"), - "message": f"Successfully updated Gmail draft with subject '{final_subject}'.{kb_message_suffix}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error updating Gmail draft: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while updating the draft. Please try again.", - } - - return update_gmail_draft - - -async def _find_draft_id_by_message(gmail_service: Any, message_id: str) -> str | None: - """Look up a draft's ID by its message ID via the Gmail API.""" - try: - page_token = None - while True: - kwargs: dict[str, Any] = {"userId": "me", "maxResults": 100} - if page_token: - kwargs["pageToken"] = page_token - - response = await asyncio.get_event_loop().run_in_executor( - None, - lambda kwargs=kwargs: ( - gmail_service.users().drafts().list(**kwargs).execute() - ), - ) - - for draft in response.get("drafts", []): - if draft.get("message", {}).get("id") == message_id: - return draft["id"] - - page_token = response.get("nextPageToken") - if not page_token: - break - - return None - except Exception as e: - logger.warning(f"Failed to look up draft by message_id: {e}") - return None - - -async def _find_composio_draft_id_by_message( - connector: Any, user_id: str, message_id: str -) -> str | None: - from app.agents.shared.tools.gmail.composio_helpers import ( - execute_composio_gmail_tool, - ) - - page_token = "" - while True: - params: dict[str, Any] = { - "user_id": "me", - "max_results": 100, - "verbose": False, - } - if page_token: - params["page_token"] = page_token - - data, error = await execute_composio_gmail_tool( - connector, user_id, "GMAIL_LIST_DRAFTS", params - ) - if error or not isinstance(data, dict): - return None - - for draft in data.get("drafts", []): - if draft.get("message", {}).get("id") == message_id: - return draft.get("id") - - page_token = data.get("nextPageToken") or data.get("next_page_token") or "" - if not page_token: - return None diff --git a/surfsense_backend/app/agents/shared/tools/google_calendar/__init__.py b/surfsense_backend/app/agents/shared/tools/google_calendar/__init__.py deleted file mode 100644 index 362cf4127..000000000 --- a/surfsense_backend/app/agents/shared/tools/google_calendar/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -from app.agents.shared.tools.google_calendar.create_event import ( - create_create_calendar_event_tool, -) -from app.agents.shared.tools.google_calendar.delete_event import ( - create_delete_calendar_event_tool, -) -from app.agents.shared.tools.google_calendar.search_events import ( - create_search_calendar_events_tool, -) -from app.agents.shared.tools.google_calendar.update_event import ( - create_update_calendar_event_tool, -) - -__all__ = [ - "create_create_calendar_event_tool", - "create_delete_calendar_event_tool", - "create_search_calendar_events_tool", - "create_update_calendar_event_tool", -] diff --git a/surfsense_backend/app/agents/shared/tools/google_calendar/create_event.py b/surfsense_backend/app/agents/shared/tools/google_calendar/create_event.py deleted file mode 100644 index 7e5367049..000000000 --- a/surfsense_backend/app/agents/shared/tools/google_calendar/create_event.py +++ /dev/null @@ -1,382 +0,0 @@ -import asyncio -import logging -from datetime import datetime -from typing import Any - -from google.oauth2.credentials import Credentials -from googleapiclient.discovery import build -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.db import async_session_maker -from app.services.google_calendar import GoogleCalendarToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_create_calendar_event_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the create_calendar_event tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured create_calendar_event tool - """ - del db_session # per-call session — see docstring - - @tool - async def create_calendar_event( - summary: str, - start_datetime: str, - end_datetime: str, - description: str | None = None, - location: str | None = None, - attendees: list[str] | None = None, - ) -> dict[str, Any]: - """Create a new event on Google Calendar. - - Use when the user asks to schedule, create, or add a calendar event. - Ask for event details if not provided. - - Args: - summary: The event title. - start_datetime: Start time in ISO 8601 format (e.g. "2026-03-20T10:00:00"). - end_datetime: End time in ISO 8601 format (e.g. "2026-03-20T11:00:00"). - description: Optional event description. - location: Optional event location. - attendees: Optional list of attendee email addresses. - - Returns: - Dictionary with: - - status: "success", "rejected", "auth_error", or "error" - - event_id: Google Calendar event ID (if success) - - html_link: URL to open the event (if success) - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment and do NOT retry or suggest alternatives. - - Examples: - - "Schedule a meeting with John tomorrow at 10am" - - "Create a calendar event for the team standup" - """ - logger.info( - f"create_calendar_event called: summary='{summary}', start='{start_datetime}', end='{end_datetime}'" - ) - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Google Calendar tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = GoogleCalendarToolMetadataService(db_session) - context = await metadata_service.get_creation_context( - search_space_id, user_id - ) - - if "error" in context: - logger.error( - f"Failed to fetch creation context: {context['error']}" - ) - return {"status": "error", "message": context["error"]} - - accounts = context.get("accounts", []) - if accounts and all(a.get("auth_expired") for a in accounts): - logger.warning( - "All Google Calendar accounts have expired authentication" - ) - return { - "status": "auth_error", - "message": "All connected Google Calendar accounts need re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "google_calendar", - } - - logger.info( - f"Requesting approval for creating calendar event: summary='{summary}'" - ) - result = request_approval( - action_type="google_calendar_event_creation", - tool_name="create_calendar_event", - params={ - "summary": summary, - "start_datetime": start_datetime, - "end_datetime": end_datetime, - "description": description, - "location": location, - "attendees": attendees, - "timezone": context.get("timezone"), - "connector_id": None, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. The event was not created. Do not ask again or suggest alternatives.", - } - - final_summary = result.params.get("summary", summary) - final_start_datetime = result.params.get( - "start_datetime", start_datetime - ) - final_end_datetime = result.params.get("end_datetime", end_datetime) - final_description = result.params.get("description", description) - final_location = result.params.get("location", location) - final_attendees = result.params.get("attendees", attendees) - final_connector_id = result.params.get("connector_id") - - if not final_summary or not final_summary.strip(): - return { - "status": "error", - "message": "Event summary cannot be empty.", - } - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - _calendar_types = [ - SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR, - ] - - if final_connector_id is not None: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_calendar_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Google Calendar connector is invalid or has been disconnected.", - } - actual_connector_id = connector.id - else: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_calendar_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "No Google Calendar connector found. Please connect Google Calendar in your workspace settings.", - } - actual_connector_id = connector.id - - logger.info( - f"Creating calendar event: summary='{final_summary}', connector={actual_connector_id}" - ) - - is_composio_calendar = ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR - ) - if is_composio_calendar: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found for this connector.", - } - else: - config_data = dict(connector.config) - - from app.config import config as app_config - from app.utils.oauth_security import TokenEncryption - - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and app_config.SECRET_KEY: - token_encryption = TokenEncryption(app_config.SECRET_KEY) - for key in ("token", "refresh_token", "client_secret"): - if config_data.get(key): - config_data[key] = token_encryption.decrypt_token( - config_data[key] - ) - - exp = config_data.get("expiry", "") - if exp: - exp = exp.replace("Z", "") - - creds = Credentials( - token=config_data.get("token"), - refresh_token=config_data.get("refresh_token"), - token_uri=config_data.get("token_uri"), - client_id=config_data.get("client_id"), - client_secret=config_data.get("client_secret"), - scopes=config_data.get("scopes", []), - expiry=datetime.fromisoformat(exp) if exp else None, - ) - - tz = context.get("timezone", "UTC") - event_body: dict[str, Any] = { - "summary": final_summary, - "start": {"dateTime": final_start_datetime, "timeZone": tz}, - "end": {"dateTime": final_end_datetime, "timeZone": tz}, - } - if final_description: - event_body["description"] = final_description - if final_location: - event_body["location"] = final_location - if final_attendees: - event_body["attendees"] = [ - {"email": e.strip()} for e in final_attendees if e.strip() - ] - - try: - if is_composio_calendar: - from app.services.composio_service import ComposioService - - composio_params = { - "calendar_id": "primary", - "summary": final_summary, - "start_datetime": final_start_datetime, - "end_datetime": final_end_datetime, - "timezone": tz, - "attendees": final_attendees or [], - } - if final_description: - composio_params["description"] = final_description - if final_location: - composio_params["location"] = final_location - - composio_result = await ComposioService().execute_tool( - connected_account_id=cca_id, - tool_name="GOOGLECALENDAR_CREATE_EVENT", - params=composio_params, - entity_id=f"surfsense_{user_id}", - ) - if not composio_result.get("success"): - raise RuntimeError( - composio_result.get( - "error", "Unknown Composio Calendar error" - ) - ) - created = composio_result.get("data", {}) - if isinstance(created, dict): - created = created.get("data", created) - if isinstance(created, dict): - created = created.get("response_data", created) - else: - service = await asyncio.get_event_loop().run_in_executor( - None, lambda: build("calendar", "v3", credentials=creds) - ) - created = await asyncio.get_event_loop().run_in_executor( - None, - lambda: ( - service.events() - .insert(calendarId="primary", body=event_body) - .execute() - ), - ) - except Exception as api_err: - from googleapiclient.errors import HttpError - - if isinstance(api_err, HttpError) and api_err.resp.status == 403: - logger.warning( - f"Insufficient permissions for connector {actual_connector_id}: {api_err}" - ) - try: - from sqlalchemy.orm.attributes import flag_modified - - _res = await db_session.execute( - select(SearchSourceConnector).where( - SearchSourceConnector.id == actual_connector_id - ) - ) - _conn = _res.scalar_one_or_none() - if _conn and not _conn.config.get("auth_expired"): - _conn.config = {**_conn.config, "auth_expired": True} - flag_modified(_conn, "config") - await db_session.commit() - except Exception: - logger.warning( - "Failed to persist auth_expired for connector %s", - actual_connector_id, - exc_info=True, - ) - return { - "status": "insufficient_permissions", - "connector_id": actual_connector_id, - "message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - logger.info( - f"Calendar event created: id={created.get('id')}, summary={created.get('summary')}" - ) - - kb_message_suffix = "" - try: - from app.services.google_calendar import GoogleCalendarKBSyncService - - kb_service = GoogleCalendarKBSyncService(db_session) - kb_result = await kb_service.sync_after_create( - event_id=created.get("id"), - event_summary=final_summary, - calendar_id="primary", - start_time=final_start_datetime, - end_time=final_end_datetime, - location=final_location, - html_link=created.get("htmlLink"), - description=final_description, - connector_id=actual_connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = " This event will be added to your knowledge base in the next scheduled sync." - except Exception as kb_err: - logger.warning(f"KB sync after create failed: {kb_err}") - kb_message_suffix = " This event will be added to your knowledge base in the next scheduled sync." - - return { - "status": "success", - "event_id": created.get("id"), - "html_link": created.get("htmlLink"), - "message": f"Successfully created '{final_summary}' on Google Calendar.{kb_message_suffix}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error creating calendar event: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while creating the event. Please try again.", - } - - return create_calendar_event diff --git a/surfsense_backend/app/agents/shared/tools/google_calendar/delete_event.py b/surfsense_backend/app/agents/shared/tools/google_calendar/delete_event.py deleted file mode 100644 index 21a67a947..000000000 --- a/surfsense_backend/app/agents/shared/tools/google_calendar/delete_event.py +++ /dev/null @@ -1,340 +0,0 @@ -import asyncio -import logging -from datetime import datetime -from typing import Any - -from google.oauth2.credentials import Credentials -from googleapiclient.discovery import build -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.db import async_session_maker -from app.services.google_calendar import GoogleCalendarToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_delete_calendar_event_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the delete_calendar_event tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured delete_calendar_event tool - """ - del db_session # per-call session — see docstring - - @tool - async def delete_calendar_event( - event_title_or_id: str, - delete_from_kb: bool = False, - ) -> dict[str, Any]: - """Delete a Google Calendar event. - - Use when the user asks to delete, remove, or cancel a calendar event. - - Args: - event_title_or_id: The exact title or event ID of the event to delete. - delete_from_kb: Whether to also remove the event from the knowledge base. - Default is False. - Set to True to remove from both Google Calendar and knowledge base. - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", "auth_error", or "error" - - event_id: Google Calendar event ID (if success) - - deleted_from_kb: whether the document was removed from the knowledge base - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined. Respond with a brief - acknowledgment and do NOT retry or suggest alternatives. - - If status is "not_found", relay the exact message to the user and ask them - to verify the event name or check if it has been indexed. - Examples: - - "Delete the team standup event" - - "Cancel my dentist appointment on Friday" - """ - logger.info( - f"delete_calendar_event called: event_ref='{event_title_or_id}', delete_from_kb={delete_from_kb}" - ) - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Google Calendar tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = GoogleCalendarToolMetadataService(db_session) - context = await metadata_service.get_deletion_context( - search_space_id, user_id, event_title_or_id - ) - - if "error" in context: - error_msg = context["error"] - if "not found" in error_msg.lower(): - logger.warning(f"Event not found: {error_msg}") - return {"status": "not_found", "message": error_msg} - logger.error(f"Failed to fetch deletion context: {error_msg}") - return {"status": "error", "message": error_msg} - - account = context.get("account", {}) - if account.get("auth_expired"): - logger.warning( - "Google Calendar account %s has expired authentication", - account.get("id"), - ) - return { - "status": "auth_error", - "message": "The Google Calendar account for this event needs re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "google_calendar", - } - - event = context["event"] - event_id = event["event_id"] - document_id = event.get("document_id") - connector_id_from_context = context["account"]["id"] - - if not event_id: - return { - "status": "error", - "message": "Event ID is missing from the indexed document. Please re-index the event and try again.", - } - - logger.info( - f"Requesting approval for deleting calendar event: '{event_title_or_id}' (event_id={event_id}, delete_from_kb={delete_from_kb})" - ) - result = request_approval( - action_type="google_calendar_event_deletion", - tool_name="delete_calendar_event", - params={ - "event_id": event_id, - "connector_id": connector_id_from_context, - "delete_from_kb": delete_from_kb, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. The event was not deleted. Do not ask again or suggest alternatives.", - } - - final_event_id = result.params.get("event_id", event_id) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_delete_from_kb = result.params.get( - "delete_from_kb", delete_from_kb - ) - - if not final_connector_id: - return { - "status": "error", - "message": "No connector found for this event.", - } - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - _calendar_types = [ - SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR, - ] - - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_calendar_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Google Calendar connector is invalid or has been disconnected.", - } - - actual_connector_id = connector.id - - logger.info( - f"Deleting calendar event: event_id='{final_event_id}', connector={actual_connector_id}" - ) - - is_composio_calendar = ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR - ) - if is_composio_calendar: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found for this connector.", - } - else: - config_data = dict(connector.config) - - from app.config import config as app_config - from app.utils.oauth_security import TokenEncryption - - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and app_config.SECRET_KEY: - token_encryption = TokenEncryption(app_config.SECRET_KEY) - for key in ("token", "refresh_token", "client_secret"): - if config_data.get(key): - config_data[key] = token_encryption.decrypt_token( - config_data[key] - ) - - exp = config_data.get("expiry", "") - if exp: - exp = exp.replace("Z", "") - - creds = Credentials( - token=config_data.get("token"), - refresh_token=config_data.get("refresh_token"), - token_uri=config_data.get("token_uri"), - client_id=config_data.get("client_id"), - client_secret=config_data.get("client_secret"), - scopes=config_data.get("scopes", []), - expiry=datetime.fromisoformat(exp) if exp else None, - ) - - try: - if is_composio_calendar: - from app.services.composio_service import ComposioService - - composio_result = await ComposioService().execute_tool( - connected_account_id=cca_id, - tool_name="GOOGLECALENDAR_DELETE_EVENT", - params={ - "calendar_id": "primary", - "event_id": final_event_id, - }, - entity_id=f"surfsense_{user_id}", - ) - if not composio_result.get("success"): - raise RuntimeError( - composio_result.get( - "error", "Unknown Composio Calendar error" - ) - ) - else: - service = await asyncio.get_event_loop().run_in_executor( - None, lambda: build("calendar", "v3", credentials=creds) - ) - await asyncio.get_event_loop().run_in_executor( - None, - lambda: ( - service.events() - .delete(calendarId="primary", eventId=final_event_id) - .execute() - ), - ) - except Exception as api_err: - from googleapiclient.errors import HttpError - - if isinstance(api_err, HttpError) and api_err.resp.status == 403: - logger.warning( - f"Insufficient permissions for connector {actual_connector_id}: {api_err}" - ) - try: - from sqlalchemy.orm.attributes import flag_modified - - _res = await db_session.execute( - select(SearchSourceConnector).where( - SearchSourceConnector.id == actual_connector_id - ) - ) - _conn = _res.scalar_one_or_none() - if _conn and not _conn.config.get("auth_expired"): - _conn.config = {**_conn.config, "auth_expired": True} - flag_modified(_conn, "config") - await db_session.commit() - except Exception: - logger.warning( - "Failed to persist auth_expired for connector %s", - actual_connector_id, - exc_info=True, - ) - return { - "status": "insufficient_permissions", - "connector_id": actual_connector_id, - "message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - logger.info(f"Calendar event deleted: event_id={final_event_id}") - - delete_result: dict[str, Any] = { - "status": "success", - "event_id": final_event_id, - "message": f"Successfully deleted the calendar event '{event.get('summary', event_title_or_id)}'.", - } - - deleted_from_kb = False - if final_delete_from_kb and document_id: - try: - from app.db import Document - - doc_result = await db_session.execute( - select(Document).filter(Document.id == document_id) - ) - document = doc_result.scalars().first() - if document: - await db_session.delete(document) - await db_session.commit() - deleted_from_kb = True - logger.info( - f"Deleted document {document_id} from knowledge base" - ) - else: - logger.warning(f"Document {document_id} not found in KB") - except Exception as e: - logger.error(f"Failed to delete document from KB: {e}") - await db_session.rollback() - delete_result["warning"] = ( - f"Event deleted, but failed to remove from knowledge base: {e!s}" - ) - - delete_result["deleted_from_kb"] = deleted_from_kb - if deleted_from_kb: - delete_result["message"] = ( - f"{delete_result.get('message', '')} (also removed from knowledge base)" - ) - - return delete_result - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error deleting calendar event: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while deleting the event. Please try again.", - } - - return delete_calendar_event diff --git a/surfsense_backend/app/agents/shared/tools/google_calendar/search_events.py b/surfsense_backend/app/agents/shared/tools/google_calendar/search_events.py deleted file mode 100644 index 6a79b63fb..000000000 --- a/surfsense_backend/app/agents/shared/tools/google_calendar/search_events.py +++ /dev/null @@ -1,187 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select - -from app.agents.shared.tools.gmail.search_emails import _build_credentials -from app.db import SearchSourceConnector, SearchSourceConnectorType, async_session_maker - -logger = logging.getLogger(__name__) - -_CALENDAR_TYPES = [ - SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR, -] - - -def _to_calendar_boundary(value: str, *, is_end: bool) -> str: - if "T" in value: - return value - time = "23:59:59" if is_end else "00:00:00" - return f"{value}T{time}Z" - - -def _format_calendar_events(events_raw: list[dict[str, Any]]) -> list[dict[str, Any]]: - events = [] - for ev in events_raw: - start = ev.get("start", {}) - end = ev.get("end", {}) - attendees_raw = ev.get("attendees", []) - events.append( - { - "event_id": ev.get("id"), - "summary": ev.get("summary", "No Title"), - "start": start.get("dateTime") or start.get("date", ""), - "end": end.get("dateTime") or end.get("date", ""), - "location": ev.get("location", ""), - "description": ev.get("description", ""), - "html_link": ev.get("htmlLink", ""), - "attendees": [a.get("email", "") for a in attendees_raw[:10]], - "status": ev.get("status", ""), - } - ) - return events - - -def create_search_calendar_events_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the search_calendar_events tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured search_calendar_events tool - """ - del db_session # per-call session — see docstring - - @tool - async def search_calendar_events( - start_date: str, - end_date: str, - max_results: int = 25, - ) -> dict[str, Any]: - """Search Google Calendar events within a date range. - - Args: - start_date: Start date in YYYY-MM-DD format (e.g. "2026-04-01"). - end_date: End date in YYYY-MM-DD format (e.g. "2026-04-30"). - max_results: Maximum number of events to return (default 25, max 50). - - Returns: - Dictionary with status and a list of events including - event_id, summary, start, end, location, attendees. - """ - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Calendar tool not properly configured.", - } - - max_results = min(max_results, 50) - - try: - async with async_session_maker() as db_session: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_CALENDAR_TYPES), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "No Google Calendar connector found. Please connect Google Calendar in your workspace settings.", - } - - if ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR - ): - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found for this connector.", - } - - from app.services.composio_service import ComposioService - - events_raw, error = await ComposioService().get_calendar_events( - connected_account_id=cca_id, - entity_id=f"surfsense_{user_id}", - time_min=_to_calendar_boundary(start_date, is_end=False), - time_max=_to_calendar_boundary(end_date, is_end=True), - max_results=max_results, - ) - if not events_raw and not error: - error = "No events found in the specified date range." - else: - creds = _build_credentials(connector) - - from app.connectors.google_calendar_connector import ( - GoogleCalendarConnector, - ) - - cal = GoogleCalendarConnector( - credentials=creds, - session=db_session, - user_id=user_id, - connector_id=connector.id, - ) - - events_raw, error = await cal.get_all_primary_calendar_events( - start_date=start_date, - end_date=end_date, - max_results=max_results, - ) - - if error: - if ( - "re-authenticate" in error.lower() - or "authentication failed" in error.lower() - ): - return { - "status": "auth_error", - "message": error, - "connector_type": "google_calendar", - } - if "no events found" in error.lower(): - return { - "status": "success", - "events": [], - "total": 0, - "message": error, - } - return {"status": "error", "message": error} - - events = _format_calendar_events(events_raw) - - return {"status": "success", "events": events, "total": len(events)} - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - logger.error("Error searching calendar events: %s", e, exc_info=True) - return { - "status": "error", - "message": "Failed to search calendar events. Please try again.", - } - - return search_calendar_events diff --git a/surfsense_backend/app/agents/shared/tools/google_calendar/update_event.py b/surfsense_backend/app/agents/shared/tools/google_calendar/update_event.py deleted file mode 100644 index 586695056..000000000 --- a/surfsense_backend/app/agents/shared/tools/google_calendar/update_event.py +++ /dev/null @@ -1,419 +0,0 @@ -import asyncio -import logging -from datetime import datetime -from typing import Any - -from google.oauth2.credentials import Credentials -from googleapiclient.discovery import build -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.db import async_session_maker -from app.services.google_calendar import GoogleCalendarToolMetadataService - -logger = logging.getLogger(__name__) - - -def _is_date_only(value: str) -> bool: - """Return True when *value* looks like a bare date (YYYY-MM-DD) with no time component.""" - return len(value) <= 10 and "T" not in value - - -def _build_time_body(value: str, context: dict[str, Any] | Any) -> dict[str, str]: - """Build a Google Calendar start/end body using ``date`` for all-day - events and ``dateTime`` for timed events.""" - if _is_date_only(value): - return {"date": value} - tz = context.get("timezone", "UTC") if isinstance(context, dict) else "UTC" - return {"dateTime": value, "timeZone": tz} - - -def create_update_calendar_event_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the update_calendar_event tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured update_calendar_event tool - """ - del db_session # per-call session — see docstring - - @tool - async def update_calendar_event( - event_title_or_id: str, - new_summary: str | None = None, - new_start_datetime: str | None = None, - new_end_datetime: str | None = None, - new_description: str | None = None, - new_location: str | None = None, - new_attendees: list[str] | None = None, - ) -> dict[str, Any]: - """Update an existing Google Calendar event. - - Use when the user asks to modify, reschedule, or change a calendar event. - - Args: - event_title_or_id: The exact title or event ID of the event to update. - new_summary: New event title (if changing). - new_start_datetime: New start time in ISO 8601 format (if rescheduling). - new_end_datetime: New end time in ISO 8601 format (if rescheduling). - new_description: New event description (if changing). - new_location: New event location (if changing). - new_attendees: New list of attendee email addresses (if changing). - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", "auth_error", or "error" - - event_id: Google Calendar event ID (if success) - - html_link: URL to open the event (if success) - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined. Respond with a brief - acknowledgment and do NOT retry or suggest alternatives. - - If status is "not_found", relay the exact message to the user and ask them - to verify the event name or check if it has been indexed. - Examples: - - "Reschedule the team standup to 3pm" - - "Change the location of my dentist appointment" - """ - logger.info(f"update_calendar_event called: event_ref='{event_title_or_id}'") - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Google Calendar tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = GoogleCalendarToolMetadataService(db_session) - context = await metadata_service.get_update_context( - search_space_id, user_id, event_title_or_id - ) - - if "error" in context: - error_msg = context["error"] - if "not found" in error_msg.lower(): - logger.warning(f"Event not found: {error_msg}") - return {"status": "not_found", "message": error_msg} - logger.error(f"Failed to fetch update context: {error_msg}") - return {"status": "error", "message": error_msg} - - if context.get("auth_expired"): - logger.warning("Google Calendar account has expired authentication") - return { - "status": "auth_error", - "message": "The Google Calendar account for this event needs re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "google_calendar", - } - - event = context["event"] - event_id = event["event_id"] - document_id = event.get("document_id") - connector_id_from_context = context["account"]["id"] - - if not event_id: - return { - "status": "error", - "message": "Event ID is missing from the indexed document. Please re-index the event and try again.", - } - - logger.info( - f"Requesting approval for updating calendar event: '{event_title_or_id}' (event_id={event_id})" - ) - result = request_approval( - action_type="google_calendar_event_update", - tool_name="update_calendar_event", - params={ - "event_id": event_id, - "document_id": document_id, - "connector_id": connector_id_from_context, - "new_summary": new_summary, - "new_start_datetime": new_start_datetime, - "new_end_datetime": new_end_datetime, - "new_description": new_description, - "new_location": new_location, - "new_attendees": new_attendees, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. The event was not updated. Do not ask again or suggest alternatives.", - } - - final_event_id = result.params.get("event_id", event_id) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_new_summary = result.params.get("new_summary", new_summary) - final_new_start_datetime = result.params.get( - "new_start_datetime", new_start_datetime - ) - final_new_end_datetime = result.params.get( - "new_end_datetime", new_end_datetime - ) - final_new_description = result.params.get( - "new_description", new_description - ) - final_new_location = result.params.get("new_location", new_location) - final_new_attendees = result.params.get("new_attendees", new_attendees) - - if not final_connector_id: - return { - "status": "error", - "message": "No connector found for this event.", - } - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - _calendar_types = [ - SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR, - ] - - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_calendar_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Google Calendar connector is invalid or has been disconnected.", - } - - actual_connector_id = connector.id - - logger.info( - f"Updating calendar event: event_id='{final_event_id}', connector={actual_connector_id}" - ) - - is_composio_calendar = ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR - ) - if is_composio_calendar: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found for this connector.", - } - else: - config_data = dict(connector.config) - - from app.config import config as app_config - from app.utils.oauth_security import TokenEncryption - - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and app_config.SECRET_KEY: - token_encryption = TokenEncryption(app_config.SECRET_KEY) - for key in ("token", "refresh_token", "client_secret"): - if config_data.get(key): - config_data[key] = token_encryption.decrypt_token( - config_data[key] - ) - - exp = config_data.get("expiry", "") - if exp: - exp = exp.replace("Z", "") - - creds = Credentials( - token=config_data.get("token"), - refresh_token=config_data.get("refresh_token"), - token_uri=config_data.get("token_uri"), - client_id=config_data.get("client_id"), - client_secret=config_data.get("client_secret"), - scopes=config_data.get("scopes", []), - expiry=datetime.fromisoformat(exp) if exp else None, - ) - - update_body: dict[str, Any] = {} - if final_new_summary is not None: - update_body["summary"] = final_new_summary - if final_new_start_datetime is not None: - update_body["start"] = _build_time_body( - final_new_start_datetime, context - ) - if final_new_end_datetime is not None: - update_body["end"] = _build_time_body( - final_new_end_datetime, context - ) - if final_new_description is not None: - update_body["description"] = final_new_description - if final_new_location is not None: - update_body["location"] = final_new_location - if final_new_attendees is not None: - update_body["attendees"] = [ - {"email": e.strip()} for e in final_new_attendees if e.strip() - ] - - if not update_body: - return { - "status": "error", - "message": "No changes specified. Please provide at least one field to update.", - } - - try: - if is_composio_calendar: - from app.services.composio_service import ComposioService - - composio_params: dict[str, Any] = { - "calendar_id": "primary", - "event_id": final_event_id, - } - if final_new_summary is not None: - composio_params["summary"] = final_new_summary - if final_new_start_datetime is not None: - composio_params["start_time"] = final_new_start_datetime - if final_new_end_datetime is not None: - composio_params["end_time"] = final_new_end_datetime - if final_new_description is not None: - composio_params["description"] = final_new_description - if final_new_location is not None: - composio_params["location"] = final_new_location - if final_new_attendees is not None: - composio_params["attendees"] = [ - e.strip() for e in final_new_attendees if e.strip() - ] - if not _is_date_only( - final_new_start_datetime or final_new_end_datetime or "" - ): - composio_params["timezone"] = context.get("timezone", "UTC") - - composio_result = await ComposioService().execute_tool( - connected_account_id=cca_id, - tool_name="GOOGLECALENDAR_PATCH_EVENT", - params=composio_params, - entity_id=f"surfsense_{user_id}", - ) - if not composio_result.get("success"): - raise RuntimeError( - composio_result.get( - "error", "Unknown Composio Calendar error" - ) - ) - updated = composio_result.get("data", {}) - if isinstance(updated, dict): - updated = updated.get("data", updated) - if isinstance(updated, dict): - updated = updated.get("response_data", updated) - else: - service = await asyncio.get_event_loop().run_in_executor( - None, lambda: build("calendar", "v3", credentials=creds) - ) - updated = await asyncio.get_event_loop().run_in_executor( - None, - lambda: ( - service.events() - .patch( - calendarId="primary", - eventId=final_event_id, - body=update_body, - ) - .execute() - ), - ) - except Exception as api_err: - from googleapiclient.errors import HttpError - - if isinstance(api_err, HttpError) and api_err.resp.status == 403: - logger.warning( - f"Insufficient permissions for connector {actual_connector_id}: {api_err}" - ) - try: - from sqlalchemy.orm.attributes import flag_modified - - _res = await db_session.execute( - select(SearchSourceConnector).where( - SearchSourceConnector.id == actual_connector_id - ) - ) - _conn = _res.scalar_one_or_none() - if _conn and not _conn.config.get("auth_expired"): - _conn.config = {**_conn.config, "auth_expired": True} - flag_modified(_conn, "config") - await db_session.commit() - except Exception: - logger.warning( - "Failed to persist auth_expired for connector %s", - actual_connector_id, - exc_info=True, - ) - return { - "status": "insufficient_permissions", - "connector_id": actual_connector_id, - "message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - logger.info(f"Calendar event updated: event_id={final_event_id}") - - kb_message_suffix = "" - if document_id is not None: - try: - from app.services.google_calendar import ( - GoogleCalendarKBSyncService, - ) - - kb_service = GoogleCalendarKBSyncService(db_session) - kb_result = await kb_service.sync_after_update( - document_id=document_id, - event_id=final_event_id, - connector_id=actual_connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = " The knowledge base will be updated in the next scheduled sync." - except Exception as kb_err: - logger.warning(f"KB sync after update failed: {kb_err}") - kb_message_suffix = " The knowledge base will be updated in the next scheduled sync." - - return { - "status": "success", - "event_id": final_event_id, - "html_link": updated.get("htmlLink"), - "message": f"Successfully updated the calendar event.{kb_message_suffix}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error updating calendar event: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while updating the event. Please try again.", - } - - return update_calendar_event diff --git a/surfsense_backend/tests/e2e/fakes/native_google.py b/surfsense_backend/tests/e2e/fakes/native_google.py index 84c98d69a..eae30546e 100644 --- a/surfsense_backend/tests/e2e/fakes/native_google.py +++ b/surfsense_backend/tests/e2e/fakes/native_google.py @@ -429,9 +429,18 @@ def install(active_patches: list[Any]) -> None: ("app.connectors.google_drive.client.build", _fake_build), ("app.connectors.google_gmail_connector.build", _fake_build), ("app.connectors.google_calendar_connector.build", _fake_build), - ("app.agents.shared.tools.google_calendar.create_event.build", _fake_build), - ("app.agents.shared.tools.google_calendar.update_event.build", _fake_build), - ("app.agents.shared.tools.google_calendar.delete_event.build", _fake_build), + ( + "app.agents.multi_agent_chat.subagents.connectors.calendar.tools.create_event.build", + _fake_build, + ), + ( + "app.agents.multi_agent_chat.subagents.connectors.calendar.tools.update_event.build", + _fake_build, + ), + ( + "app.agents.multi_agent_chat.subagents.connectors.calendar.tools.delete_event.build", + _fake_build, + ), ("googleapiclient.http.MediaIoBaseDownload", _FakeMediaIoBaseDownload), ( "app.connectors.google_drive.client._build_thread_http",