diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py
index d3cf456d6..4e831265f 100644
--- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py
+++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py
@@ -305,6 +305,32 @@ async def create_surfsense_deep_agent(
]
modified_disabled_tools.extend(google_drive_tools)
+ # Disable Google Calendar action tools if no Google Calendar connector is configured
+ has_google_calendar_connector = (
+ available_connectors is not None
+ and "GOOGLE_CALENDAR_CONNECTOR" in available_connectors
+ )
+ if not has_google_calendar_connector:
+ calendar_tools = [
+ "create_calendar_event",
+ "update_calendar_event",
+ "delete_calendar_event",
+ ]
+ modified_disabled_tools.extend(calendar_tools)
+
+ # Disable Gmail action tools if no Gmail connector is configured
+ has_gmail_connector = (
+ available_connectors is not None
+ and "GOOGLE_GMAIL_CONNECTOR" in available_connectors
+ )
+ if not has_gmail_connector:
+ gmail_tools = [
+ "create_gmail_draft",
+ "send_gmail_email",
+ "trash_gmail_email",
+ ]
+ modified_disabled_tools.extend(gmail_tools)
+
# Build tools using the async registry (includes MCP tools)
_t0 = time.perf_counter()
tools = await build_tools_async(
diff --git a/surfsense_backend/app/agents/new_chat/tools/gmail/__init__.py b/surfsense_backend/app/agents/new_chat/tools/gmail/__init__.py
new file mode 100644
index 000000000..3b6325ae6
--- /dev/null
+++ b/surfsense_backend/app/agents/new_chat/tools/gmail/__init__.py
@@ -0,0 +1,15 @@
+from app.agents.new_chat.tools.gmail.create_draft import (
+ create_create_gmail_draft_tool,
+)
+from app.agents.new_chat.tools.gmail.send_email import (
+ create_send_gmail_email_tool,
+)
+from app.agents.new_chat.tools.gmail.trash_email import (
+ create_trash_gmail_email_tool,
+)
+
+__all__ = [
+ "create_create_gmail_draft_tool",
+ "create_send_gmail_email_tool",
+ "create_trash_gmail_email_tool",
+]
diff --git a/surfsense_backend/app/agents/new_chat/tools/gmail/create_draft.py b/surfsense_backend/app/agents/new_chat/tools/gmail/create_draft.py
new file mode 100644
index 000000000..246c7d16f
--- /dev/null
+++ b/surfsense_backend/app/agents/new_chat/tools/gmail/create_draft.py
@@ -0,0 +1,320 @@
+import asyncio
+import base64
+import logging
+from datetime import datetime
+from email.mime.text import MIMEText
+from typing import Any
+
+from langchain_core.tools import tool
+from langgraph.types import interrupt
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.services.gmail import GmailToolMetadataService
+
+logger = logging.getLogger(__name__)
+
+
+def create_create_gmail_draft_tool(
+ db_session: AsyncSession | None = None,
+ search_space_id: int | None = None,
+ user_id: str | None = None,
+):
+ @tool
+ async def create_gmail_draft(
+ to: str,
+ subject: str,
+ body: str,
+ cc: str | None = None,
+ bcc: str | None = None,
+ ) -> dict[str, Any]:
+ """Create a draft email in Gmail.
+
+ Use when the user asks to draft, compose, or prepare an email without
+ sending it.
+
+ Args:
+ to: Recipient email address.
+ subject: Email subject line.
+ body: Email body content.
+ cc: Optional CC recipient(s), comma-separated.
+ bcc: Optional BCC recipient(s), comma-separated.
+
+ Returns:
+ Dictionary with:
+ - status: "success", "rejected", or "error"
+ - draft_id: Gmail draft ID (if success)
+ - message: Result message
+
+ IMPORTANT:
+ - If status is "rejected", the user explicitly declined the action.
+ Respond with a brief acknowledgment and do NOT retry or suggest alternatives.
+ - If status is "insufficient_permissions", the connector lacks the required OAuth scope.
+ Inform the user they need to re-authenticate and do NOT retry the action.
+
+ Examples:
+ - "Draft an email to alice@example.com about the meeting"
+ - "Compose a reply to Bob about the project update"
+ """
+ logger.info(
+ f"create_gmail_draft called: to='{to}', subject='{subject}'"
+ )
+
+ if db_session is None or search_space_id is None or user_id is None:
+ return {
+ "status": "error",
+ "message": "Gmail tool not properly configured. Please contact support.",
+ }
+
+ try:
+ metadata_service = GmailToolMetadataService(db_session)
+ context = await metadata_service.get_creation_context(
+ search_space_id, user_id
+ )
+
+ if "error" in context:
+ logger.error(f"Failed to fetch creation context: {context['error']}")
+ return {"status": "error", "message": context["error"]}
+
+ accounts = context.get("accounts", [])
+ if accounts and all(a.get("auth_expired") for a in accounts):
+ logger.warning("All Gmail accounts have expired authentication")
+ return {
+ "status": "auth_error",
+ "message": "All connected Gmail accounts need re-authentication. Please re-authenticate in your connector settings.",
+ "connector_type": "gmail",
+ }
+
+ logger.info(
+ f"Requesting approval for creating Gmail draft: to='{to}', subject='{subject}'"
+ )
+ approval = interrupt(
+ {
+ "type": "gmail_draft_creation",
+ "action": {
+ "tool": "create_gmail_draft",
+ "params": {
+ "to": to,
+ "subject": subject,
+ "body": body,
+ "cc": cc,
+ "bcc": bcc,
+ "connector_id": None,
+ },
+ },
+ "context": context,
+ }
+ )
+
+ decisions_raw = (
+ approval.get("decisions", []) if isinstance(approval, dict) else []
+ )
+ decisions = (
+ decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
+ )
+ decisions = [d for d in decisions if isinstance(d, dict)]
+ if not decisions:
+ logger.warning("No approval decision received")
+ return {"status": "error", "message": "No approval decision received"}
+
+ decision = decisions[0]
+ decision_type = decision.get("type") or decision.get("decision_type")
+ logger.info(f"User decision: {decision_type}")
+
+ if decision_type == "reject":
+ return {
+ "status": "rejected",
+ "message": "User declined. The draft was not created. Do not ask again or suggest alternatives.",
+ }
+
+ final_params: dict[str, Any] = {}
+ edited_action = decision.get("edited_action")
+ if isinstance(edited_action, dict):
+ edited_args = edited_action.get("args")
+ if isinstance(edited_args, dict):
+ final_params = edited_args
+ elif isinstance(decision.get("args"), dict):
+ final_params = decision["args"]
+
+ final_to = final_params.get("to", to)
+ final_subject = final_params.get("subject", subject)
+ final_body = final_params.get("body", body)
+ final_cc = final_params.get("cc", cc)
+ final_bcc = final_params.get("bcc", bcc)
+ final_connector_id = final_params.get("connector_id")
+
+ from sqlalchemy.future import select
+
+ from app.db import SearchSourceConnector, SearchSourceConnectorType
+
+ _GMAIL_TYPES = [
+ SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
+ SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
+ ]
+
+ if final_connector_id is not None:
+ result = await db_session.execute(
+ select(SearchSourceConnector).filter(
+ SearchSourceConnector.id == final_connector_id,
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_(_GMAIL_TYPES),
+ )
+ )
+ connector = result.scalars().first()
+ if not connector:
+ return {
+ "status": "error",
+ "message": "Selected Gmail connector is invalid or has been disconnected.",
+ }
+ actual_connector_id = connector.id
+ else:
+ result = await db_session.execute(
+ select(SearchSourceConnector).filter(
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_(_GMAIL_TYPES),
+ )
+ )
+ connector = result.scalars().first()
+ if not connector:
+ return {
+ "status": "error",
+ "message": "No Gmail connector found. Please connect Gmail in your workspace settings.",
+ }
+ actual_connector_id = connector.id
+
+ logger.info(
+ f"Creating Gmail draft: to='{final_to}', subject='{final_subject}', connector={actual_connector_id}"
+ )
+
+ if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR:
+ from app.utils.google_credentials import build_composio_credentials
+
+ cca_id = connector.config.get("composio_connected_account_id")
+ if cca_id:
+ creds = build_composio_credentials(cca_id)
+ else:
+ return {
+ "status": "error",
+ "message": "Composio connected account ID not found for this Gmail connector.",
+ }
+ else:
+ from google.oauth2.credentials import Credentials
+
+ from app.config import config
+ from app.utils.oauth_security import TokenEncryption
+
+ config_data = dict(connector.config)
+ token_encrypted = config_data.get("_token_encrypted", False)
+ if token_encrypted and config.SECRET_KEY:
+ token_encryption = TokenEncryption(config.SECRET_KEY)
+ if config_data.get("token"):
+ config_data["token"] = token_encryption.decrypt_token(
+ config_data["token"]
+ )
+ if config_data.get("refresh_token"):
+ config_data["refresh_token"] = token_encryption.decrypt_token(
+ config_data["refresh_token"]
+ )
+ if config_data.get("client_secret"):
+ config_data["client_secret"] = token_encryption.decrypt_token(
+ config_data["client_secret"]
+ )
+
+ exp = config_data.get("expiry", "")
+ if exp:
+ exp = exp.replace("Z", "")
+
+ creds = Credentials(
+ token=config_data.get("token"),
+ refresh_token=config_data.get("refresh_token"),
+ token_uri=config_data.get("token_uri"),
+ client_id=config_data.get("client_id"),
+ client_secret=config_data.get("client_secret"),
+ scopes=config_data.get("scopes", []),
+ expiry=datetime.fromisoformat(exp) if exp else None,
+ )
+
+ from googleapiclient.discovery import build
+
+ gmail_service = build("gmail", "v1", credentials=creds)
+
+ message = MIMEText(final_body)
+ message["to"] = final_to
+ message["subject"] = final_subject
+ if final_cc:
+ message["cc"] = final_cc
+ if final_bcc:
+ message["bcc"] = final_bcc
+ raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
+
+ try:
+ created = await asyncio.get_event_loop().run_in_executor(
+ None,
+ lambda: gmail_service.users()
+ .drafts()
+ .create(userId="me", body={"message": {"raw": raw}})
+ .execute(),
+ )
+ except Exception as api_err:
+ from googleapiclient.errors import HttpError
+
+ if isinstance(api_err, HttpError) and api_err.resp.status == 403:
+ logger.warning(
+ f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
+ )
+ return {
+ "status": "insufficient_permissions",
+ "connector_id": actual_connector_id,
+ "message": "This Gmail account needs additional permissions. Please re-authenticate.",
+ }
+ raise
+
+ logger.info(
+ f"Gmail draft created: id={created.get('id')}"
+ )
+
+ kb_message_suffix = ""
+ try:
+ from app.services.gmail import GmailKBSyncService
+
+ kb_service = GmailKBSyncService(db_session)
+ draft_message = created.get("message", {})
+ kb_result = await kb_service.sync_after_create(
+ message_id=draft_message.get("id", ""),
+ thread_id=draft_message.get("threadId", ""),
+ subject=final_subject,
+ sender="me",
+ date_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+ body_text=final_body,
+ connector_id=actual_connector_id,
+ search_space_id=search_space_id,
+ user_id=user_id,
+ )
+ if kb_result["status"] == "success":
+ kb_message_suffix = " Your knowledge base has also been updated."
+ else:
+ kb_message_suffix = " This draft will be added to your knowledge base in the next scheduled sync."
+ except Exception as kb_err:
+ logger.warning(f"KB sync after create failed: {kb_err}")
+ kb_message_suffix = " This draft will be added to your knowledge base in the next scheduled sync."
+
+ return {
+ "status": "success",
+ "draft_id": created.get("id"),
+ "message": f"Successfully created Gmail draft with subject '{final_subject}'.{kb_message_suffix}",
+ }
+
+ except Exception as e:
+ from langgraph.errors import GraphInterrupt
+
+ if isinstance(e, GraphInterrupt):
+ raise
+
+ logger.error(f"Error creating Gmail draft: {e}", exc_info=True)
+ return {
+ "status": "error",
+ "message": "Something went wrong while creating the draft. Please try again.",
+ }
+
+ return create_gmail_draft
diff --git a/surfsense_backend/app/agents/new_chat/tools/gmail/send_email.py b/surfsense_backend/app/agents/new_chat/tools/gmail/send_email.py
new file mode 100644
index 000000000..93c7933a5
--- /dev/null
+++ b/surfsense_backend/app/agents/new_chat/tools/gmail/send_email.py
@@ -0,0 +1,321 @@
+import asyncio
+import base64
+import logging
+from datetime import datetime
+from email.mime.text import MIMEText
+from typing import Any
+
+from langchain_core.tools import tool
+from langgraph.types import interrupt
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.services.gmail import GmailToolMetadataService
+
+logger = logging.getLogger(__name__)
+
+
+def create_send_gmail_email_tool(
+ db_session: AsyncSession | None = None,
+ search_space_id: int | None = None,
+ user_id: str | None = None,
+):
+ @tool
+ async def send_gmail_email(
+ to: str,
+ subject: str,
+ body: str,
+ cc: str | None = None,
+ bcc: str | None = None,
+ ) -> dict[str, Any]:
+ """Send an email via Gmail.
+
+ Use when the user explicitly asks to send an email. This sends the
+ email immediately - it cannot be unsent.
+
+ Args:
+ to: Recipient email address.
+ subject: Email subject line.
+ body: Email body content.
+ cc: Optional CC recipient(s), comma-separated.
+ bcc: Optional BCC recipient(s), comma-separated.
+
+ Returns:
+ Dictionary with:
+ - status: "success", "rejected", or "error"
+ - message_id: Gmail message ID (if success)
+ - thread_id: Gmail thread ID (if success)
+ - message: Result message
+
+ IMPORTANT:
+ - If status is "rejected", the user explicitly declined the action.
+ Respond with a brief acknowledgment and do NOT retry or suggest alternatives.
+ - If status is "insufficient_permissions", the connector lacks the required OAuth scope.
+ Inform the user they need to re-authenticate and do NOT retry the action.
+
+ Examples:
+ - "Send an email to alice@example.com about the meeting"
+ - "Email Bob the project update"
+ """
+ logger.info(
+ f"send_gmail_email called: to='{to}', subject='{subject}'"
+ )
+
+ if db_session is None or search_space_id is None or user_id is None:
+ return {
+ "status": "error",
+ "message": "Gmail tool not properly configured. Please contact support.",
+ }
+
+ try:
+ metadata_service = GmailToolMetadataService(db_session)
+ context = await metadata_service.get_creation_context(
+ search_space_id, user_id
+ )
+
+ if "error" in context:
+ logger.error(f"Failed to fetch creation context: {context['error']}")
+ return {"status": "error", "message": context["error"]}
+
+ accounts = context.get("accounts", [])
+ if accounts and all(a.get("auth_expired") for a in accounts):
+ logger.warning("All Gmail accounts have expired authentication")
+ return {
+ "status": "auth_error",
+ "message": "All connected Gmail accounts need re-authentication. Please re-authenticate in your connector settings.",
+ "connector_type": "gmail",
+ }
+
+ logger.info(
+ f"Requesting approval for sending Gmail email: to='{to}', subject='{subject}'"
+ )
+ approval = interrupt(
+ {
+ "type": "gmail_email_send",
+ "action": {
+ "tool": "send_gmail_email",
+ "params": {
+ "to": to,
+ "subject": subject,
+ "body": body,
+ "cc": cc,
+ "bcc": bcc,
+ "connector_id": None,
+ },
+ },
+ "context": context,
+ }
+ )
+
+ decisions_raw = (
+ approval.get("decisions", []) if isinstance(approval, dict) else []
+ )
+ decisions = (
+ decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
+ )
+ decisions = [d for d in decisions if isinstance(d, dict)]
+ if not decisions:
+ logger.warning("No approval decision received")
+ return {"status": "error", "message": "No approval decision received"}
+
+ decision = decisions[0]
+ decision_type = decision.get("type") or decision.get("decision_type")
+ logger.info(f"User decision: {decision_type}")
+
+ if decision_type == "reject":
+ return {
+ "status": "rejected",
+ "message": "User declined. The email was not sent. Do not ask again or suggest alternatives.",
+ }
+
+ final_params: dict[str, Any] = {}
+ edited_action = decision.get("edited_action")
+ if isinstance(edited_action, dict):
+ edited_args = edited_action.get("args")
+ if isinstance(edited_args, dict):
+ final_params = edited_args
+ elif isinstance(decision.get("args"), dict):
+ final_params = decision["args"]
+
+ final_to = final_params.get("to", to)
+ final_subject = final_params.get("subject", subject)
+ final_body = final_params.get("body", body)
+ final_cc = final_params.get("cc", cc)
+ final_bcc = final_params.get("bcc", bcc)
+ final_connector_id = final_params.get("connector_id")
+
+ from sqlalchemy.future import select
+
+ from app.db import SearchSourceConnector, SearchSourceConnectorType
+
+ _GMAIL_TYPES = [
+ SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
+ SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
+ ]
+
+ if final_connector_id is not None:
+ result = await db_session.execute(
+ select(SearchSourceConnector).filter(
+ SearchSourceConnector.id == final_connector_id,
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_(_GMAIL_TYPES),
+ )
+ )
+ connector = result.scalars().first()
+ if not connector:
+ return {
+ "status": "error",
+ "message": "Selected Gmail connector is invalid or has been disconnected.",
+ }
+ actual_connector_id = connector.id
+ else:
+ result = await db_session.execute(
+ select(SearchSourceConnector).filter(
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_(_GMAIL_TYPES),
+ )
+ )
+ connector = result.scalars().first()
+ if not connector:
+ return {
+ "status": "error",
+ "message": "No Gmail connector found. Please connect Gmail in your workspace settings.",
+ }
+ actual_connector_id = connector.id
+
+ logger.info(
+ f"Sending Gmail email: to='{final_to}', subject='{final_subject}', connector={actual_connector_id}"
+ )
+
+ if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR:
+ from app.utils.google_credentials import build_composio_credentials
+
+ cca_id = connector.config.get("composio_connected_account_id")
+ if cca_id:
+ creds = build_composio_credentials(cca_id)
+ else:
+ return {
+ "status": "error",
+ "message": "Composio connected account ID not found for this Gmail connector.",
+ }
+ else:
+ from google.oauth2.credentials import Credentials
+
+ from app.config import config
+ from app.utils.oauth_security import TokenEncryption
+
+ config_data = dict(connector.config)
+ token_encrypted = config_data.get("_token_encrypted", False)
+ if token_encrypted and config.SECRET_KEY:
+ token_encryption = TokenEncryption(config.SECRET_KEY)
+ if config_data.get("token"):
+ config_data["token"] = token_encryption.decrypt_token(
+ config_data["token"]
+ )
+ if config_data.get("refresh_token"):
+ config_data["refresh_token"] = token_encryption.decrypt_token(
+ config_data["refresh_token"]
+ )
+ if config_data.get("client_secret"):
+ config_data["client_secret"] = token_encryption.decrypt_token(
+ config_data["client_secret"]
+ )
+
+ exp = config_data.get("expiry", "")
+ if exp:
+ exp = exp.replace("Z", "")
+
+ creds = Credentials(
+ token=config_data.get("token"),
+ refresh_token=config_data.get("refresh_token"),
+ token_uri=config_data.get("token_uri"),
+ client_id=config_data.get("client_id"),
+ client_secret=config_data.get("client_secret"),
+ scopes=config_data.get("scopes", []),
+ expiry=datetime.fromisoformat(exp) if exp else None,
+ )
+
+ from googleapiclient.discovery import build
+
+ gmail_service = build("gmail", "v1", credentials=creds)
+
+ message = MIMEText(final_body)
+ message["to"] = final_to
+ message["subject"] = final_subject
+ if final_cc:
+ message["cc"] = final_cc
+ if final_bcc:
+ message["bcc"] = final_bcc
+ raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
+
+ try:
+ sent = await asyncio.get_event_loop().run_in_executor(
+ None,
+ lambda: gmail_service.users()
+ .messages()
+ .send(userId="me", body={"raw": raw})
+ .execute(),
+ )
+ except Exception as api_err:
+ from googleapiclient.errors import HttpError
+
+ if isinstance(api_err, HttpError) and api_err.resp.status == 403:
+ logger.warning(
+ f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
+ )
+ return {
+ "status": "insufficient_permissions",
+ "connector_id": actual_connector_id,
+ "message": "This Gmail account needs additional permissions. Please re-authenticate.",
+ }
+ raise
+
+ logger.info(
+ f"Gmail email sent: id={sent.get('id')}, threadId={sent.get('threadId')}"
+ )
+
+ kb_message_suffix = ""
+ try:
+ from app.services.gmail import GmailKBSyncService
+
+ kb_service = GmailKBSyncService(db_session)
+ kb_result = await kb_service.sync_after_create(
+ message_id=sent.get("id", ""),
+ thread_id=sent.get("threadId", ""),
+ subject=final_subject,
+ sender="me",
+ date_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+ body_text=final_body,
+ connector_id=actual_connector_id,
+ search_space_id=search_space_id,
+ user_id=user_id,
+ )
+ if kb_result["status"] == "success":
+ kb_message_suffix = " Your knowledge base has also been updated."
+ else:
+ kb_message_suffix = " This email will be added to your knowledge base in the next scheduled sync."
+ except Exception as kb_err:
+ logger.warning(f"KB sync after send failed: {kb_err}")
+ kb_message_suffix = " This email will be added to your knowledge base in the next scheduled sync."
+
+ return {
+ "status": "success",
+ "message_id": sent.get("id"),
+ "thread_id": sent.get("threadId"),
+ "message": f"Successfully sent email to '{final_to}' with subject '{final_subject}'.{kb_message_suffix}",
+ }
+
+ except Exception as e:
+ from langgraph.errors import GraphInterrupt
+
+ if isinstance(e, GraphInterrupt):
+ raise
+
+ logger.error(f"Error sending Gmail email: {e}", exc_info=True)
+ return {
+ "status": "error",
+ "message": "Something went wrong while sending the email. Please try again.",
+ }
+
+ return send_gmail_email
diff --git a/surfsense_backend/app/agents/new_chat/tools/gmail/trash_email.py b/surfsense_backend/app/agents/new_chat/tools/gmail/trash_email.py
new file mode 100644
index 000000000..417839bb1
--- /dev/null
+++ b/surfsense_backend/app/agents/new_chat/tools/gmail/trash_email.py
@@ -0,0 +1,319 @@
+import asyncio
+import logging
+from datetime import datetime
+from typing import Any
+
+from langchain_core.tools import tool
+from langgraph.types import interrupt
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.services.gmail import GmailToolMetadataService
+
+logger = logging.getLogger(__name__)
+
+
+def create_trash_gmail_email_tool(
+ db_session: AsyncSession | None = None,
+ search_space_id: int | None = None,
+ user_id: str | None = None,
+):
+ @tool
+ async def trash_gmail_email(
+ email_subject_or_id: str,
+ delete_from_kb: bool = False,
+ ) -> dict[str, Any]:
+ """Move an email to trash in Gmail.
+
+ Use when the user asks to delete, remove, or trash an email.
+
+ Args:
+ email_subject_or_id: The exact subject line or message ID of the
+ email to trash (as it appears in the inbox).
+ delete_from_kb: Whether to also remove the email from the knowledge base.
+ Default is False.
+ Set to True to remove from both Gmail and knowledge base.
+
+ Returns:
+ Dictionary with:
+ - status: "success", "rejected", "not_found", or "error"
+ - message_id: Gmail message ID (if success)
+ - deleted_from_kb: whether the document was removed from the knowledge base
+ - message: Result message
+
+ IMPORTANT:
+ - If status is "rejected", the user explicitly declined. Respond with a brief
+ acknowledgment and do NOT retry or suggest alternatives.
+ - If status is "not_found", relay the exact message to the user and ask them
+ to verify the email subject or check if it has been indexed.
+ - If status is "insufficient_permissions", the connector lacks the required OAuth scope.
+ Inform the user they need to re-authenticate and do NOT retry this tool.
+
+ Examples:
+ - "Delete the email about 'Meeting Cancelled'"
+ - "Trash the email from Bob about the project"
+ """
+ logger.info(
+ f"trash_gmail_email called: email_subject_or_id='{email_subject_or_id}', delete_from_kb={delete_from_kb}"
+ )
+
+ if db_session is None or search_space_id is None or user_id is None:
+ return {
+ "status": "error",
+ "message": "Gmail tool not properly configured. Please contact support.",
+ }
+
+ try:
+ metadata_service = GmailToolMetadataService(db_session)
+ context = await metadata_service.get_trash_context(
+ search_space_id, user_id, email_subject_or_id
+ )
+
+ if "error" in context:
+ error_msg = context["error"]
+ if "not found" in error_msg.lower():
+ logger.warning(f"Email not found: {error_msg}")
+ return {"status": "not_found", "message": error_msg}
+ logger.error(f"Failed to fetch trash context: {error_msg}")
+ return {"status": "error", "message": error_msg}
+
+ account = context.get("account", {})
+ if account.get("auth_expired"):
+ logger.warning(
+ "Gmail account %s has expired authentication",
+ account.get("id"),
+ )
+ return {
+ "status": "auth_error",
+ "message": "The Gmail account for this email needs re-authentication. Please re-authenticate in your connector settings.",
+ "connector_type": "gmail",
+ }
+
+ email = context["email"]
+ message_id = email["message_id"]
+ document_id = email.get("document_id")
+ connector_id_from_context = context["account"]["id"]
+
+ if not message_id:
+ return {
+ "status": "error",
+ "message": "Message ID is missing from the indexed document. Please re-index the email and try again.",
+ }
+
+ logger.info(
+ f"Requesting approval for trashing Gmail email: '{email_subject_or_id}' (message_id={message_id}, delete_from_kb={delete_from_kb})"
+ )
+ approval = interrupt(
+ {
+ "type": "gmail_email_trash",
+ "action": {
+ "tool": "trash_gmail_email",
+ "params": {
+ "message_id": message_id,
+ "connector_id": connector_id_from_context,
+ "delete_from_kb": delete_from_kb,
+ },
+ },
+ "context": context,
+ }
+ )
+
+ decisions_raw = (
+ approval.get("decisions", []) if isinstance(approval, dict) else []
+ )
+ decisions = (
+ decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
+ )
+ decisions = [d for d in decisions if isinstance(d, dict)]
+ if not decisions:
+ logger.warning("No approval decision received")
+ return {"status": "error", "message": "No approval decision received"}
+
+ decision = decisions[0]
+ decision_type = decision.get("type") or decision.get("decision_type")
+ logger.info(f"User decision: {decision_type}")
+
+ if decision_type == "reject":
+ return {
+ "status": "rejected",
+ "message": "User declined. The email was not trashed. Do not ask again or suggest alternatives.",
+ }
+
+ edited_action = decision.get("edited_action")
+ final_params: dict[str, Any] = {}
+ if isinstance(edited_action, dict):
+ edited_args = edited_action.get("args")
+ if isinstance(edited_args, dict):
+ final_params = edited_args
+ elif isinstance(decision.get("args"), dict):
+ final_params = decision["args"]
+
+ final_message_id = final_params.get("message_id", message_id)
+ final_connector_id = final_params.get(
+ "connector_id", connector_id_from_context
+ )
+ final_delete_from_kb = final_params.get("delete_from_kb", delete_from_kb)
+
+ if not final_connector_id:
+ return {
+ "status": "error",
+ "message": "No connector found for this email.",
+ }
+
+ from sqlalchemy.future import select
+
+ from app.db import SearchSourceConnector, SearchSourceConnectorType
+
+ _GMAIL_TYPES = [
+ SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
+ SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
+ ]
+
+ result = await db_session.execute(
+ select(SearchSourceConnector).filter(
+ SearchSourceConnector.id == final_connector_id,
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_(_GMAIL_TYPES),
+ )
+ )
+ connector = result.scalars().first()
+ if not connector:
+ return {
+ "status": "error",
+ "message": "Selected Gmail connector is invalid or has been disconnected.",
+ }
+
+ logger.info(
+ f"Trashing Gmail email: message_id='{final_message_id}', connector={final_connector_id}"
+ )
+
+ if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR:
+ from app.utils.google_credentials import build_composio_credentials
+
+ cca_id = connector.config.get("composio_connected_account_id")
+ if cca_id:
+ creds = build_composio_credentials(cca_id)
+ else:
+ return {
+ "status": "error",
+ "message": "Composio connected account ID not found for this Gmail connector.",
+ }
+ else:
+ from google.oauth2.credentials import Credentials
+
+ from app.config import config
+ from app.utils.oauth_security import TokenEncryption
+
+ config_data = dict(connector.config)
+ token_encrypted = config_data.get("_token_encrypted", False)
+ if token_encrypted and config.SECRET_KEY:
+ token_encryption = TokenEncryption(config.SECRET_KEY)
+ if config_data.get("token"):
+ config_data["token"] = token_encryption.decrypt_token(
+ config_data["token"]
+ )
+ if config_data.get("refresh_token"):
+ config_data["refresh_token"] = token_encryption.decrypt_token(
+ config_data["refresh_token"]
+ )
+ if config_data.get("client_secret"):
+ config_data["client_secret"] = token_encryption.decrypt_token(
+ config_data["client_secret"]
+ )
+
+ exp = config_data.get("expiry", "")
+ if exp:
+ exp = exp.replace("Z", "")
+
+ creds = Credentials(
+ token=config_data.get("token"),
+ refresh_token=config_data.get("refresh_token"),
+ token_uri=config_data.get("token_uri"),
+ client_id=config_data.get("client_id"),
+ client_secret=config_data.get("client_secret"),
+ scopes=config_data.get("scopes", []),
+ expiry=datetime.fromisoformat(exp) if exp else None,
+ )
+
+ from googleapiclient.discovery import build
+
+ gmail_service = build("gmail", "v1", credentials=creds)
+
+ try:
+ await asyncio.get_event_loop().run_in_executor(
+ None,
+ lambda: gmail_service.users()
+ .messages()
+ .trash(userId="me", id=final_message_id)
+ .execute(),
+ )
+ except Exception as api_err:
+ from googleapiclient.errors import HttpError
+
+ if isinstance(api_err, HttpError) and api_err.resp.status == 403:
+ logger.warning(
+ f"Insufficient permissions for connector {connector.id}: {api_err}"
+ )
+ return {
+ "status": "insufficient_permissions",
+ "connector_id": connector.id,
+ "message": "This Gmail account needs additional permissions. Please re-authenticate.",
+ }
+ raise
+
+ logger.info(
+ f"Gmail email trashed: message_id={final_message_id}"
+ )
+
+ trash_result: dict[str, Any] = {
+ "status": "success",
+ "message_id": final_message_id,
+ "message": f"Successfully moved email '{email.get('subject', email_subject_or_id)}' to trash.",
+ }
+
+ deleted_from_kb = False
+ if final_delete_from_kb and document_id:
+ try:
+ from app.db import Document
+
+ doc_result = await db_session.execute(
+ select(Document).filter(Document.id == document_id)
+ )
+ document = doc_result.scalars().first()
+ if document:
+ await db_session.delete(document)
+ await db_session.commit()
+ deleted_from_kb = True
+ logger.info(
+ f"Deleted document {document_id} from knowledge base"
+ )
+ else:
+ logger.warning(f"Document {document_id} not found in KB")
+ except Exception as e:
+ logger.error(f"Failed to delete document from KB: {e}")
+ await db_session.rollback()
+ trash_result["warning"] = (
+ f"Email trashed, but failed to remove from knowledge base: {e!s}"
+ )
+
+ trash_result["deleted_from_kb"] = deleted_from_kb
+ if deleted_from_kb:
+ trash_result["message"] = (
+ f"{trash_result.get('message', '')} (also removed from knowledge base)"
+ )
+
+ return trash_result
+
+ except Exception as e:
+ from langgraph.errors import GraphInterrupt
+
+ if isinstance(e, GraphInterrupt):
+ raise
+
+ logger.error(f"Error trashing Gmail email: {e}", exc_info=True)
+ return {
+ "status": "error",
+ "message": "Something went wrong while trashing the email. Please try again.",
+ }
+
+ return trash_gmail_email
diff --git a/surfsense_backend/app/agents/new_chat/tools/google_calendar/__init__.py b/surfsense_backend/app/agents/new_chat/tools/google_calendar/__init__.py
new file mode 100644
index 000000000..4fea648f0
--- /dev/null
+++ b/surfsense_backend/app/agents/new_chat/tools/google_calendar/__init__.py
@@ -0,0 +1,15 @@
+from app.agents.new_chat.tools.google_calendar.create_event import (
+ create_create_calendar_event_tool,
+)
+from app.agents.new_chat.tools.google_calendar.update_event import (
+ create_update_calendar_event_tool,
+)
+from app.agents.new_chat.tools.google_calendar.delete_event import (
+ create_delete_calendar_event_tool,
+)
+
+__all__ = [
+ "create_create_calendar_event_tool",
+ "create_update_calendar_event_tool",
+ "create_delete_calendar_event_tool",
+]
diff --git a/surfsense_backend/app/agents/new_chat/tools/google_calendar/create_event.py b/surfsense_backend/app/agents/new_chat/tools/google_calendar/create_event.py
new file mode 100644
index 000000000..cf96870df
--- /dev/null
+++ b/surfsense_backend/app/agents/new_chat/tools/google_calendar/create_event.py
@@ -0,0 +1,310 @@
+import asyncio
+import logging
+from datetime import datetime
+from typing import Any
+
+from google.oauth2.credentials import Credentials
+from googleapiclient.discovery import build
+from langchain_core.tools import tool
+from langgraph.types import interrupt
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.services.google_calendar import GoogleCalendarToolMetadataService
+
+logger = logging.getLogger(__name__)
+
+
+def create_create_calendar_event_tool(
+ db_session: AsyncSession | None = None,
+ search_space_id: int | None = None,
+ user_id: str | None = None,
+):
+ @tool
+ async def create_calendar_event(
+ summary: str,
+ start_datetime: str,
+ end_datetime: str,
+ description: str | None = None,
+ location: str | None = None,
+ attendees: list[str] | None = None,
+ ) -> dict[str, Any]:
+ """Create a new event on Google Calendar.
+
+ Use when the user asks to schedule, create, or add a calendar event.
+ Ask for event details if not provided.
+
+ Args:
+ summary: The event title.
+ start_datetime: Start time in ISO 8601 format (e.g. "2026-03-20T10:00:00").
+ end_datetime: End time in ISO 8601 format (e.g. "2026-03-20T11:00:00").
+ description: Optional event description.
+ location: Optional event location.
+ attendees: Optional list of attendee email addresses.
+
+ Returns:
+ Dictionary with:
+ - status: "success", "rejected", "auth_error", or "error"
+ - event_id: Google Calendar event ID (if success)
+ - html_link: URL to open the event (if success)
+ - message: Result message
+
+ IMPORTANT:
+ - If status is "rejected", the user explicitly declined the action.
+ Respond with a brief acknowledgment and do NOT retry or suggest alternatives.
+
+ Examples:
+ - "Schedule a meeting with John tomorrow at 10am"
+ - "Create a calendar event for the team standup"
+ """
+ logger.info(
+ f"create_calendar_event called: summary='{summary}', start='{start_datetime}', end='{end_datetime}'"
+ )
+
+ if db_session is None or search_space_id is None or user_id is None:
+ return {
+ "status": "error",
+ "message": "Google Calendar tool not properly configured. Please contact support.",
+ }
+
+ try:
+ metadata_service = GoogleCalendarToolMetadataService(db_session)
+ context = await metadata_service.get_creation_context(
+ search_space_id, user_id
+ )
+
+ if "error" in context:
+ logger.error(f"Failed to fetch creation context: {context['error']}")
+ return {"status": "error", "message": context["error"]}
+
+ accounts = context.get("accounts", [])
+ if accounts and all(a.get("auth_expired") for a in accounts):
+ logger.warning("All Google Calendar accounts have expired authentication")
+ return {
+ "status": "auth_error",
+ "message": "All connected Google Calendar accounts need re-authentication. Please re-authenticate in your connector settings.",
+ "connector_type": "google_calendar",
+ }
+
+ logger.info(
+ f"Requesting approval for creating calendar event: summary='{summary}'"
+ )
+ approval = interrupt(
+ {
+ "type": "google_calendar_event_creation",
+ "action": {
+ "tool": "create_calendar_event",
+ "params": {
+ "summary": summary,
+ "start_datetime": start_datetime,
+ "end_datetime": end_datetime,
+ "description": description,
+ "location": location,
+ "attendees": attendees,
+ "timezone": context.get("timezone"),
+ "connector_id": None,
+ },
+ },
+ "context": context,
+ }
+ )
+
+ decisions_raw = (
+ approval.get("decisions", []) if isinstance(approval, dict) else []
+ )
+ decisions = (
+ decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
+ )
+ decisions = [d for d in decisions if isinstance(d, dict)]
+ if not decisions:
+ logger.warning("No approval decision received")
+ return {"status": "error", "message": "No approval decision received"}
+
+ decision = decisions[0]
+ decision_type = decision.get("type") or decision.get("decision_type")
+ logger.info(f"User decision: {decision_type}")
+
+ if decision_type == "reject":
+ return {
+ "status": "rejected",
+ "message": "User declined. The event was not created. Do not ask again or suggest alternatives.",
+ }
+
+ final_params: dict[str, Any] = {}
+ edited_action = decision.get("edited_action")
+ if isinstance(edited_action, dict):
+ edited_args = edited_action.get("args")
+ if isinstance(edited_args, dict):
+ final_params = edited_args
+ elif isinstance(decision.get("args"), dict):
+ final_params = decision["args"]
+
+ final_summary = final_params.get("summary", summary)
+ final_start_datetime = final_params.get("start_datetime", start_datetime)
+ final_end_datetime = final_params.get("end_datetime", end_datetime)
+ final_description = final_params.get("description", description)
+ final_location = final_params.get("location", location)
+ final_attendees = final_params.get("attendees", attendees)
+ final_connector_id = final_params.get("connector_id")
+
+ if not final_summary or not final_summary.strip():
+ return {"status": "error", "message": "Event summary cannot be empty."}
+
+ from sqlalchemy.future import select
+
+ from app.db import SearchSourceConnector, SearchSourceConnectorType
+
+ _CALENDAR_TYPES = [
+ SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
+ SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
+ ]
+
+ if final_connector_id is not None:
+ result = await db_session.execute(
+ select(SearchSourceConnector).filter(
+ SearchSourceConnector.id == final_connector_id,
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_(_CALENDAR_TYPES),
+ )
+ )
+ connector = result.scalars().first()
+ if not connector:
+ return {
+ "status": "error",
+ "message": "Selected Google Calendar connector is invalid or has been disconnected.",
+ }
+ actual_connector_id = connector.id
+ else:
+ result = await db_session.execute(
+ select(SearchSourceConnector).filter(
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_(_CALENDAR_TYPES),
+ )
+ )
+ connector = result.scalars().first()
+ if not connector:
+ return {
+ "status": "error",
+ "message": "No Google Calendar connector found. Please connect Google Calendar in your workspace settings.",
+ }
+ actual_connector_id = connector.id
+
+ logger.info(
+ f"Creating calendar event: summary='{final_summary}', connector={actual_connector_id}"
+ )
+
+ if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR:
+ from app.utils.google_credentials import build_composio_credentials
+
+ cca_id = connector.config.get("composio_connected_account_id")
+ if cca_id:
+ creds = build_composio_credentials(cca_id)
+ else:
+ return {
+ "status": "error",
+ "message": "Composio connected account ID not found for this connector.",
+ }
+ else:
+ config_data = dict(connector.config)
+
+ from app.config import config as app_config
+ from app.utils.oauth_security import TokenEncryption
+
+ token_encrypted = config_data.get("_token_encrypted", False)
+ if token_encrypted and app_config.SECRET_KEY:
+ token_encryption = TokenEncryption(app_config.SECRET_KEY)
+ for key in ("token", "refresh_token", "client_secret"):
+ if config_data.get(key):
+ config_data[key] = token_encryption.decrypt_token(config_data[key])
+
+ exp = config_data.get("expiry", "")
+ if exp:
+ exp = exp.replace("Z", "")
+
+ creds = Credentials(
+ token=config_data.get("token"),
+ refresh_token=config_data.get("refresh_token"),
+ token_uri=config_data.get("token_uri"),
+ client_id=config_data.get("client_id"),
+ client_secret=config_data.get("client_secret"),
+ scopes=config_data.get("scopes", []),
+ expiry=datetime.fromisoformat(exp) if exp else None,
+ )
+
+ service = await asyncio.get_event_loop().run_in_executor(
+ None, lambda: build("calendar", "v3", credentials=creds)
+ )
+
+ tz = context.get("timezone", "UTC")
+ event_body: dict[str, Any] = {
+ "summary": final_summary,
+ "start": {"dateTime": final_start_datetime, "timeZone": tz},
+ "end": {"dateTime": final_end_datetime, "timeZone": tz},
+ }
+ if final_description:
+ event_body["description"] = final_description
+ if final_location:
+ event_body["location"] = final_location
+ if final_attendees:
+ event_body["attendees"] = [
+ {"email": e.strip()} for e in final_attendees if e.strip()
+ ]
+
+ created = await asyncio.get_event_loop().run_in_executor(
+ None,
+ lambda: service.events()
+ .insert(calendarId="primary", body=event_body)
+ .execute(),
+ )
+
+ logger.info(
+ f"Calendar event created: id={created.get('id')}, summary={created.get('summary')}"
+ )
+
+ kb_message_suffix = ""
+ try:
+ from app.services.google_calendar import GoogleCalendarKBSyncService
+
+ kb_service = GoogleCalendarKBSyncService(db_session)
+ kb_result = await kb_service.sync_after_create(
+ event_id=created.get("id"),
+ event_summary=final_summary,
+ calendar_id="primary",
+ start_time=final_start_datetime,
+ end_time=final_end_datetime,
+ location=final_location,
+ html_link=created.get("htmlLink"),
+ description=final_description,
+ connector_id=actual_connector_id,
+ search_space_id=search_space_id,
+ user_id=user_id,
+ )
+ if kb_result["status"] == "success":
+ kb_message_suffix = " Your knowledge base has also been updated."
+ else:
+ kb_message_suffix = " This event will be added to your knowledge base in the next scheduled sync."
+ except Exception as kb_err:
+ logger.warning(f"KB sync after create failed: {kb_err}")
+ kb_message_suffix = " This event will be added to your knowledge base in the next scheduled sync."
+
+ return {
+ "status": "success",
+ "event_id": created.get("id"),
+ "html_link": created.get("htmlLink"),
+ "message": f"Successfully created '{final_summary}' on Google Calendar.{kb_message_suffix}",
+ }
+
+ except Exception as e:
+ from langgraph.errors import GraphInterrupt
+
+ if isinstance(e, GraphInterrupt):
+ raise
+
+ logger.error(f"Error creating calendar event: {e}", exc_info=True)
+ return {
+ "status": "error",
+ "message": "Something went wrong while creating the event. Please try again.",
+ }
+
+ return create_calendar_event
diff --git a/surfsense_backend/app/agents/new_chat/tools/google_calendar/delete_event.py b/surfsense_backend/app/agents/new_chat/tools/google_calendar/delete_event.py
new file mode 100644
index 000000000..3a4570737
--- /dev/null
+++ b/surfsense_backend/app/agents/new_chat/tools/google_calendar/delete_event.py
@@ -0,0 +1,295 @@
+import asyncio
+import logging
+from datetime import datetime
+from typing import Any
+
+from google.oauth2.credentials import Credentials
+from googleapiclient.discovery import build
+from langchain_core.tools import tool
+from langgraph.types import interrupt
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.services.google_calendar import GoogleCalendarToolMetadataService
+
+logger = logging.getLogger(__name__)
+
+
+def create_delete_calendar_event_tool(
+ db_session: AsyncSession | None = None,
+ search_space_id: int | None = None,
+ user_id: str | None = None,
+):
+ @tool
+ async def delete_calendar_event(
+ event_title_or_id: str,
+ delete_from_kb: bool = False,
+ ) -> dict[str, Any]:
+ """Delete a Google Calendar event.
+
+ Use when the user asks to delete, remove, or cancel a calendar event.
+
+ Args:
+ event_title_or_id: The exact title or event ID of the event to delete.
+ delete_from_kb: Whether to also remove the event from the knowledge base.
+ Default is False.
+ Set to True to remove from both Google Calendar and knowledge base.
+
+ Returns:
+ Dictionary with:
+ - status: "success", "rejected", "not_found", "auth_error", or "error"
+ - event_id: Google Calendar event ID (if success)
+ - deleted_from_kb: whether the document was removed from the knowledge base
+ - message: Result message
+
+ IMPORTANT:
+ - If status is "rejected", the user explicitly declined. Respond with a brief
+ acknowledgment and do NOT retry or suggest alternatives.
+ - If status is "not_found", relay the exact message to the user and ask them
+ to verify the event name or check if it has been indexed.
+
+ Examples:
+ - "Delete the team standup event"
+ - "Cancel my dentist appointment on Friday"
+ """
+ logger.info(
+ f"delete_calendar_event called: event_ref='{event_title_or_id}', delete_from_kb={delete_from_kb}"
+ )
+
+ if db_session is None or search_space_id is None or user_id is None:
+ return {
+ "status": "error",
+ "message": "Google Calendar tool not properly configured. Please contact support.",
+ }
+
+ try:
+ metadata_service = GoogleCalendarToolMetadataService(db_session)
+ context = await metadata_service.get_deletion_context(
+ search_space_id, user_id, event_title_or_id
+ )
+
+ if "error" in context:
+ error_msg = context["error"]
+ if "not found" in error_msg.lower():
+ logger.warning(f"Event not found: {error_msg}")
+ return {"status": "not_found", "message": error_msg}
+ logger.error(f"Failed to fetch deletion context: {error_msg}")
+ return {"status": "error", "message": error_msg}
+
+ account = context.get("account", {})
+ if account.get("auth_expired"):
+ logger.warning(
+ "Google Calendar account %s has expired authentication",
+ account.get("id"),
+ )
+ return {
+ "status": "auth_error",
+ "message": "The Google Calendar account for this event needs re-authentication. Please re-authenticate in your connector settings.",
+ "connector_type": "google_calendar",
+ }
+
+ event = context["event"]
+ event_id = event["event_id"]
+ document_id = event.get("document_id")
+ connector_id_from_context = context["account"]["id"]
+
+ if not event_id:
+ return {
+ "status": "error",
+ "message": "Event ID is missing from the indexed document. Please re-index the event and try again.",
+ }
+
+ logger.info(
+ f"Requesting approval for deleting calendar event: '{event_title_or_id}' (event_id={event_id}, delete_from_kb={delete_from_kb})"
+ )
+ approval = interrupt(
+ {
+ "type": "google_calendar_event_deletion",
+ "action": {
+ "tool": "delete_calendar_event",
+ "params": {
+ "event_id": event_id,
+ "connector_id": connector_id_from_context,
+ "delete_from_kb": delete_from_kb,
+ },
+ },
+ "context": context,
+ }
+ )
+
+ decisions_raw = (
+ approval.get("decisions", []) if isinstance(approval, dict) else []
+ )
+ decisions = (
+ decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
+ )
+ decisions = [d for d in decisions if isinstance(d, dict)]
+ if not decisions:
+ logger.warning("No approval decision received")
+ return {"status": "error", "message": "No approval decision received"}
+
+ decision = decisions[0]
+ decision_type = decision.get("type") or decision.get("decision_type")
+ logger.info(f"User decision: {decision_type}")
+
+ if decision_type == "reject":
+ return {
+ "status": "rejected",
+ "message": "User declined. The event was not deleted. Do not ask again or suggest alternatives.",
+ }
+
+ edited_action = decision.get("edited_action")
+ final_params: dict[str, Any] = {}
+ if isinstance(edited_action, dict):
+ edited_args = edited_action.get("args")
+ if isinstance(edited_args, dict):
+ final_params = edited_args
+ elif isinstance(decision.get("args"), dict):
+ final_params = decision["args"]
+
+ final_event_id = final_params.get("event_id", event_id)
+ final_connector_id = final_params.get(
+ "connector_id", connector_id_from_context
+ )
+ final_delete_from_kb = final_params.get("delete_from_kb", delete_from_kb)
+
+ if not final_connector_id:
+ return {
+ "status": "error",
+ "message": "No connector found for this event.",
+ }
+
+ from sqlalchemy.future import select
+
+ from app.db import SearchSourceConnector, SearchSourceConnectorType
+
+ _CALENDAR_TYPES = [
+ SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
+ SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
+ ]
+
+ result = await db_session.execute(
+ select(SearchSourceConnector).filter(
+ SearchSourceConnector.id == final_connector_id,
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_(_CALENDAR_TYPES),
+ )
+ )
+ connector = result.scalars().first()
+ if not connector:
+ return {
+ "status": "error",
+ "message": "Selected Google Calendar connector is invalid or has been disconnected.",
+ }
+
+ actual_connector_id = connector.id
+
+ logger.info(
+ f"Deleting calendar event: event_id='{final_event_id}', connector={actual_connector_id}"
+ )
+
+ if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR:
+ from app.utils.google_credentials import build_composio_credentials
+
+ cca_id = connector.config.get("composio_connected_account_id")
+ if cca_id:
+ creds = build_composio_credentials(cca_id)
+ else:
+ return {
+ "status": "error",
+ "message": "Composio connected account ID not found for this connector.",
+ }
+ else:
+ config_data = dict(connector.config)
+
+ from app.config import config as app_config
+ from app.utils.oauth_security import TokenEncryption
+
+ token_encrypted = config_data.get("_token_encrypted", False)
+ if token_encrypted and app_config.SECRET_KEY:
+ token_encryption = TokenEncryption(app_config.SECRET_KEY)
+ for key in ("token", "refresh_token", "client_secret"):
+ if config_data.get(key):
+ config_data[key] = token_encryption.decrypt_token(config_data[key])
+
+ exp = config_data.get("expiry", "")
+ if exp:
+ exp = exp.replace("Z", "")
+
+ creds = Credentials(
+ token=config_data.get("token"),
+ refresh_token=config_data.get("refresh_token"),
+ token_uri=config_data.get("token_uri"),
+ client_id=config_data.get("client_id"),
+ client_secret=config_data.get("client_secret"),
+ scopes=config_data.get("scopes", []),
+ expiry=datetime.fromisoformat(exp) if exp else None,
+ )
+
+ service = await asyncio.get_event_loop().run_in_executor(
+ None, lambda: build("calendar", "v3", credentials=creds)
+ )
+
+ await asyncio.get_event_loop().run_in_executor(
+ None,
+ lambda: service.events()
+ .delete(calendarId="primary", eventId=final_event_id)
+ .execute(),
+ )
+
+ logger.info(
+ f"Calendar event deleted: event_id={final_event_id}"
+ )
+
+ delete_result: dict[str, Any] = {
+ "status": "success",
+ "event_id": final_event_id,
+ "message": f"Successfully deleted the calendar event '{event.get('summary', event_title_or_id)}'.",
+ }
+
+ deleted_from_kb = False
+ if final_delete_from_kb and document_id:
+ try:
+ from app.db import Document
+
+ doc_result = await db_session.execute(
+ select(Document).filter(Document.id == document_id)
+ )
+ document = doc_result.scalars().first()
+ if document:
+ await db_session.delete(document)
+ await db_session.commit()
+ deleted_from_kb = True
+ logger.info(
+ f"Deleted document {document_id} from knowledge base"
+ )
+ else:
+ logger.warning(f"Document {document_id} not found in KB")
+ except Exception as e:
+ logger.error(f"Failed to delete document from KB: {e}")
+ await db_session.rollback()
+ delete_result["warning"] = (
+ f"Event deleted, but failed to remove from knowledge base: {e!s}"
+ )
+
+ delete_result["deleted_from_kb"] = deleted_from_kb
+ if deleted_from_kb:
+ delete_result["message"] = (
+ f"{delete_result.get('message', '')} (also removed from knowledge base)"
+ )
+
+ return delete_result
+
+ except Exception as e:
+ from langgraph.errors import GraphInterrupt
+
+ if isinstance(e, GraphInterrupt):
+ raise
+
+ logger.error(f"Error deleting calendar event: {e}", exc_info=True)
+ return {
+ "status": "error",
+ "message": "Something went wrong while deleting the event. Please try again.",
+ }
+
+ return delete_calendar_event
diff --git a/surfsense_backend/app/agents/new_chat/tools/google_calendar/update_event.py b/surfsense_backend/app/agents/new_chat/tools/google_calendar/update_event.py
new file mode 100644
index 000000000..98f20a3c7
--- /dev/null
+++ b/surfsense_backend/app/agents/new_chat/tools/google_calendar/update_event.py
@@ -0,0 +1,325 @@
+import asyncio
+import logging
+from datetime import datetime
+from typing import Any
+
+from google.oauth2.credentials import Credentials
+from googleapiclient.discovery import build
+from langchain_core.tools import tool
+from langgraph.types import interrupt
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.services.google_calendar import GoogleCalendarToolMetadataService
+
+logger = logging.getLogger(__name__)
+
+
+def create_update_calendar_event_tool(
+ db_session: AsyncSession | None = None,
+ search_space_id: int | None = None,
+ user_id: str | None = None,
+):
+ @tool
+ async def update_calendar_event(
+ event_title_or_id: str,
+ new_summary: str | None = None,
+ new_start_datetime: str | None = None,
+ new_end_datetime: str | None = None,
+ new_description: str | None = None,
+ new_location: str | None = None,
+ new_attendees: list[str] | None = None,
+ ) -> dict[str, Any]:
+ """Update an existing Google Calendar event.
+
+ Use when the user asks to modify, reschedule, or change a calendar event.
+
+ Args:
+ event_title_or_id: The exact title or event ID of the event to update.
+ new_summary: New event title (if changing).
+ new_start_datetime: New start time in ISO 8601 format (if rescheduling).
+ new_end_datetime: New end time in ISO 8601 format (if rescheduling).
+ new_description: New event description (if changing).
+ new_location: New event location (if changing).
+ new_attendees: New list of attendee email addresses (if changing).
+
+ Returns:
+ Dictionary with:
+ - status: "success", "rejected", "not_found", "auth_error", or "error"
+ - event_id: Google Calendar event ID (if success)
+ - html_link: URL to open the event (if success)
+ - message: Result message
+
+ IMPORTANT:
+ - If status is "rejected", the user explicitly declined. Respond with a brief
+ acknowledgment and do NOT retry or suggest alternatives.
+ - If status is "not_found", relay the exact message to the user and ask them
+ to verify the event name or check if it has been indexed.
+
+ Examples:
+ - "Reschedule the team standup to 3pm"
+ - "Change the location of my dentist appointment"
+ """
+ logger.info(
+ f"update_calendar_event called: event_ref='{event_title_or_id}'"
+ )
+
+ if db_session is None or search_space_id is None or user_id is None:
+ return {
+ "status": "error",
+ "message": "Google Calendar tool not properly configured. Please contact support.",
+ }
+
+ try:
+ metadata_service = GoogleCalendarToolMetadataService(db_session)
+ context = await metadata_service.get_update_context(
+ search_space_id, user_id, event_title_or_id
+ )
+
+ if "error" in context:
+ error_msg = context["error"]
+ if "not found" in error_msg.lower():
+ logger.warning(f"Event not found: {error_msg}")
+ return {"status": "not_found", "message": error_msg}
+ logger.error(f"Failed to fetch update context: {error_msg}")
+ return {"status": "error", "message": error_msg}
+
+ if context.get("auth_expired"):
+ logger.warning(
+ "Google Calendar account has expired authentication"
+ )
+ return {
+ "status": "auth_error",
+ "message": "The Google Calendar account for this event needs re-authentication. Please re-authenticate in your connector settings.",
+ "connector_type": "google_calendar",
+ }
+
+ event = context["event"]
+ event_id = event["event_id"]
+ document_id = event.get("document_id")
+ connector_id_from_context = context["account"]["id"]
+
+ if not event_id:
+ return {
+ "status": "error",
+ "message": "Event ID is missing from the indexed document. Please re-index the event and try again.",
+ }
+
+ logger.info(
+ f"Requesting approval for updating calendar event: '{event_title_or_id}' (event_id={event_id})"
+ )
+ approval = interrupt(
+ {
+ "type": "google_calendar_event_update",
+ "action": {
+ "tool": "update_calendar_event",
+ "params": {
+ "event_id": event_id,
+ "document_id": document_id,
+ "connector_id": connector_id_from_context,
+ "new_summary": new_summary,
+ "new_start_datetime": new_start_datetime,
+ "new_end_datetime": new_end_datetime,
+ "new_description": new_description,
+ "new_location": new_location,
+ "new_attendees": new_attendees,
+ },
+ },
+ "context": context,
+ }
+ )
+
+ decisions_raw = (
+ approval.get("decisions", []) if isinstance(approval, dict) else []
+ )
+ decisions = (
+ decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
+ )
+ decisions = [d for d in decisions if isinstance(d, dict)]
+ if not decisions:
+ logger.warning("No approval decision received")
+ return {"status": "error", "message": "No approval decision received"}
+
+ decision = decisions[0]
+ decision_type = decision.get("type") or decision.get("decision_type")
+ logger.info(f"User decision: {decision_type}")
+
+ if decision_type == "reject":
+ return {
+ "status": "rejected",
+ "message": "User declined. The event was not updated. Do not ask again or suggest alternatives.",
+ }
+
+ edited_action = decision.get("edited_action")
+ final_params: dict[str, Any] = {}
+ if isinstance(edited_action, dict):
+ edited_args = edited_action.get("args")
+ if isinstance(edited_args, dict):
+ final_params = edited_args
+ elif isinstance(decision.get("args"), dict):
+ final_params = decision["args"]
+
+ final_event_id = final_params.get("event_id", event_id)
+ final_connector_id = final_params.get(
+ "connector_id", connector_id_from_context
+ )
+ final_new_summary = final_params.get("new_summary", new_summary)
+ final_new_start_datetime = final_params.get("new_start_datetime", new_start_datetime)
+ final_new_end_datetime = final_params.get("new_end_datetime", new_end_datetime)
+ final_new_description = final_params.get("new_description", new_description)
+ final_new_location = final_params.get("new_location", new_location)
+ final_new_attendees = final_params.get("new_attendees", new_attendees)
+
+ if not final_connector_id:
+ return {
+ "status": "error",
+ "message": "No connector found for this event.",
+ }
+
+ from sqlalchemy.future import select
+
+ from app.db import SearchSourceConnector, SearchSourceConnectorType
+
+ _CALENDAR_TYPES = [
+ SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
+ SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
+ ]
+
+ result = await db_session.execute(
+ select(SearchSourceConnector).filter(
+ SearchSourceConnector.id == final_connector_id,
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_(_CALENDAR_TYPES),
+ )
+ )
+ connector = result.scalars().first()
+ if not connector:
+ return {
+ "status": "error",
+ "message": "Selected Google Calendar connector is invalid or has been disconnected.",
+ }
+
+ actual_connector_id = connector.id
+
+ logger.info(
+ f"Updating calendar event: event_id='{final_event_id}', connector={actual_connector_id}"
+ )
+
+ if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR:
+ from app.utils.google_credentials import build_composio_credentials
+
+ cca_id = connector.config.get("composio_connected_account_id")
+ if cca_id:
+ creds = build_composio_credentials(cca_id)
+ else:
+ return {
+ "status": "error",
+ "message": "Composio connected account ID not found for this connector.",
+ }
+ else:
+ config_data = dict(connector.config)
+
+ from app.config import config as app_config
+ from app.utils.oauth_security import TokenEncryption
+
+ token_encrypted = config_data.get("_token_encrypted", False)
+ if token_encrypted and app_config.SECRET_KEY:
+ token_encryption = TokenEncryption(app_config.SECRET_KEY)
+ for key in ("token", "refresh_token", "client_secret"):
+ if config_data.get(key):
+ config_data[key] = token_encryption.decrypt_token(config_data[key])
+
+ exp = config_data.get("expiry", "")
+ if exp:
+ exp = exp.replace("Z", "")
+
+ creds = Credentials(
+ token=config_data.get("token"),
+ refresh_token=config_data.get("refresh_token"),
+ token_uri=config_data.get("token_uri"),
+ client_id=config_data.get("client_id"),
+ client_secret=config_data.get("client_secret"),
+ scopes=config_data.get("scopes", []),
+ expiry=datetime.fromisoformat(exp) if exp else None,
+ )
+
+ service = await asyncio.get_event_loop().run_in_executor(
+ None, lambda: build("calendar", "v3", credentials=creds)
+ )
+
+ update_body: dict[str, Any] = {}
+ if final_new_summary is not None:
+ update_body["summary"] = final_new_summary
+ if final_new_start_datetime is not None:
+ tz = context.get("timezone", "UTC") if isinstance(context, dict) else "UTC"
+ update_body["start"] = {"dateTime": final_new_start_datetime, "timeZone": tz}
+ if final_new_end_datetime is not None:
+ tz = context.get("timezone", "UTC") if isinstance(context, dict) else "UTC"
+ update_body["end"] = {"dateTime": final_new_end_datetime, "timeZone": tz}
+ if final_new_description is not None:
+ update_body["description"] = final_new_description
+ if final_new_location is not None:
+ update_body["location"] = final_new_location
+ if final_new_attendees is not None:
+ update_body["attendees"] = [
+ {"email": e.strip()} for e in final_new_attendees if e.strip()
+ ]
+
+ if not update_body:
+ return {
+ "status": "error",
+ "message": "No changes specified. Please provide at least one field to update.",
+ }
+
+ updated = await asyncio.get_event_loop().run_in_executor(
+ None,
+ lambda: service.events()
+ .patch(calendarId="primary", eventId=final_event_id, body=update_body)
+ .execute(),
+ )
+
+ logger.info(
+ f"Calendar event updated: event_id={final_event_id}"
+ )
+
+ kb_message_suffix = ""
+ if document_id is not None:
+ try:
+ from app.services.google_calendar import GoogleCalendarKBSyncService
+
+ kb_service = GoogleCalendarKBSyncService(db_session)
+ kb_result = await kb_service.sync_after_update(
+ document_id=document_id,
+ event_id=final_event_id,
+ connector_id=actual_connector_id,
+ search_space_id=search_space_id,
+ user_id=user_id,
+ )
+ if kb_result["status"] == "success":
+ kb_message_suffix = " Your knowledge base has also been updated."
+ else:
+ kb_message_suffix = " The knowledge base will be updated in the next scheduled sync."
+ except Exception as kb_err:
+ logger.warning(f"KB sync after update failed: {kb_err}")
+ kb_message_suffix = " The knowledge base will be updated in the next scheduled sync."
+
+ return {
+ "status": "success",
+ "event_id": final_event_id,
+ "html_link": updated.get("htmlLink"),
+ "message": f"Successfully updated the calendar event.{kb_message_suffix}",
+ }
+
+ except Exception as e:
+ from langgraph.errors import GraphInterrupt
+
+ if isinstance(e, GraphInterrupt):
+ raise
+
+ logger.error(f"Error updating calendar event: {e}", exc_info=True)
+ return {
+ "status": "error",
+ "message": "Something went wrong while updating the event. Please try again.",
+ }
+
+ return update_calendar_event
diff --git a/surfsense_backend/app/agents/new_chat/tools/registry.py b/surfsense_backend/app/agents/new_chat/tools/registry.py
index c2592207a..6fa3a1586 100644
--- a/surfsense_backend/app/agents/new_chat/tools/registry.py
+++ b/surfsense_backend/app/agents/new_chat/tools/registry.py
@@ -47,6 +47,16 @@ from app.db import ChatVisibility
from .display_image import create_display_image_tool
from .generate_image import create_generate_image_tool
+from .gmail import (
+ create_create_gmail_draft_tool,
+ create_send_gmail_email_tool,
+ create_trash_gmail_email_tool,
+)
+from .google_calendar import (
+ create_create_calendar_event_tool,
+ create_delete_calendar_event_tool,
+ create_update_calendar_event_tool,
+)
from .google_drive import (
create_create_google_drive_file_tool,
create_delete_google_drive_file_tool,
@@ -336,6 +346,74 @@ BUILTIN_TOOLS: list[ToolDefinition] = [
),
requires=["db_session", "search_space_id", "user_id"],
),
+ # =========================================================================
+ # GOOGLE CALENDAR TOOLS - create, update, delete events
+ # Auto-disabled when no Google Calendar connector is configured
+ # =========================================================================
+ ToolDefinition(
+ name="create_calendar_event",
+ description="Create a new event on Google Calendar",
+ factory=lambda deps: create_create_calendar_event_tool(
+ db_session=deps["db_session"],
+ search_space_id=deps["search_space_id"],
+ user_id=deps["user_id"],
+ ),
+ requires=["db_session", "search_space_id", "user_id"],
+ ),
+ ToolDefinition(
+ name="update_calendar_event",
+ description="Update an existing indexed Google Calendar event",
+ factory=lambda deps: create_update_calendar_event_tool(
+ db_session=deps["db_session"],
+ search_space_id=deps["search_space_id"],
+ user_id=deps["user_id"],
+ ),
+ requires=["db_session", "search_space_id", "user_id"],
+ ),
+ ToolDefinition(
+ name="delete_calendar_event",
+ description="Delete an existing indexed Google Calendar event",
+ factory=lambda deps: create_delete_calendar_event_tool(
+ db_session=deps["db_session"],
+ search_space_id=deps["search_space_id"],
+ user_id=deps["user_id"],
+ ),
+ requires=["db_session", "search_space_id", "user_id"],
+ ),
+ # =========================================================================
+ # GMAIL TOOLS - create drafts, send emails, trash emails
+ # Auto-disabled when no Gmail connector is configured
+ # =========================================================================
+ ToolDefinition(
+ name="create_gmail_draft",
+ description="Create a draft email in Gmail",
+ factory=lambda deps: create_create_gmail_draft_tool(
+ db_session=deps["db_session"],
+ search_space_id=deps["search_space_id"],
+ user_id=deps["user_id"],
+ ),
+ requires=["db_session", "search_space_id", "user_id"],
+ ),
+ ToolDefinition(
+ name="send_gmail_email",
+ description="Send an email via Gmail",
+ factory=lambda deps: create_send_gmail_email_tool(
+ db_session=deps["db_session"],
+ search_space_id=deps["search_space_id"],
+ user_id=deps["user_id"],
+ ),
+ requires=["db_session", "search_space_id", "user_id"],
+ ),
+ ToolDefinition(
+ name="trash_gmail_email",
+ description="Move an indexed email to trash in Gmail",
+ factory=lambda deps: create_trash_gmail_email_tool(
+ db_session=deps["db_session"],
+ search_space_id=deps["search_space_id"],
+ user_id=deps["user_id"],
+ ),
+ requires=["db_session", "search_space_id", "user_id"],
+ ),
]
diff --git a/surfsense_backend/app/services/gmail/__init__.py b/surfsense_backend/app/services/gmail/__init__.py
new file mode 100644
index 000000000..4de115171
--- /dev/null
+++ b/surfsense_backend/app/services/gmail/__init__.py
@@ -0,0 +1,13 @@
+from app.services.gmail.kb_sync_service import GmailKBSyncService
+from app.services.gmail.tool_metadata_service import (
+ GmailAccount,
+ GmailMessage,
+ GmailToolMetadataService,
+)
+
+__all__ = [
+ "GmailAccount",
+ "GmailKBSyncService",
+ "GmailMessage",
+ "GmailToolMetadataService",
+]
diff --git a/surfsense_backend/app/services/gmail/kb_sync_service.py b/surfsense_backend/app/services/gmail/kb_sync_service.py
new file mode 100644
index 000000000..279a6d78e
--- /dev/null
+++ b/surfsense_backend/app/services/gmail/kb_sync_service.py
@@ -0,0 +1,163 @@
+import logging
+from datetime import datetime
+
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.db import Document, DocumentType
+from app.services.llm_service import get_user_long_context_llm
+from app.utils.document_converters import (
+ create_document_chunks,
+ embed_text,
+ generate_content_hash,
+ generate_document_summary,
+ generate_unique_identifier_hash,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class GmailKBSyncService:
+ def __init__(self, db_session: AsyncSession):
+ self.db_session = db_session
+
+ async def sync_after_create(
+ self,
+ message_id: str,
+ thread_id: str,
+ subject: str,
+ sender: str,
+ date_str: str,
+ body_text: str | None,
+ connector_id: int,
+ search_space_id: int,
+ user_id: str,
+ ) -> dict:
+ from app.tasks.connector_indexers.base import (
+ check_document_by_unique_identifier,
+ check_duplicate_document_by_hash,
+ get_current_timestamp,
+ safe_set_chunks,
+ )
+
+ try:
+ unique_hash = generate_unique_identifier_hash(
+ DocumentType.GOOGLE_GMAIL_CONNECTOR, message_id, search_space_id
+ )
+
+ existing = await check_document_by_unique_identifier(
+ self.db_session, unique_hash
+ )
+ if existing:
+ logger.info(
+ "Document for Gmail message %s already exists (doc_id=%s), skipping",
+ message_id,
+ existing.id,
+ )
+ return {"status": "success"}
+
+ indexable_content = (
+ f"Gmail Message: {subject}\n\nFrom: {sender}\nDate: {date_str}\n\n"
+ f"{body_text or ''}"
+ ).strip()
+ if not indexable_content:
+ indexable_content = f"Gmail message: {subject}"
+
+ content_hash = generate_content_hash(indexable_content, search_space_id)
+
+ with self.db_session.no_autoflush:
+ dup = await check_duplicate_document_by_hash(
+ self.db_session, content_hash
+ )
+ if dup:
+ logger.info(
+ "Content-hash collision for Gmail message %s -- identical content "
+ "exists in doc %s. Using unique_identifier_hash as content_hash.",
+ message_id,
+ dup.id,
+ )
+ content_hash = unique_hash
+
+ user_llm = await get_user_long_context_llm(
+ self.db_session,
+ user_id,
+ search_space_id,
+ disable_streaming=True,
+ )
+
+ doc_metadata_for_summary = {
+ "subject": subject,
+ "sender": sender,
+ "document_type": "Gmail Message",
+ "connector_type": "Gmail",
+ }
+
+ if user_llm:
+ summary_content, summary_embedding = await generate_document_summary(
+ indexable_content, user_llm, doc_metadata_for_summary
+ )
+ else:
+ logger.warning("No LLM configured -- using fallback summary")
+ summary_content = f"Gmail Message: {subject}\n\n{indexable_content}"
+ summary_embedding = embed_text(summary_content)
+
+ chunks = await create_document_chunks(indexable_content)
+ now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+ document = Document(
+ title=subject,
+ document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
+ document_metadata={
+ "message_id": message_id,
+ "thread_id": thread_id,
+ "subject": subject,
+ "sender": sender,
+ "date": date_str,
+ "connector_id": connector_id,
+ "indexed_at": now_str,
+ },
+ content=summary_content,
+ content_hash=content_hash,
+ unique_identifier_hash=unique_hash,
+ embedding=summary_embedding,
+ search_space_id=search_space_id,
+ connector_id=connector_id,
+ source_markdown=body_text,
+ updated_at=get_current_timestamp(),
+ )
+
+ self.db_session.add(document)
+ await self.db_session.flush()
+ await safe_set_chunks(self.db_session, document, chunks)
+ await self.db_session.commit()
+
+ logger.info(
+ "KB sync after create succeeded: doc_id=%s, subject=%s, chunks=%d",
+ document.id,
+ subject,
+ len(chunks),
+ )
+ return {"status": "success"}
+
+ except Exception as e:
+ error_str = str(e).lower()
+ if (
+ "duplicate key value violates unique constraint" in error_str
+ or "uniqueviolationerror" in error_str
+ ):
+ logger.warning(
+ "Duplicate constraint hit during KB sync for message %s. "
+ "Rolling back -- periodic indexer will handle it. Error: %s",
+ message_id,
+ e,
+ )
+ await self.db_session.rollback()
+ return {"status": "error", "message": "Duplicate document detected"}
+
+ logger.error(
+ "KB sync after create failed for message %s: %s",
+ message_id,
+ e,
+ exc_info=True,
+ )
+ await self.db_session.rollback()
+ return {"status": "error", "message": str(e)}
diff --git a/surfsense_backend/app/services/gmail/tool_metadata_service.py b/surfsense_backend/app/services/gmail/tool_metadata_service.py
new file mode 100644
index 000000000..bc3685ac6
--- /dev/null
+++ b/surfsense_backend/app/services/gmail/tool_metadata_service.py
@@ -0,0 +1,298 @@
+import asyncio
+import logging
+from dataclasses import dataclass
+from datetime import datetime
+
+from google.oauth2.credentials import Credentials
+from googleapiclient.discovery import build
+from sqlalchemy import and_, func, or_
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+from sqlalchemy.orm.attributes import flag_modified
+
+from app.db import (
+ Document,
+ DocumentType,
+ SearchSourceConnector,
+ SearchSourceConnectorType,
+)
+from app.utils.google_credentials import build_composio_credentials
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class GmailAccount:
+ id: int
+ name: str
+ email: str
+
+ @classmethod
+ def from_connector(cls, connector: SearchSourceConnector) -> "GmailAccount":
+ return cls(id=connector.id, name=connector.name, email="")
+
+ def to_dict(self) -> dict:
+ return {"id": self.id, "name": self.name, "email": self.email}
+
+
+@dataclass
+class GmailMessage:
+ message_id: str
+ thread_id: str
+ subject: str
+ sender: str
+ date: str
+ connector_id: int
+ document_id: int
+
+ @classmethod
+ def from_document(cls, document: Document) -> "GmailMessage":
+ meta = document.document_metadata or {}
+ return cls(
+ message_id=meta.get("message_id", ""),
+ thread_id=meta.get("thread_id", ""),
+ subject=meta.get("subject", document.title),
+ sender=meta.get("sender", ""),
+ date=meta.get("date", ""),
+ connector_id=document.connector_id,
+ document_id=document.id,
+ )
+
+ def to_dict(self) -> dict:
+ return {
+ "message_id": self.message_id,
+ "thread_id": self.thread_id,
+ "subject": self.subject,
+ "sender": self.sender,
+ "date": self.date,
+ "connector_id": self.connector_id,
+ "document_id": self.document_id,
+ }
+
+
+class GmailToolMetadataService:
+ def __init__(self, db_session: AsyncSession):
+ self._db_session = db_session
+
+ async def _build_credentials(self, connector: SearchSourceConnector) -> Credentials:
+ if (
+ connector.connector_type
+ == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
+ ):
+ cca_id = connector.config.get("composio_connected_account_id")
+ if cca_id:
+ return build_composio_credentials(cca_id)
+
+ config_data = dict(connector.config)
+
+ from app.config import config
+ from app.utils.oauth_security import TokenEncryption
+
+ token_encrypted = config_data.get("_token_encrypted", False)
+ if token_encrypted and config.SECRET_KEY:
+ token_encryption = TokenEncryption(config.SECRET_KEY)
+ if config_data.get("token"):
+ config_data["token"] = token_encryption.decrypt_token(
+ config_data["token"]
+ )
+ if config_data.get("refresh_token"):
+ config_data["refresh_token"] = token_encryption.decrypt_token(
+ config_data["refresh_token"]
+ )
+ if config_data.get("client_secret"):
+ config_data["client_secret"] = token_encryption.decrypt_token(
+ config_data["client_secret"]
+ )
+
+ exp = config_data.get("expiry", "")
+ if exp:
+ exp = exp.replace("Z", "")
+
+ return Credentials(
+ token=config_data.get("token"),
+ refresh_token=config_data.get("refresh_token"),
+ token_uri=config_data.get("token_uri"),
+ client_id=config_data.get("client_id"),
+ client_secret=config_data.get("client_secret"),
+ scopes=config_data.get("scopes", []),
+ expiry=datetime.fromisoformat(exp) if exp else None,
+ )
+
+ async def _check_account_health(self, connector_id: int) -> bool:
+ """Check if a Gmail connector's credentials are still valid.
+
+ Uses a lightweight ``users().getProfile(userId='me')`` call.
+
+ Returns True if the credentials are expired/invalid, False if healthy.
+ """
+ try:
+ result = await self._db_session.execute(
+ select(SearchSourceConnector).where(
+ SearchSourceConnector.id == connector_id
+ )
+ )
+ connector = result.scalar_one_or_none()
+ if not connector:
+ return True
+
+ creds = await self._build_credentials(connector)
+ service = build("gmail", "v1", credentials=creds)
+ await asyncio.get_event_loop().run_in_executor(
+ None, lambda: service.users().getProfile(userId="me").execute()
+ )
+ return False
+ except Exception as e:
+ logger.warning(
+ "Gmail connector %s health check failed: %s",
+ connector_id,
+ e,
+ )
+ return True
+
+ async def _persist_auth_expired(self, connector_id: int) -> None:
+ """Persist ``auth_expired: True`` to the connector config if not already set."""
+ try:
+ result = await self._db_session.execute(
+ select(SearchSourceConnector).where(
+ SearchSourceConnector.id == connector_id
+ )
+ )
+ db_connector = result.scalar_one_or_none()
+ if db_connector and not db_connector.config.get("auth_expired"):
+ db_connector.config = {**db_connector.config, "auth_expired": True}
+ flag_modified(db_connector, "config")
+ await self._db_session.commit()
+ await self._db_session.refresh(db_connector)
+ except Exception:
+ logger.warning(
+ "Failed to persist auth_expired for connector %s",
+ connector_id,
+ exc_info=True,
+ )
+
+ async def _get_accounts(
+ self, search_space_id: int, user_id: str
+ ) -> list[GmailAccount]:
+ result = await self._db_session.execute(
+ select(SearchSourceConnector)
+ .filter(
+ and_(
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_([
+ SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
+ SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
+ ]),
+ )
+ )
+ .order_by(SearchSourceConnector.last_indexed_at.desc())
+ )
+ connectors = result.scalars().all()
+ return [GmailAccount.from_connector(c) for c in connectors]
+
+ async def get_creation_context(self, search_space_id: int, user_id: str) -> dict:
+ accounts = await self._get_accounts(search_space_id, user_id)
+
+ if not accounts:
+ return {
+ "accounts": [],
+ "error": "No Gmail account connected",
+ }
+
+ accounts_with_status = []
+ for acc in accounts:
+ acc_dict = acc.to_dict()
+ auth_expired = await self._check_account_health(acc.id)
+ acc_dict["auth_expired"] = auth_expired
+ if auth_expired:
+ await self._persist_auth_expired(acc.id)
+ else:
+ try:
+ result = await self._db_session.execute(
+ select(SearchSourceConnector).where(
+ SearchSourceConnector.id == acc.id
+ )
+ )
+ connector = result.scalar_one_or_none()
+ if connector:
+ creds = await self._build_credentials(connector)
+ service = build("gmail", "v1", credentials=creds)
+ profile = await asyncio.get_event_loop().run_in_executor(
+ None,
+ lambda: service.users()
+ .getProfile(userId="me")
+ .execute(),
+ )
+ acc_dict["email"] = profile.get("emailAddress", "")
+ except Exception:
+ logger.warning(
+ "Failed to fetch email for Gmail connector %s",
+ acc.id,
+ exc_info=True,
+ )
+ accounts_with_status.append(acc_dict)
+
+ return {"accounts": accounts_with_status}
+
+ async def get_trash_context(
+ self, search_space_id: int, user_id: str, email_ref: str
+ ) -> dict:
+ document, connector = await self._resolve_email(
+ search_space_id, user_id, email_ref
+ )
+
+ if not document or not connector:
+ return {
+ "error": (
+ f"Email '{email_ref}' not found in your indexed Gmail messages. "
+ "This could mean: (1) the email doesn't exist, "
+ "(2) it hasn't been indexed yet, "
+ "or (3) the subject is different."
+ )
+ }
+
+ account = GmailAccount.from_connector(connector)
+ message = GmailMessage.from_document(document)
+
+ acc_dict = account.to_dict()
+ auth_expired = await self._check_account_health(connector.id)
+ acc_dict["auth_expired"] = auth_expired
+ if auth_expired:
+ await self._persist_auth_expired(connector.id)
+
+ return {
+ "account": acc_dict,
+ "email": message.to_dict(),
+ }
+
+ async def _resolve_email(
+ self, search_space_id: int, user_id: str, email_ref: str
+ ) -> tuple[Document | None, SearchSourceConnector | None]:
+ result = await self._db_session.execute(
+ select(Document, SearchSourceConnector)
+ .join(
+ SearchSourceConnector,
+ Document.connector_id == SearchSourceConnector.id,
+ )
+ .filter(
+ and_(
+ Document.search_space_id == search_space_id,
+ Document.document_type.in_([
+ DocumentType.GOOGLE_GMAIL_CONNECTOR,
+ DocumentType.COMPOSIO_GMAIL_CONNECTOR,
+ ]),
+ SearchSourceConnector.user_id == user_id,
+ or_(
+ func.lower(
+ Document.document_metadata["subject"].astext
+ )
+ == func.lower(email_ref),
+ func.lower(Document.title) == func.lower(email_ref),
+ ),
+ )
+ )
+ )
+ row = result.first()
+ if row:
+ return row[0], row[1]
+ return None, None
diff --git a/surfsense_backend/app/services/google_calendar/__init__.py b/surfsense_backend/app/services/google_calendar/__init__.py
new file mode 100644
index 000000000..38368b0a0
--- /dev/null
+++ b/surfsense_backend/app/services/google_calendar/__init__.py
@@ -0,0 +1,13 @@
+from app.services.google_calendar.kb_sync_service import GoogleCalendarKBSyncService
+from app.services.google_calendar.tool_metadata_service import (
+ GoogleCalendarAccount,
+ GoogleCalendarEvent,
+ GoogleCalendarToolMetadataService,
+)
+
+__all__ = [
+ "GoogleCalendarAccount",
+ "GoogleCalendarEvent",
+ "GoogleCalendarKBSyncService",
+ "GoogleCalendarToolMetadataService",
+]
diff --git a/surfsense_backend/app/services/google_calendar/kb_sync_service.py b/surfsense_backend/app/services/google_calendar/kb_sync_service.py
new file mode 100644
index 000000000..b26433bb9
--- /dev/null
+++ b/surfsense_backend/app/services/google_calendar/kb_sync_service.py
@@ -0,0 +1,348 @@
+import asyncio
+import logging
+from datetime import datetime
+
+from google.oauth2.credentials import Credentials
+from googleapiclient.discovery import build
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+from sqlalchemy.orm.attributes import flag_modified
+
+from app.db import Document, DocumentType, SearchSourceConnector, SearchSourceConnectorType
+from app.services.llm_service import get_user_long_context_llm
+from app.utils.document_converters import (
+ create_document_chunks,
+ embed_text,
+ generate_content_hash,
+ generate_document_summary,
+ generate_unique_identifier_hash,
+)
+from app.utils.google_credentials import build_composio_credentials
+
+logger = logging.getLogger(__name__)
+
+
+class GoogleCalendarKBSyncService:
+ def __init__(self, db_session: AsyncSession):
+ self.db_session = db_session
+
+ async def sync_after_create(
+ self,
+ event_id: str,
+ event_summary: str,
+ calendar_id: str,
+ start_time: str,
+ end_time: str,
+ location: str | None,
+ html_link: str | None,
+ description: str | None,
+ connector_id: int,
+ search_space_id: int,
+ user_id: str,
+ ) -> dict:
+ from app.tasks.connector_indexers.base import (
+ check_document_by_unique_identifier,
+ check_duplicate_document_by_hash,
+ get_current_timestamp,
+ safe_set_chunks,
+ )
+
+ try:
+ unique_hash = generate_unique_identifier_hash(
+ DocumentType.GOOGLE_CALENDAR_CONNECTOR, event_id, search_space_id
+ )
+
+ existing = await check_document_by_unique_identifier(
+ self.db_session, unique_hash
+ )
+ if existing:
+ logger.info(
+ "Document for Calendar event %s already exists (doc_id=%s), skipping",
+ event_id,
+ existing.id,
+ )
+ return {"status": "success"}
+
+ indexable_content = (
+ f"Google Calendar Event: {event_summary}\n\n"
+ f"Start: {start_time}\n"
+ f"End: {end_time}\n"
+ f"Location: {location or 'N/A'}\n\n"
+ f"{description or ''}"
+ ).strip()
+
+ content_hash = generate_content_hash(indexable_content, search_space_id)
+
+ with self.db_session.no_autoflush:
+ dup = await check_duplicate_document_by_hash(
+ self.db_session, content_hash
+ )
+ if dup:
+ logger.info(
+ "Content-hash collision for Calendar event %s -- identical content "
+ "exists in doc %s. Using unique_identifier_hash as content_hash.",
+ event_id,
+ dup.id,
+ )
+ content_hash = unique_hash
+
+ user_llm = await get_user_long_context_llm(
+ self.db_session,
+ user_id,
+ search_space_id,
+ disable_streaming=True,
+ )
+
+ doc_metadata_for_summary = {
+ "event_summary": event_summary,
+ "start_time": start_time,
+ "end_time": end_time,
+ "document_type": "Google Calendar Event",
+ "connector_type": "Google Calendar",
+ }
+
+ if user_llm:
+ summary_content, summary_embedding = await generate_document_summary(
+ indexable_content, user_llm, doc_metadata_for_summary
+ )
+ else:
+ logger.warning("No LLM configured -- using fallback summary")
+ summary_content = f"Google Calendar Event: {event_summary}\n\n{indexable_content}"
+ summary_embedding = embed_text(summary_content)
+
+ chunks = await create_document_chunks(indexable_content)
+ now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+ document = Document(
+ title=event_summary,
+ document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
+ document_metadata={
+ "event_id": event_id,
+ "event_summary": event_summary,
+ "calendar_id": calendar_id,
+ "start_time": start_time,
+ "end_time": end_time,
+ "location": location,
+ "html_link": html_link,
+ "source_connector": "google_calendar",
+ "indexed_at": now_str,
+ "connector_id": connector_id,
+ },
+ content=summary_content,
+ content_hash=content_hash,
+ unique_identifier_hash=unique_hash,
+ embedding=summary_embedding,
+ search_space_id=search_space_id,
+ connector_id=connector_id,
+ source_markdown=indexable_content,
+ updated_at=get_current_timestamp(),
+ )
+
+ self.db_session.add(document)
+ await self.db_session.flush()
+ await safe_set_chunks(self.db_session, document, chunks)
+ await self.db_session.commit()
+
+ logger.info(
+ "KB sync after create succeeded: doc_id=%s, event=%s, chunks=%d",
+ document.id,
+ event_summary,
+ len(chunks),
+ )
+ return {"status": "success"}
+
+ except Exception as e:
+ error_str = str(e).lower()
+ if (
+ "duplicate key value violates unique constraint" in error_str
+ or "uniqueviolationerror" in error_str
+ ):
+ logger.warning(
+ "Duplicate constraint hit during KB sync for event %s. "
+ "Rolling back -- periodic indexer will handle it. Error: %s",
+ event_id,
+ e,
+ )
+ await self.db_session.rollback()
+ return {"status": "error", "message": "Duplicate document detected"}
+
+ logger.error(
+ "KB sync after create failed for event %s: %s",
+ event_id,
+ e,
+ exc_info=True,
+ )
+ await self.db_session.rollback()
+ return {"status": "error", "message": str(e)}
+
+ async def sync_after_update(
+ self,
+ document_id: int,
+ event_id: str,
+ connector_id: int,
+ search_space_id: int,
+ user_id: str,
+ ) -> dict:
+ from app.tasks.connector_indexers.base import (
+ get_current_timestamp,
+ safe_set_chunks,
+ )
+
+ try:
+ document = await self.db_session.get(Document, document_id)
+ if not document:
+ logger.warning("Document %s not found in KB", document_id)
+ return {"status": "not_indexed"}
+
+ creds = await self._build_credentials_for_connector(connector_id)
+ loop = asyncio.get_event_loop()
+ service = await loop.run_in_executor(
+ None, lambda: build("calendar", "v3", credentials=creds)
+ )
+
+ calendar_id = (document.document_metadata or {}).get("calendar_id", "primary")
+ live_event = await loop.run_in_executor(
+ None,
+ lambda: service.events()
+ .get(calendarId=calendar_id, eventId=event_id)
+ .execute(),
+ )
+
+ event_summary = live_event.get("summary", "")
+ description = live_event.get("description", "")
+ location = live_event.get("location", "")
+
+ start_data = live_event.get("start", {})
+ start_time = start_data.get("dateTime", start_data.get("date", ""))
+
+ end_data = live_event.get("end", {})
+ end_time = end_data.get("dateTime", end_data.get("date", ""))
+
+ attendees = [
+ {"email": a.get("email", ""), "responseStatus": a.get("responseStatus", "")}
+ for a in live_event.get("attendees", [])
+ ]
+
+ indexable_content = (
+ f"Google Calendar Event: {event_summary}\n\n"
+ f"Start: {start_time}\n"
+ f"End: {end_time}\n"
+ f"Location: {location or 'N/A'}\n\n"
+ f"{description or ''}"
+ ).strip()
+
+ if not indexable_content:
+ return {"status": "error", "message": "Event produced empty content"}
+
+ user_llm = await get_user_long_context_llm(
+ self.db_session, user_id, search_space_id, disable_streaming=True
+ )
+
+ doc_metadata_for_summary = {
+ "event_summary": event_summary,
+ "start_time": start_time,
+ "end_time": end_time,
+ "document_type": "Google Calendar Event",
+ "connector_type": "Google Calendar",
+ }
+
+ if user_llm:
+ summary_content, summary_embedding = await generate_document_summary(
+ indexable_content, user_llm, doc_metadata_for_summary
+ )
+ else:
+ summary_content = f"Google Calendar Event: {event_summary}\n\n{indexable_content}"
+ summary_embedding = embed_text(summary_content)
+
+ chunks = await create_document_chunks(indexable_content)
+ now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+ document.title = event_summary
+ document.content = summary_content
+ document.content_hash = generate_content_hash(
+ indexable_content, search_space_id
+ )
+ document.embedding = summary_embedding
+
+ document.document_metadata = {
+ **(document.document_metadata or {}),
+ "event_id": event_id,
+ "event_summary": event_summary,
+ "calendar_id": calendar_id,
+ "start_time": start_time,
+ "end_time": end_time,
+ "location": location,
+ "description": description,
+ "attendees": attendees,
+ "html_link": live_event.get("htmlLink", ""),
+ "indexed_at": now_str,
+ "connector_id": connector_id,
+ }
+ flag_modified(document, "document_metadata")
+
+ await safe_set_chunks(self.db_session, document, chunks)
+ document.updated_at = get_current_timestamp()
+
+ await self.db_session.commit()
+
+ logger.info(
+ "KB sync after update succeeded for document %s (event: %s)",
+ document_id,
+ event_summary,
+ )
+ return {"status": "success"}
+
+ except Exception as e:
+ logger.error(
+ "KB sync after update failed for document %s: %s",
+ document_id,
+ e,
+ exc_info=True,
+ )
+ await self.db_session.rollback()
+ return {"status": "error", "message": str(e)}
+
+ async def _build_credentials_for_connector(self, connector_id: int) -> Credentials:
+ result = await self.db_session.execute(
+ select(SearchSourceConnector).where(
+ SearchSourceConnector.id == connector_id
+ )
+ )
+ connector = result.scalar_one_or_none()
+ if not connector:
+ raise ValueError(f"Connector {connector_id} not found")
+
+ if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR:
+ cca_id = connector.config.get("composio_connected_account_id")
+ if cca_id:
+ return build_composio_credentials(cca_id)
+ raise ValueError("Composio connected_account_id not found")
+
+ config_data = dict(connector.config)
+
+ from app.config import config as app_config
+ from app.utils.oauth_security import TokenEncryption
+
+ token_encrypted = config_data.get("_token_encrypted", False)
+ if token_encrypted and app_config.SECRET_KEY:
+ token_encryption = TokenEncryption(app_config.SECRET_KEY)
+ if config_data.get("token"):
+ config_data["token"] = token_encryption.decrypt_token(config_data["token"])
+ if config_data.get("refresh_token"):
+ config_data["refresh_token"] = token_encryption.decrypt_token(config_data["refresh_token"])
+ if config_data.get("client_secret"):
+ config_data["client_secret"] = token_encryption.decrypt_token(config_data["client_secret"])
+
+ exp = config_data.get("expiry", "")
+ if exp:
+ exp = exp.replace("Z", "")
+
+ return Credentials(
+ token=config_data.get("token"),
+ refresh_token=config_data.get("refresh_token"),
+ token_uri=config_data.get("token_uri"),
+ client_id=config_data.get("client_id"),
+ client_secret=config_data.get("client_secret"),
+ scopes=config_data.get("scopes", []),
+ expiry=datetime.fromisoformat(exp) if exp else None,
+ )
diff --git a/surfsense_backend/app/services/google_calendar/tool_metadata_service.py b/surfsense_backend/app/services/google_calendar/tool_metadata_service.py
new file mode 100644
index 000000000..f2a8b08c3
--- /dev/null
+++ b/surfsense_backend/app/services/google_calendar/tool_metadata_service.py
@@ -0,0 +1,403 @@
+import asyncio
+import logging
+from dataclasses import dataclass
+from datetime import datetime
+
+from google.oauth2.credentials import Credentials
+from googleapiclient.discovery import build
+from sqlalchemy import and_, func, or_
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+from sqlalchemy.orm.attributes import flag_modified
+
+from app.db import (
+ Document,
+ DocumentType,
+ SearchSourceConnector,
+ SearchSourceConnectorType,
+)
+from app.utils.google_credentials import build_composio_credentials
+
+logger = logging.getLogger(__name__)
+
+CALENDAR_CONNECTOR_TYPES = [
+ SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
+ SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
+]
+
+CALENDAR_DOCUMENT_TYPES = [
+ DocumentType.GOOGLE_CALENDAR_CONNECTOR,
+ DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
+]
+
+
+@dataclass
+class GoogleCalendarAccount:
+ id: int
+ name: str
+
+ @classmethod
+ def from_connector(cls, connector: SearchSourceConnector) -> "GoogleCalendarAccount":
+ return cls(id=connector.id, name=connector.name)
+
+ def to_dict(self) -> dict:
+ return {"id": self.id, "name": self.name}
+
+
+@dataclass
+class GoogleCalendarEvent:
+ event_id: str
+ summary: str
+ start: str
+ end: str
+ description: str
+ location: str
+ attendees: list
+ calendar_id: str
+ document_id: int
+ indexed_at: str | None
+
+ @classmethod
+ def from_document(cls, document: Document) -> "GoogleCalendarEvent":
+ meta = document.document_metadata or {}
+ return cls(
+ event_id=meta.get("event_id", ""),
+ summary=meta.get("event_summary", document.title),
+ start=meta.get("start_time", ""),
+ end=meta.get("end_time", ""),
+ description=meta.get("description", ""),
+ location=meta.get("location", ""),
+ attendees=meta.get("attendees", []),
+ calendar_id=meta.get("calendar_id", "primary"),
+ document_id=document.id,
+ indexed_at=meta.get("indexed_at"),
+ )
+
+ def to_dict(self) -> dict:
+ return {
+ "event_id": self.event_id,
+ "summary": self.summary,
+ "start": self.start,
+ "end": self.end,
+ "description": self.description,
+ "location": self.location,
+ "attendees": self.attendees,
+ "calendar_id": self.calendar_id,
+ "document_id": self.document_id,
+ "indexed_at": self.indexed_at,
+ }
+
+
+class GoogleCalendarToolMetadataService:
+ def __init__(self, db_session: AsyncSession):
+ self._db_session = db_session
+
+ async def _build_credentials(self, connector: SearchSourceConnector) -> Credentials:
+ if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR:
+ cca_id = connector.config.get("composio_connected_account_id")
+ if cca_id:
+ return build_composio_credentials(cca_id)
+ raise ValueError("Composio connected_account_id not found")
+
+ config_data = dict(connector.config)
+
+ from app.config import config as app_config
+ from app.utils.oauth_security import TokenEncryption
+
+ token_encrypted = config_data.get("_token_encrypted", False)
+ if token_encrypted and app_config.SECRET_KEY:
+ token_encryption = TokenEncryption(app_config.SECRET_KEY)
+ if config_data.get("token"):
+ config_data["token"] = token_encryption.decrypt_token(config_data["token"])
+ if config_data.get("refresh_token"):
+ config_data["refresh_token"] = token_encryption.decrypt_token(config_data["refresh_token"])
+ if config_data.get("client_secret"):
+ config_data["client_secret"] = token_encryption.decrypt_token(config_data["client_secret"])
+
+ exp = config_data.get("expiry", "")
+ if exp:
+ exp = exp.replace("Z", "")
+
+ return Credentials(
+ token=config_data.get("token"),
+ refresh_token=config_data.get("refresh_token"),
+ token_uri=config_data.get("token_uri"),
+ client_id=config_data.get("client_id"),
+ client_secret=config_data.get("client_secret"),
+ scopes=config_data.get("scopes", []),
+ expiry=datetime.fromisoformat(exp) if exp else None,
+ )
+
+ async def _check_account_health(self, connector_id: int) -> bool:
+ """Check if a Google Calendar connector's credentials are still valid.
+
+ Uses a lightweight calendarList.list(maxResults=1) call to verify access.
+
+ Returns True if the credentials are expired/invalid, False if healthy.
+ """
+ try:
+ result = await self._db_session.execute(
+ select(SearchSourceConnector).where(
+ SearchSourceConnector.id == connector_id
+ )
+ )
+ connector = result.scalar_one_or_none()
+ if not connector:
+ return True
+
+ creds = await self._build_credentials(connector)
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(
+ None,
+ lambda: build("calendar", "v3", credentials=creds)
+ .calendarList()
+ .list(maxResults=1)
+ .execute(),
+ )
+ return False
+ except Exception as e:
+ logger.warning(
+ "Google Calendar connector %s health check failed: %s",
+ connector_id,
+ e,
+ )
+ return True
+
+ async def _persist_auth_expired(self, connector_id: int) -> None:
+ """Persist ``auth_expired: True`` to the connector config if not already set."""
+ try:
+ result = await self._db_session.execute(
+ select(SearchSourceConnector).where(
+ SearchSourceConnector.id == connector_id
+ )
+ )
+ db_connector = result.scalar_one_or_none()
+ if db_connector and not db_connector.config.get("auth_expired"):
+ db_connector.config = {**db_connector.config, "auth_expired": True}
+ flag_modified(db_connector, "config")
+ await self._db_session.commit()
+ await self._db_session.refresh(db_connector)
+ except Exception:
+ logger.warning(
+ "Failed to persist auth_expired for connector %s",
+ connector_id,
+ exc_info=True,
+ )
+
+ async def _get_accounts(
+ self, search_space_id: int, user_id: str
+ ) -> list[GoogleCalendarAccount]:
+ result = await self._db_session.execute(
+ select(SearchSourceConnector)
+ .filter(
+ and_(
+ SearchSourceConnector.search_space_id == search_space_id,
+ SearchSourceConnector.user_id == user_id,
+ SearchSourceConnector.connector_type.in_(CALENDAR_CONNECTOR_TYPES),
+ )
+ )
+ .order_by(SearchSourceConnector.last_indexed_at.desc())
+ )
+ connectors = result.scalars().all()
+ return [GoogleCalendarAccount.from_connector(c) for c in connectors]
+
+ async def get_creation_context(self, search_space_id: int, user_id: str) -> dict:
+ accounts = await self._get_accounts(search_space_id, user_id)
+
+ if not accounts:
+ return {
+ "accounts": [],
+ "error": "No Google Calendar account connected",
+ }
+
+ accounts_with_status = []
+ for acc in accounts:
+ acc_dict = acc.to_dict()
+ auth_expired = await self._check_account_health(acc.id)
+ acc_dict["auth_expired"] = auth_expired
+ if auth_expired:
+ await self._persist_auth_expired(acc.id)
+ accounts_with_status.append(acc_dict)
+
+ healthy_account = next(
+ (a for a in accounts_with_status if not a.get("auth_expired")), None
+ )
+ if not healthy_account:
+ return {
+ "accounts": accounts_with_status,
+ "calendars": [],
+ "timezone": "",
+ "error": "All connected Google Calendar accounts have expired credentials",
+ }
+
+ connector_id = healthy_account["id"]
+ result = await self._db_session.execute(
+ select(SearchSourceConnector).where(
+ SearchSourceConnector.id == connector_id
+ )
+ )
+ connector = result.scalar_one_or_none()
+
+ calendars = []
+ timezone_str = ""
+ if connector:
+ try:
+ creds = await self._build_credentials(connector)
+ loop = asyncio.get_event_loop()
+ service = await loop.run_in_executor(
+ None, lambda: build("calendar", "v3", credentials=creds)
+ )
+
+ cal_list = await loop.run_in_executor(
+ None, lambda: service.calendarList().list().execute()
+ )
+ for cal in cal_list.get("items", []):
+ calendars.append({
+ "id": cal.get("id", ""),
+ "summary": cal.get("summary", ""),
+ "primary": cal.get("primary", False),
+ })
+
+ tz_setting = await loop.run_in_executor(
+ None,
+ lambda: service.settings().get(setting="timezone").execute(),
+ )
+ timezone_str = tz_setting.get("value", "")
+ except Exception:
+ logger.warning(
+ "Failed to fetch calendars/timezone for connector %s",
+ connector_id,
+ exc_info=True,
+ )
+
+ return {
+ "accounts": accounts_with_status,
+ "calendars": calendars,
+ "timezone": timezone_str,
+ }
+
+ async def get_update_context(
+ self, search_space_id: int, user_id: str, event_ref: str
+ ) -> dict:
+ resolved = await self._resolve_event(search_space_id, user_id, event_ref)
+ if not resolved:
+ return {
+ "error": (
+ f"Event '{event_ref}' not found in your indexed Google Calendar events. "
+ "This could mean: (1) the event doesn't exist, (2) it hasn't been indexed yet, "
+ "or (3) the event name is different."
+ )
+ }
+
+ document, connector = resolved
+ account = GoogleCalendarAccount.from_connector(connector)
+ event = GoogleCalendarEvent.from_document(document)
+
+ acc_dict = account.to_dict()
+ auth_expired = await self._check_account_health(connector.id)
+ acc_dict["auth_expired"] = auth_expired
+ if auth_expired:
+ await self._persist_auth_expired(connector.id)
+ return {
+ "error": "Google Calendar credentials have expired. Please re-authenticate.",
+ "auth_expired": True,
+ "connector_id": connector.id,
+ }
+
+ event_dict = event.to_dict()
+ try:
+ creds = await self._build_credentials(connector)
+ loop = asyncio.get_event_loop()
+ service = await loop.run_in_executor(
+ None, lambda: build("calendar", "v3", credentials=creds)
+ )
+ calendar_id = event.calendar_id or "primary"
+ live_event = await loop.run_in_executor(
+ None,
+ lambda: service.events()
+ .get(calendarId=calendar_id, eventId=event.event_id)
+ .execute(),
+ )
+
+ event_dict["summary"] = live_event.get("summary", event_dict["summary"])
+ event_dict["description"] = live_event.get("description", event_dict["description"])
+ event_dict["location"] = live_event.get("location", event_dict["location"])
+
+ start_data = live_event.get("start", {})
+ event_dict["start"] = start_data.get("dateTime", start_data.get("date", event_dict["start"]))
+
+ end_data = live_event.get("end", {})
+ event_dict["end"] = end_data.get("dateTime", end_data.get("date", event_dict["end"]))
+
+ event_dict["attendees"] = [
+ {"email": a.get("email", ""), "responseStatus": a.get("responseStatus", "")}
+ for a in live_event.get("attendees", [])
+ ]
+ except Exception:
+ logger.warning(
+ "Failed to fetch live event data for event %s, using KB metadata",
+ event.event_id,
+ exc_info=True,
+ )
+
+ return {
+ "account": acc_dict,
+ "event": event_dict,
+ }
+
+ async def get_deletion_context(
+ self, search_space_id: int, user_id: str, event_ref: str
+ ) -> dict:
+ resolved = await self._resolve_event(search_space_id, user_id, event_ref)
+ if not resolved:
+ return {
+ "error": (
+ f"Event '{event_ref}' not found in your indexed Google Calendar events. "
+ "This could mean: (1) the event doesn't exist, (2) it hasn't been indexed yet, "
+ "or (3) the event name is different."
+ )
+ }
+
+ document, connector = resolved
+ account = GoogleCalendarAccount.from_connector(connector)
+ event = GoogleCalendarEvent.from_document(document)
+
+ acc_dict = account.to_dict()
+ auth_expired = await self._check_account_health(connector.id)
+ acc_dict["auth_expired"] = auth_expired
+ if auth_expired:
+ await self._persist_auth_expired(connector.id)
+
+ return {
+ "account": acc_dict,
+ "event": event.to_dict(),
+ }
+
+ async def _resolve_event(
+ self, search_space_id: int, user_id: str, event_ref: str
+ ) -> tuple[Document, SearchSourceConnector] | None:
+ result = await self._db_session.execute(
+ select(Document, SearchSourceConnector)
+ .join(
+ SearchSourceConnector,
+ Document.connector_id == SearchSourceConnector.id,
+ )
+ .filter(
+ and_(
+ Document.search_space_id == search_space_id,
+ Document.document_type.in_(CALENDAR_DOCUMENT_TYPES),
+ SearchSourceConnector.user_id == user_id,
+ or_(
+ func.lower(
+ Document.document_metadata["event_summary"].astext
+ )
+ == func.lower(event_ref),
+ func.lower(Document.title) == func.lower(event_ref),
+ ),
+ )
+ )
+ )
+ row = result.first()
+ if row:
+ return row[0], row[1]
+ return None
diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx
index a17d7362b..3a7c71caa 100644
--- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx
+++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx
@@ -41,6 +41,16 @@ import type { ThinkingStep } from "@/components/tool-ui/deepagent-thinking";
import { DisplayImageToolUI } from "@/components/tool-ui/display-image";
import { GeneratePodcastToolUI } from "@/components/tool-ui/generate-podcast";
import { GenerateReportToolUI } from "@/components/tool-ui/generate-report";
+import {
+ CreateCalendarEventToolUI,
+ UpdateCalendarEventToolUI,
+ DeleteCalendarEventToolUI,
+} from "@/components/tool-ui/google-calendar";
+import {
+ CreateGmailDraftToolUI,
+ SendGmailEmailToolUI,
+ TrashGmailEmailToolUI,
+} from "@/components/tool-ui/gmail";
import {
CreateGoogleDriveFileToolUI,
DeleteGoogleDriveFileToolUI,
@@ -160,6 +170,12 @@ const TOOLS_WITH_UI = new Set([
"delete_linear_issue",
"create_google_drive_file",
"delete_google_drive_file",
+ "create_calendar_event",
+ "update_calendar_event",
+ "delete_calendar_event",
+ "create_gmail_draft",
+ "send_gmail_email",
+ "trash_gmail_email",
"execute",
// "write_todos", // Disabled for now
]);
@@ -1676,6 +1692,12 @@ export default function NewChatPage() {
+ {decided === "reject" + ? "Gmail Draft Rejected" + : decided === "approve" || decided === "edit" + ? "Gmail Draft Approved" + : "Create Gmail Draft"} +
++ {decided === "reject" + ? "Draft creation was cancelled" + : decided === "edit" + ? "Draft creation is in progress with your changes" + : decided === "approve" + ? "Draft creation is in progress" + : "Requires your approval to proceed"} +
+{interruptData.context.error}
+ ) : ( + <> + {accounts.length > 0 && ( ++ Gmail Account * +
+ +{args.subject}
+ )} + {args.body != null && ( +Failed to create Gmail draft
+{result.message}
++ Gmail authentication expired +
+{result.message}
++ {result.message || "Gmail draft created successfully"} +
++ {decided === "reject" + ? "Email Sending Rejected" + : decided === "approve" || decided === "edit" + ? "Email Sending Approved" + : "Send Email"} +
++ {decided === "reject" + ? "Email sending was cancelled" + : decided === "edit" + ? "Email is being sent with your changes" + : decided === "approve" + ? "Email is being sent" + : "This will send the email immediately"} +
+{interruptData.context.error}
+ ) : ( + <> + {accounts.length > 0 && ( ++ Gmail Account * +
+ +{args.subject}
+ )} + {args.body != null && ( +Failed to send email
+{result.message}
++ Gmail authentication expired +
+{result.message}
++ {result.message || "Email sent successfully"} +
++ {decided === "reject" + ? "Email Trash Rejected" + : decided === "approve" + ? "Email Trash Approved" + : "Trash Email"} +
++ {decided === "reject" + ? "Email trash was cancelled" + : decided === "approve" + ? "Email is being trashed" + : "Requires your approval to proceed"} +
+{interruptData.context.error}
+ ) : ( + <> + {account && ( +Gmail Account
+Email to Trash
+Failed to trash email
+{result.message}
++ Gmail authentication expired +
+{result.message}
++ Email not found +
+{result.message}
++ {result.message || "Email moved to trash successfully"} +
++ {decided === "reject" + ? "Calendar Event Rejected" + : decided === "approve" || decided === "edit" + ? "Calendar Event Approved" + : "Create Calendar Event"} +
++ {decided === "reject" + ? "Event creation was cancelled" + : decided === "edit" + ? "Event creation is in progress with your changes" + : decided === "approve" + ? "Event creation is in progress" + : "Requires your approval to proceed"} +
+{interruptData.context.error}
+ ) : ( + <> + {accounts.length > 0 && ( ++ Google Calendar Account * +
+ ++ Calendar * +
+ +Timezone
+{args.summary}
+ )} + + {(args.start_datetime || args.end_datetime) && ( +Failed to create calendar event
+{result.message}
++ Google Calendar authentication expired +
+{result.message}
++ {result.message || "Calendar event created successfully"} +
++ {decided === "reject" + ? "Calendar Event Deletion Rejected" + : decided === "approve" + ? "Calendar Event Deletion Approved" + : "Delete Calendar Event"} +
++ {decided === "reject" + ? "Event deletion was cancelled" + : decided === "approve" + ? "Event deletion is in progress" + : "Requires your approval to proceed"} +
+{context.error}
+ ) : ( + <> + {account && ( +Google Calendar Account
+Event to Delete
+Failed to delete calendar event
+{result.message}
++ Google Calendar authentication expired +
+{result.message}
++ Event not found +
+{result.message}
+Partial success
+{result.warning}
++ {result.message || "Calendar event deleted successfully"} +
++ {decided === "reject" + ? "Calendar Event Update Rejected" + : decided === "approve" || decided === "edit" + ? "Calendar Event Update Approved" + : "Update Calendar Event"} +
++ {decided === "reject" + ? "Event update was cancelled" + : decided === "edit" + ? "Event update is in progress with your changes" + : decided === "approve" + ? "Event update is in progress" + : "Requires your approval to proceed"} +
+{context.error}
+ ) : ( + <> + {account && ( +Google Calendar Account
+Current Event
+Proposed Changes
+No changes proposed
+ )} + > + )} +Failed to update calendar event
+{result.message}
++ Google Calendar authentication expired +
+{result.message}
++ Event not found +
+{result.message}
++ {result.message || "Calendar event updated successfully"} +
+