mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-06 20:15:17 +02:00
refactor(agents): move tools package to app/agents/shared (slice 6)
Relocate the entire new_chat/tools/ package (62 files incl. registry, hitl, MCP cluster, and all connector subpackages: gmail/slack/discord/teams/drive/etc.) to the shared kernel. The package turned out to be a clean cohesive cluster: its only references to non-tools new_chat modules were comments, and its middleware deps were already flipped to shared in slice 5c. Flip 33 live importers (multi-agent, flows, routes, services, anonymous_agent, tests). Re-export shims remain for the frozen single-agent stack: a package __init__ mirroring the public surface (new_chat.__init__ imports it) plus invalid_tool + registry submodule shims (chat_deepagent imports those). Resolves slice 5c's two transient back-edges: shared/middleware/action_log (TYPE_CHECKING ToolDefinition) and tool_call_repair (local INVALID_TOOL_NAME) now point at app.agents.shared.tools.
This commit is contained in:
parent
a7fde2a48e
commit
aab95b9130
98 changed files with 1232 additions and 1152 deletions
27
surfsense_backend/app/agents/shared/tools/gmail/__init__.py
Normal file
27
surfsense_backend/app/agents/shared/tools/gmail/__init__.py
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
from app.agents.shared.tools.gmail.create_draft import (
|
||||
create_create_gmail_draft_tool,
|
||||
)
|
||||
from app.agents.shared.tools.gmail.read_email import (
|
||||
create_read_gmail_email_tool,
|
||||
)
|
||||
from app.agents.shared.tools.gmail.search_emails import (
|
||||
create_search_gmail_tool,
|
||||
)
|
||||
from app.agents.shared.tools.gmail.send_email import (
|
||||
create_send_gmail_email_tool,
|
||||
)
|
||||
from app.agents.shared.tools.gmail.trash_email import (
|
||||
create_trash_gmail_email_tool,
|
||||
)
|
||||
from app.agents.shared.tools.gmail.update_draft import (
|
||||
create_update_gmail_draft_tool,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"create_create_gmail_draft_tool",
|
||||
"create_read_gmail_email_tool",
|
||||
"create_search_gmail_tool",
|
||||
"create_send_gmail_email_tool",
|
||||
"create_trash_gmail_email_tool",
|
||||
"create_update_gmail_draft_tool",
|
||||
]
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
from typing import Any
|
||||
|
||||
from app.db import SearchSourceConnector
|
||||
from app.services.composio_service import ComposioService
|
||||
|
||||
|
||||
def split_recipients(value: str | None) -> list[str]:
|
||||
if not value:
|
||||
return []
|
||||
return [recipient.strip() for recipient in value.split(",") if recipient.strip()]
|
||||
|
||||
|
||||
def unwrap_composio_data(data: Any) -> Any:
|
||||
if isinstance(data, dict):
|
||||
inner = data.get("data", data)
|
||||
if isinstance(inner, dict):
|
||||
return inner.get("response_data", inner)
|
||||
return inner
|
||||
return data
|
||||
|
||||
|
||||
async def execute_composio_gmail_tool(
|
||||
connector: SearchSourceConnector,
|
||||
user_id: str,
|
||||
tool_name: str,
|
||||
params: dict[str, Any],
|
||||
) -> tuple[Any, str | None]:
|
||||
cca_id = connector.config.get("composio_connected_account_id")
|
||||
if not cca_id:
|
||||
return None, "Composio connected account ID not found for this Gmail connector."
|
||||
|
||||
result = await ComposioService().execute_tool(
|
||||
connected_account_id=cca_id,
|
||||
tool_name=tool_name,
|
||||
params=params,
|
||||
entity_id=f"surfsense_{user_id}",
|
||||
)
|
||||
if not result.get("success"):
|
||||
return None, result.get("error", "Unknown Composio Gmail error")
|
||||
|
||||
return unwrap_composio_data(result.get("data")), None
|
||||
361
surfsense_backend/app/agents/shared/tools/gmail/create_draft.py
Normal file
361
surfsense_backend/app/agents/shared/tools/gmail/create_draft.py
Normal file
|
|
@ -0,0 +1,361 @@
|
|||
import asyncio
|
||||
import base64
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from email.mime.text import MIMEText
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.agents.shared.tools.hitl import request_approval
|
||||
from app.db import async_session_maker
|
||||
from app.services.gmail import GmailToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_create_gmail_draft_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
):
|
||||
"""
|
||||
Factory function to create the create_gmail_draft tool.
|
||||
|
||||
The tool acquires its own short-lived ``AsyncSession`` per call via
|
||||
:data:`async_session_maker` so the closure is safe to share across
|
||||
HTTP requests by the compiled-agent cache. Capturing a per-request
|
||||
session here would surface stale/closed sessions on cache hits.
|
||||
|
||||
Args:
|
||||
db_session: Reserved for registry compatibility. Per-call sessions
|
||||
are opened via :data:`async_session_maker` inside the tool body.
|
||||
|
||||
Returns:
|
||||
Configured create_gmail_draft tool
|
||||
"""
|
||||
del db_session # per-call session — see docstring
|
||||
|
||||
@tool
|
||||
async def create_gmail_draft(
|
||||
to: str,
|
||||
subject: str,
|
||||
body: str,
|
||||
cc: str | None = None,
|
||||
bcc: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Create a draft email in Gmail.
|
||||
|
||||
Use when the user asks to draft, compose, or prepare an email without
|
||||
sending it.
|
||||
|
||||
Args:
|
||||
to: Recipient email address.
|
||||
subject: Email subject line.
|
||||
body: Email body content.
|
||||
cc: Optional CC recipient(s), comma-separated.
|
||||
bcc: Optional BCC recipient(s), comma-separated.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- status: "success", "rejected", or "error"
|
||||
- draft_id: Gmail draft ID (if success)
|
||||
- message: Result message
|
||||
|
||||
IMPORTANT:
|
||||
- If status is "rejected", the user explicitly declined the action.
|
||||
Respond with a brief acknowledgment and do NOT retry or suggest alternatives.
|
||||
- If status is "insufficient_permissions", the connector lacks the required OAuth scope.
|
||||
Inform the user they need to re-authenticate and do NOT retry the action.
|
||||
|
||||
Examples:
|
||||
- "Draft an email to alice@example.com about the meeting"
|
||||
- "Compose a reply to Bob about the project update"
|
||||
"""
|
||||
logger.info(f"create_gmail_draft called: to='{to}', subject='{subject}'")
|
||||
|
||||
if search_space_id is None or user_id is None:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Gmail tool not properly configured. Please contact support.",
|
||||
}
|
||||
|
||||
try:
|
||||
async with async_session_maker() as db_session:
|
||||
metadata_service = GmailToolMetadataService(db_session)
|
||||
context = await metadata_service.get_creation_context(
|
||||
search_space_id, user_id
|
||||
)
|
||||
|
||||
if "error" in context:
|
||||
logger.error(
|
||||
f"Failed to fetch creation context: {context['error']}"
|
||||
)
|
||||
return {"status": "error", "message": context["error"]}
|
||||
|
||||
accounts = context.get("accounts", [])
|
||||
if accounts and all(a.get("auth_expired") for a in accounts):
|
||||
logger.warning("All Gmail accounts have expired authentication")
|
||||
return {
|
||||
"status": "auth_error",
|
||||
"message": "All connected Gmail accounts need re-authentication. Please re-authenticate in your connector settings.",
|
||||
"connector_type": "gmail",
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"Requesting approval for creating Gmail draft: to='{to}', subject='{subject}'"
|
||||
)
|
||||
result = request_approval(
|
||||
action_type="gmail_draft_creation",
|
||||
tool_name="create_gmail_draft",
|
||||
params={
|
||||
"to": to,
|
||||
"subject": subject,
|
||||
"body": body,
|
||||
"cc": cc,
|
||||
"bcc": bcc,
|
||||
"connector_id": None,
|
||||
},
|
||||
context=context,
|
||||
)
|
||||
|
||||
if result.rejected:
|
||||
return {
|
||||
"status": "rejected",
|
||||
"message": "User declined. The draft was not created. Do not ask again or suggest alternatives.",
|
||||
}
|
||||
|
||||
final_to = result.params.get("to", to)
|
||||
final_subject = result.params.get("subject", subject)
|
||||
final_body = result.params.get("body", body)
|
||||
final_cc = result.params.get("cc", cc)
|
||||
final_bcc = result.params.get("bcc", bcc)
|
||||
final_connector_id = result.params.get("connector_id")
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
_gmail_types = [
|
||||
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
|
||||
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
|
||||
]
|
||||
|
||||
if final_connector_id is not None:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == final_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type.in_(_gmail_types),
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected Gmail connector is invalid or has been disconnected.",
|
||||
}
|
||||
actual_connector_id = connector.id
|
||||
else:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type.in_(_gmail_types),
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No Gmail connector found. Please connect Gmail in your workspace settings.",
|
||||
}
|
||||
actual_connector_id = connector.id
|
||||
|
||||
logger.info(
|
||||
f"Creating Gmail draft: to='{final_to}', subject='{final_subject}', connector={actual_connector_id}"
|
||||
)
|
||||
|
||||
is_composio_gmail = (
|
||||
connector.connector_type
|
||||
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
|
||||
)
|
||||
if is_composio_gmail:
|
||||
cca_id = connector.config.get("composio_connected_account_id")
|
||||
if not cca_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Composio connected account ID not found for this Gmail connector.",
|
||||
}
|
||||
else:
|
||||
from google.oauth2.credentials import Credentials
|
||||
|
||||
from app.config import config
|
||||
from app.utils.oauth_security import TokenEncryption
|
||||
|
||||
config_data = dict(connector.config)
|
||||
token_encrypted = config_data.get("_token_encrypted", False)
|
||||
if token_encrypted and config.SECRET_KEY:
|
||||
token_encryption = TokenEncryption(config.SECRET_KEY)
|
||||
if config_data.get("token"):
|
||||
config_data["token"] = token_encryption.decrypt_token(
|
||||
config_data["token"]
|
||||
)
|
||||
if config_data.get("refresh_token"):
|
||||
config_data["refresh_token"] = (
|
||||
token_encryption.decrypt_token(
|
||||
config_data["refresh_token"]
|
||||
)
|
||||
)
|
||||
if config_data.get("client_secret"):
|
||||
config_data["client_secret"] = (
|
||||
token_encryption.decrypt_token(
|
||||
config_data["client_secret"]
|
||||
)
|
||||
)
|
||||
|
||||
exp = config_data.get("expiry", "")
|
||||
if exp:
|
||||
exp = exp.replace("Z", "")
|
||||
|
||||
creds = Credentials(
|
||||
token=config_data.get("token"),
|
||||
refresh_token=config_data.get("refresh_token"),
|
||||
token_uri=config_data.get("token_uri"),
|
||||
client_id=config_data.get("client_id"),
|
||||
client_secret=config_data.get("client_secret"),
|
||||
scopes=config_data.get("scopes", []),
|
||||
expiry=datetime.fromisoformat(exp) if exp else None,
|
||||
)
|
||||
|
||||
message = MIMEText(final_body)
|
||||
message["to"] = final_to
|
||||
message["subject"] = final_subject
|
||||
if final_cc:
|
||||
message["cc"] = final_cc
|
||||
if final_bcc:
|
||||
message["bcc"] = final_bcc
|
||||
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
|
||||
|
||||
try:
|
||||
if is_composio_gmail:
|
||||
from app.agents.shared.tools.gmail.composio_helpers import (
|
||||
execute_composio_gmail_tool,
|
||||
split_recipients,
|
||||
)
|
||||
|
||||
created, error = await execute_composio_gmail_tool(
|
||||
connector,
|
||||
user_id,
|
||||
"GMAIL_CREATE_EMAIL_DRAFT",
|
||||
{
|
||||
"user_id": "me",
|
||||
"recipient_email": final_to,
|
||||
"subject": final_subject,
|
||||
"body": final_body,
|
||||
"cc": split_recipients(final_cc),
|
||||
"bcc": split_recipients(final_bcc),
|
||||
"is_html": False,
|
||||
},
|
||||
)
|
||||
if error:
|
||||
raise RuntimeError(error)
|
||||
if not isinstance(created, dict):
|
||||
created = {}
|
||||
else:
|
||||
from googleapiclient.discovery import build
|
||||
|
||||
gmail_service = build("gmail", "v1", credentials=creds)
|
||||
created = await asyncio.get_event_loop().run_in_executor(
|
||||
None,
|
||||
lambda: (
|
||||
gmail_service.users()
|
||||
.drafts()
|
||||
.create(userId="me", body={"message": {"raw": raw}})
|
||||
.execute()
|
||||
),
|
||||
)
|
||||
except Exception as api_err:
|
||||
from googleapiclient.errors import HttpError
|
||||
|
||||
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
|
||||
logger.warning(
|
||||
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
|
||||
)
|
||||
try:
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
_res = await db_session.execute(
|
||||
select(SearchSourceConnector).where(
|
||||
SearchSourceConnector.id == actual_connector_id
|
||||
)
|
||||
)
|
||||
_conn = _res.scalar_one_or_none()
|
||||
if _conn and not _conn.config.get("auth_expired"):
|
||||
_conn.config = {**_conn.config, "auth_expired": True}
|
||||
flag_modified(_conn, "config")
|
||||
await db_session.commit()
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to persist auth_expired for connector %s",
|
||||
actual_connector_id,
|
||||
exc_info=True,
|
||||
)
|
||||
return {
|
||||
"status": "insufficient_permissions",
|
||||
"connector_id": actual_connector_id,
|
||||
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
|
||||
}
|
||||
raise
|
||||
|
||||
logger.info(f"Gmail draft created: id={created.get('id')}")
|
||||
|
||||
kb_message_suffix = ""
|
||||
try:
|
||||
from app.services.gmail import GmailKBSyncService
|
||||
|
||||
kb_service = GmailKBSyncService(db_session)
|
||||
draft_message = created.get("message", {})
|
||||
kb_result = await kb_service.sync_after_create(
|
||||
message_id=draft_message.get("id", ""),
|
||||
thread_id=draft_message.get("threadId", ""),
|
||||
subject=final_subject,
|
||||
sender="me",
|
||||
date_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
body_text=final_body,
|
||||
connector_id=actual_connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
draft_id=created.get("id"),
|
||||
)
|
||||
if kb_result["status"] == "success":
|
||||
kb_message_suffix = (
|
||||
" Your knowledge base has also been updated."
|
||||
)
|
||||
else:
|
||||
kb_message_suffix = " This draft will be added to your knowledge base in the next scheduled sync."
|
||||
except Exception as kb_err:
|
||||
logger.warning(f"KB sync after create failed: {kb_err}")
|
||||
kb_message_suffix = " This draft will be added to your knowledge base in the next scheduled sync."
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"draft_id": created.get("id"),
|
||||
"message": f"Successfully created Gmail draft with subject '{final_subject}'.{kb_message_suffix}",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
|
||||
logger.error(f"Error creating Gmail draft: {e}", exc_info=True)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Something went wrong while creating the draft. Please try again.",
|
||||
}
|
||||
|
||||
return create_gmail_draft
|
||||
172
surfsense_backend/app/agents/shared/tools/gmail/read_email.py
Normal file
172
surfsense_backend/app/agents/shared/tools/gmail/read_email.py
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
import logging
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType, async_session_maker
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_GMAIL_TYPES = [
|
||||
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
|
||||
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
|
||||
]
|
||||
|
||||
|
||||
def create_read_gmail_email_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
):
|
||||
"""
|
||||
Factory function to create the read_gmail_email tool.
|
||||
|
||||
The tool acquires its own short-lived ``AsyncSession`` per call via
|
||||
:data:`async_session_maker` so the closure is safe to share across
|
||||
HTTP requests by the compiled-agent cache. Capturing a per-request
|
||||
session here would surface stale/closed sessions on cache hits.
|
||||
|
||||
Args:
|
||||
db_session: Reserved for registry compatibility. Per-call sessions
|
||||
are opened via :data:`async_session_maker` inside the tool body.
|
||||
|
||||
Returns:
|
||||
Configured read_gmail_email tool
|
||||
"""
|
||||
del db_session # per-call session — see docstring
|
||||
|
||||
@tool
|
||||
async def read_gmail_email(message_id: str) -> dict[str, Any]:
|
||||
"""Read the full content of a specific Gmail email by its message ID.
|
||||
|
||||
Use after search_gmail to get the complete body of an email.
|
||||
|
||||
Args:
|
||||
message_id: The Gmail message ID (from search_gmail results).
|
||||
|
||||
Returns:
|
||||
Dictionary with status and the full email content formatted as markdown.
|
||||
"""
|
||||
if search_space_id is None or user_id is None:
|
||||
return {"status": "error", "message": "Gmail tool not properly configured."}
|
||||
|
||||
try:
|
||||
async with async_session_maker() as db_session:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type.in_(_GMAIL_TYPES),
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No Gmail connector found. Please connect Gmail in your workspace settings.",
|
||||
}
|
||||
|
||||
if (
|
||||
connector.connector_type
|
||||
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
|
||||
):
|
||||
cca_id = connector.config.get("composio_connected_account_id")
|
||||
if not cca_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Composio connected account ID not found.",
|
||||
}
|
||||
|
||||
from app.agents.shared.tools.gmail.search_emails import (
|
||||
_format_gmail_summary,
|
||||
)
|
||||
from app.services.composio_service import ComposioService
|
||||
|
||||
service = ComposioService()
|
||||
detail, error = await service.get_gmail_message_detail(
|
||||
connected_account_id=cca_id,
|
||||
entity_id=f"surfsense_{user_id}",
|
||||
message_id=message_id,
|
||||
)
|
||||
if error:
|
||||
return {"status": "error", "message": error}
|
||||
if not detail:
|
||||
return {
|
||||
"status": "not_found",
|
||||
"message": f"Email with ID '{message_id}' not found.",
|
||||
}
|
||||
|
||||
summary = _format_gmail_summary(detail)
|
||||
content = (
|
||||
f"# {summary['subject']}\n\n"
|
||||
f"**From:** {summary['from']}\n"
|
||||
f"**To:** {summary['to']}\n"
|
||||
f"**Date:** {summary['date']}\n\n"
|
||||
f"## Message Content\n\n"
|
||||
f"{detail.get('messageText') or detail.get('snippet') or ''}\n\n"
|
||||
f"## Message Details\n\n"
|
||||
f"- **Message ID:** {summary['message_id']}\n"
|
||||
f"- **Thread ID:** {summary['thread_id']}\n"
|
||||
)
|
||||
return {
|
||||
"status": "success",
|
||||
"message_id": summary["message_id"] or message_id,
|
||||
"content": content,
|
||||
}
|
||||
|
||||
from app.agents.shared.tools.gmail.search_emails import (
|
||||
_build_credentials,
|
||||
)
|
||||
|
||||
creds = _build_credentials(connector)
|
||||
|
||||
from app.connectors.google_gmail_connector import GoogleGmailConnector
|
||||
|
||||
gmail = GoogleGmailConnector(
|
||||
credentials=creds,
|
||||
session=db_session,
|
||||
user_id=user_id,
|
||||
connector_id=connector.id,
|
||||
)
|
||||
|
||||
detail, error = await gmail.get_message_details(message_id)
|
||||
if error:
|
||||
if (
|
||||
"re-authenticate" in error.lower()
|
||||
or "authentication failed" in error.lower()
|
||||
):
|
||||
return {
|
||||
"status": "auth_error",
|
||||
"message": error,
|
||||
"connector_type": "gmail",
|
||||
}
|
||||
return {"status": "error", "message": error}
|
||||
|
||||
if not detail:
|
||||
return {
|
||||
"status": "not_found",
|
||||
"message": f"Email with ID '{message_id}' not found.",
|
||||
}
|
||||
|
||||
content = gmail.format_message_to_markdown(detail)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message_id": message_id,
|
||||
"content": content,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
logger.error("Error reading Gmail email: %s", e, exc_info=True)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Failed to read email. Please try again.",
|
||||
}
|
||||
|
||||
return read_gmail_email
|
||||
260
surfsense_backend/app/agents/shared/tools/gmail/search_emails.py
Normal file
260
surfsense_backend/app/agents/shared/tools/gmail/search_emails.py
Normal file
|
|
@ -0,0 +1,260 @@
|
|||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType, async_session_maker
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_GMAIL_TYPES = [
|
||||
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
|
||||
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
|
||||
]
|
||||
|
||||
_token_encryption_cache: object | None = None
|
||||
|
||||
|
||||
def _get_token_encryption():
|
||||
global _token_encryption_cache
|
||||
if _token_encryption_cache is None:
|
||||
from app.config import config
|
||||
from app.utils.oauth_security import TokenEncryption
|
||||
|
||||
if not config.SECRET_KEY:
|
||||
raise RuntimeError("SECRET_KEY not configured for token decryption.")
|
||||
_token_encryption_cache = TokenEncryption(config.SECRET_KEY)
|
||||
return _token_encryption_cache
|
||||
|
||||
|
||||
def _build_credentials(connector: SearchSourceConnector):
|
||||
"""Build Google OAuth Credentials from a connector's stored config.
|
||||
|
||||
Handles both native OAuth connectors (with encrypted tokens) and
|
||||
Composio-backed connectors. Shared by Gmail and Calendar tools.
|
||||
"""
|
||||
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
|
||||
|
||||
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
|
||||
raise ValueError("Composio connectors must use Composio tool execution.")
|
||||
|
||||
from google.oauth2.credentials import Credentials
|
||||
|
||||
cfg = dict(connector.config)
|
||||
if cfg.get("_token_encrypted"):
|
||||
enc = _get_token_encryption()
|
||||
for key in ("token", "refresh_token", "client_secret"):
|
||||
if cfg.get(key):
|
||||
cfg[key] = enc.decrypt_token(cfg[key])
|
||||
|
||||
exp = (cfg.get("expiry") or "").replace("Z", "")
|
||||
return Credentials(
|
||||
token=cfg.get("token"),
|
||||
refresh_token=cfg.get("refresh_token"),
|
||||
token_uri=cfg.get("token_uri"),
|
||||
client_id=cfg.get("client_id"),
|
||||
client_secret=cfg.get("client_secret"),
|
||||
scopes=cfg.get("scopes", []),
|
||||
expiry=datetime.fromisoformat(exp) if exp else None,
|
||||
)
|
||||
|
||||
|
||||
def _gmail_headers(message: dict[str, Any]) -> dict[str, str]:
|
||||
headers = message.get("payload", {}).get("headers", [])
|
||||
return {
|
||||
header.get("name", "").lower(): header.get("value", "")
|
||||
for header in headers
|
||||
if isinstance(header, dict)
|
||||
}
|
||||
|
||||
|
||||
def _format_gmail_summary(message: dict[str, Any]) -> dict[str, Any]:
|
||||
headers = _gmail_headers(message)
|
||||
return {
|
||||
"message_id": message.get("id") or message.get("messageId"),
|
||||
"thread_id": message.get("threadId"),
|
||||
"subject": message.get("subject") or headers.get("subject", "No Subject"),
|
||||
"from": message.get("sender") or headers.get("from", "Unknown"),
|
||||
"to": message.get("to") or headers.get("to", ""),
|
||||
"date": message.get("messageTimestamp") or headers.get("date", ""),
|
||||
"snippet": message.get("snippet") or message.get("messageText", "")[:300],
|
||||
"labels": message.get("labelIds", []),
|
||||
}
|
||||
|
||||
|
||||
async def _search_composio_gmail(
|
||||
connector: SearchSourceConnector,
|
||||
user_id: str,
|
||||
query: str,
|
||||
max_results: int,
|
||||
) -> dict[str, Any]:
|
||||
cca_id = connector.config.get("composio_connected_account_id")
|
||||
if not cca_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Composio connected account ID not found.",
|
||||
}
|
||||
|
||||
from app.services.composio_service import ComposioService
|
||||
|
||||
service = ComposioService()
|
||||
messages, _next_token, _estimate, error = await service.get_gmail_messages(
|
||||
connected_account_id=cca_id,
|
||||
entity_id=f"surfsense_{user_id}",
|
||||
query=query,
|
||||
max_results=max_results,
|
||||
)
|
||||
if error:
|
||||
return {"status": "error", "message": error}
|
||||
|
||||
emails = [_format_gmail_summary(message) for message in messages]
|
||||
return {
|
||||
"status": "success",
|
||||
"emails": emails,
|
||||
"total": len(emails),
|
||||
"message": "No emails found." if not emails else None,
|
||||
}
|
||||
|
||||
|
||||
def create_search_gmail_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
):
|
||||
"""
|
||||
Factory function to create the search_gmail tool.
|
||||
|
||||
The tool acquires its own short-lived ``AsyncSession`` per call via
|
||||
:data:`async_session_maker` so the closure is safe to share across
|
||||
HTTP requests by the compiled-agent cache. Capturing a per-request
|
||||
session here would surface stale/closed sessions on cache hits.
|
||||
|
||||
Args:
|
||||
db_session: Reserved for registry compatibility. Per-call sessions
|
||||
are opened via :data:`async_session_maker` inside the tool body.
|
||||
|
||||
Returns:
|
||||
Configured search_gmail tool
|
||||
"""
|
||||
del db_session # per-call session — see docstring
|
||||
|
||||
@tool
|
||||
async def search_gmail(
|
||||
query: str,
|
||||
max_results: int = 10,
|
||||
) -> dict[str, Any]:
|
||||
"""Search emails in the user's Gmail inbox using Gmail search syntax.
|
||||
|
||||
Args:
|
||||
query: Gmail search query, same syntax as the Gmail search bar.
|
||||
Examples: "from:alice@example.com", "subject:meeting",
|
||||
"is:unread", "after:2024/01/01 before:2024/02/01",
|
||||
"has:attachment", "in:sent".
|
||||
max_results: Number of emails to return (default 10, max 20).
|
||||
|
||||
Returns:
|
||||
Dictionary with status and a list of email summaries including
|
||||
message_id, subject, from, date, snippet.
|
||||
"""
|
||||
if search_space_id is None or user_id is None:
|
||||
return {"status": "error", "message": "Gmail tool not properly configured."}
|
||||
|
||||
max_results = min(max_results, 20)
|
||||
|
||||
try:
|
||||
async with async_session_maker() as db_session:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type.in_(_GMAIL_TYPES),
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No Gmail connector found. Please connect Gmail in your workspace settings.",
|
||||
}
|
||||
|
||||
if (
|
||||
connector.connector_type
|
||||
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
|
||||
):
|
||||
return await _search_composio_gmail(
|
||||
connector, str(user_id), query, max_results
|
||||
)
|
||||
|
||||
creds = _build_credentials(connector)
|
||||
|
||||
from app.connectors.google_gmail_connector import GoogleGmailConnector
|
||||
|
||||
gmail = GoogleGmailConnector(
|
||||
credentials=creds,
|
||||
session=db_session,
|
||||
user_id=user_id,
|
||||
connector_id=connector.id,
|
||||
)
|
||||
|
||||
messages_list, error = await gmail.get_messages_list(
|
||||
max_results=max_results, query=query
|
||||
)
|
||||
if error:
|
||||
if (
|
||||
"re-authenticate" in error.lower()
|
||||
or "authentication failed" in error.lower()
|
||||
):
|
||||
return {
|
||||
"status": "auth_error",
|
||||
"message": error,
|
||||
"connector_type": "gmail",
|
||||
}
|
||||
return {"status": "error", "message": error}
|
||||
|
||||
if not messages_list:
|
||||
return {
|
||||
"status": "success",
|
||||
"emails": [],
|
||||
"total": 0,
|
||||
"message": "No emails found.",
|
||||
}
|
||||
|
||||
emails = []
|
||||
for msg in messages_list:
|
||||
detail, err = await gmail.get_message_details(msg["id"])
|
||||
if err:
|
||||
continue
|
||||
headers = {
|
||||
h["name"].lower(): h["value"]
|
||||
for h in detail.get("payload", {}).get("headers", [])
|
||||
}
|
||||
emails.append(
|
||||
{
|
||||
"message_id": detail.get("id"),
|
||||
"thread_id": detail.get("threadId"),
|
||||
"subject": headers.get("subject", "No Subject"),
|
||||
"from": headers.get("from", "Unknown"),
|
||||
"to": headers.get("to", ""),
|
||||
"date": headers.get("date", ""),
|
||||
"snippet": detail.get("snippet", ""),
|
||||
"labels": detail.get("labelIds", []),
|
||||
}
|
||||
)
|
||||
|
||||
return {"status": "success", "emails": emails, "total": len(emails)}
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
logger.error("Error searching Gmail: %s", e, exc_info=True)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Failed to search Gmail. Please try again.",
|
||||
}
|
||||
|
||||
return search_gmail
|
||||
363
surfsense_backend/app/agents/shared/tools/gmail/send_email.py
Normal file
363
surfsense_backend/app/agents/shared/tools/gmail/send_email.py
Normal file
|
|
@ -0,0 +1,363 @@
|
|||
import asyncio
|
||||
import base64
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from email.mime.text import MIMEText
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.agents.shared.tools.hitl import request_approval
|
||||
from app.db import async_session_maker
|
||||
from app.services.gmail import GmailToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_send_gmail_email_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
):
|
||||
"""
|
||||
Factory function to create the send_gmail_email tool.
|
||||
|
||||
The tool acquires its own short-lived ``AsyncSession`` per call via
|
||||
:data:`async_session_maker` so the closure is safe to share across
|
||||
HTTP requests by the compiled-agent cache. Capturing a per-request
|
||||
session here would surface stale/closed sessions on cache hits.
|
||||
|
||||
Args:
|
||||
db_session: Reserved for registry compatibility. Per-call sessions
|
||||
are opened via :data:`async_session_maker` inside the tool body.
|
||||
|
||||
Returns:
|
||||
Configured send_gmail_email tool
|
||||
"""
|
||||
del db_session # per-call session — see docstring
|
||||
|
||||
@tool
|
||||
async def send_gmail_email(
|
||||
to: str,
|
||||
subject: str,
|
||||
body: str,
|
||||
cc: str | None = None,
|
||||
bcc: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Send an email via Gmail.
|
||||
|
||||
Use when the user explicitly asks to send an email. This sends the
|
||||
email immediately - it cannot be unsent.
|
||||
|
||||
Args:
|
||||
to: Recipient email address.
|
||||
subject: Email subject line.
|
||||
body: Email body content.
|
||||
cc: Optional CC recipient(s), comma-separated.
|
||||
bcc: Optional BCC recipient(s), comma-separated.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- status: "success", "rejected", or "error"
|
||||
- message_id: Gmail message ID (if success)
|
||||
- thread_id: Gmail thread ID (if success)
|
||||
- message: Result message
|
||||
|
||||
IMPORTANT:
|
||||
- If status is "rejected", the user explicitly declined the action.
|
||||
Respond with a brief acknowledgment and do NOT retry or suggest alternatives.
|
||||
- If status is "insufficient_permissions", the connector lacks the required OAuth scope.
|
||||
Inform the user they need to re-authenticate and do NOT retry the action.
|
||||
|
||||
Examples:
|
||||
- "Send an email to alice@example.com about the meeting"
|
||||
- "Email Bob the project update"
|
||||
"""
|
||||
logger.info(f"send_gmail_email called: to='{to}', subject='{subject}'")
|
||||
|
||||
if search_space_id is None or user_id is None:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Gmail tool not properly configured. Please contact support.",
|
||||
}
|
||||
|
||||
try:
|
||||
async with async_session_maker() as db_session:
|
||||
metadata_service = GmailToolMetadataService(db_session)
|
||||
context = await metadata_service.get_creation_context(
|
||||
search_space_id, user_id
|
||||
)
|
||||
|
||||
if "error" in context:
|
||||
logger.error(
|
||||
f"Failed to fetch creation context: {context['error']}"
|
||||
)
|
||||
return {"status": "error", "message": context["error"]}
|
||||
|
||||
accounts = context.get("accounts", [])
|
||||
if accounts and all(a.get("auth_expired") for a in accounts):
|
||||
logger.warning("All Gmail accounts have expired authentication")
|
||||
return {
|
||||
"status": "auth_error",
|
||||
"message": "All connected Gmail accounts need re-authentication. Please re-authenticate in your connector settings.",
|
||||
"connector_type": "gmail",
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"Requesting approval for sending Gmail email: to='{to}', subject='{subject}'"
|
||||
)
|
||||
result = request_approval(
|
||||
action_type="gmail_email_send",
|
||||
tool_name="send_gmail_email",
|
||||
params={
|
||||
"to": to,
|
||||
"subject": subject,
|
||||
"body": body,
|
||||
"cc": cc,
|
||||
"bcc": bcc,
|
||||
"connector_id": None,
|
||||
},
|
||||
context=context,
|
||||
)
|
||||
|
||||
if result.rejected:
|
||||
return {
|
||||
"status": "rejected",
|
||||
"message": "User declined. The email was not sent. Do not ask again or suggest alternatives.",
|
||||
}
|
||||
|
||||
final_to = result.params.get("to", to)
|
||||
final_subject = result.params.get("subject", subject)
|
||||
final_body = result.params.get("body", body)
|
||||
final_cc = result.params.get("cc", cc)
|
||||
final_bcc = result.params.get("bcc", bcc)
|
||||
final_connector_id = result.params.get("connector_id")
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
_gmail_types = [
|
||||
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
|
||||
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
|
||||
]
|
||||
|
||||
if final_connector_id is not None:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == final_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type.in_(_gmail_types),
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected Gmail connector is invalid or has been disconnected.",
|
||||
}
|
||||
actual_connector_id = connector.id
|
||||
else:
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type.in_(_gmail_types),
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No Gmail connector found. Please connect Gmail in your workspace settings.",
|
||||
}
|
||||
actual_connector_id = connector.id
|
||||
|
||||
logger.info(
|
||||
f"Sending Gmail email: to='{final_to}', subject='{final_subject}', connector={actual_connector_id}"
|
||||
)
|
||||
|
||||
is_composio_gmail = (
|
||||
connector.connector_type
|
||||
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
|
||||
)
|
||||
if is_composio_gmail:
|
||||
cca_id = connector.config.get("composio_connected_account_id")
|
||||
if not cca_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Composio connected account ID not found for this Gmail connector.",
|
||||
}
|
||||
else:
|
||||
from google.oauth2.credentials import Credentials
|
||||
|
||||
from app.config import config
|
||||
from app.utils.oauth_security import TokenEncryption
|
||||
|
||||
config_data = dict(connector.config)
|
||||
token_encrypted = config_data.get("_token_encrypted", False)
|
||||
if token_encrypted and config.SECRET_KEY:
|
||||
token_encryption = TokenEncryption(config.SECRET_KEY)
|
||||
if config_data.get("token"):
|
||||
config_data["token"] = token_encryption.decrypt_token(
|
||||
config_data["token"]
|
||||
)
|
||||
if config_data.get("refresh_token"):
|
||||
config_data["refresh_token"] = (
|
||||
token_encryption.decrypt_token(
|
||||
config_data["refresh_token"]
|
||||
)
|
||||
)
|
||||
if config_data.get("client_secret"):
|
||||
config_data["client_secret"] = (
|
||||
token_encryption.decrypt_token(
|
||||
config_data["client_secret"]
|
||||
)
|
||||
)
|
||||
|
||||
exp = config_data.get("expiry", "")
|
||||
if exp:
|
||||
exp = exp.replace("Z", "")
|
||||
|
||||
creds = Credentials(
|
||||
token=config_data.get("token"),
|
||||
refresh_token=config_data.get("refresh_token"),
|
||||
token_uri=config_data.get("token_uri"),
|
||||
client_id=config_data.get("client_id"),
|
||||
client_secret=config_data.get("client_secret"),
|
||||
scopes=config_data.get("scopes", []),
|
||||
expiry=datetime.fromisoformat(exp) if exp else None,
|
||||
)
|
||||
|
||||
message = MIMEText(final_body)
|
||||
message["to"] = final_to
|
||||
message["subject"] = final_subject
|
||||
if final_cc:
|
||||
message["cc"] = final_cc
|
||||
if final_bcc:
|
||||
message["bcc"] = final_bcc
|
||||
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
|
||||
|
||||
try:
|
||||
if is_composio_gmail:
|
||||
from app.agents.shared.tools.gmail.composio_helpers import (
|
||||
execute_composio_gmail_tool,
|
||||
split_recipients,
|
||||
)
|
||||
|
||||
sent, error = await execute_composio_gmail_tool(
|
||||
connector,
|
||||
user_id,
|
||||
"GMAIL_SEND_EMAIL",
|
||||
{
|
||||
"user_id": "me",
|
||||
"recipient_email": final_to,
|
||||
"subject": final_subject,
|
||||
"body": final_body,
|
||||
"cc": split_recipients(final_cc),
|
||||
"bcc": split_recipients(final_bcc),
|
||||
"is_html": False,
|
||||
},
|
||||
)
|
||||
if error:
|
||||
raise RuntimeError(error)
|
||||
if not isinstance(sent, dict):
|
||||
sent = {}
|
||||
else:
|
||||
from googleapiclient.discovery import build
|
||||
|
||||
gmail_service = build("gmail", "v1", credentials=creds)
|
||||
sent = await asyncio.get_event_loop().run_in_executor(
|
||||
None,
|
||||
lambda: (
|
||||
gmail_service.users()
|
||||
.messages()
|
||||
.send(userId="me", body={"raw": raw})
|
||||
.execute()
|
||||
),
|
||||
)
|
||||
except Exception as api_err:
|
||||
from googleapiclient.errors import HttpError
|
||||
|
||||
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
|
||||
logger.warning(
|
||||
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
|
||||
)
|
||||
try:
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
_res = await db_session.execute(
|
||||
select(SearchSourceConnector).where(
|
||||
SearchSourceConnector.id == actual_connector_id
|
||||
)
|
||||
)
|
||||
_conn = _res.scalar_one_or_none()
|
||||
if _conn and not _conn.config.get("auth_expired"):
|
||||
_conn.config = {**_conn.config, "auth_expired": True}
|
||||
flag_modified(_conn, "config")
|
||||
await db_session.commit()
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to persist auth_expired for connector %s",
|
||||
actual_connector_id,
|
||||
exc_info=True,
|
||||
)
|
||||
return {
|
||||
"status": "insufficient_permissions",
|
||||
"connector_id": actual_connector_id,
|
||||
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
|
||||
}
|
||||
raise
|
||||
|
||||
logger.info(
|
||||
f"Gmail email sent: id={sent.get('id')}, threadId={sent.get('threadId')}"
|
||||
)
|
||||
|
||||
kb_message_suffix = ""
|
||||
try:
|
||||
from app.services.gmail import GmailKBSyncService
|
||||
|
||||
kb_service = GmailKBSyncService(db_session)
|
||||
kb_result = await kb_service.sync_after_create(
|
||||
message_id=sent.get("id", ""),
|
||||
thread_id=sent.get("threadId", ""),
|
||||
subject=final_subject,
|
||||
sender="me",
|
||||
date_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
body_text=final_body,
|
||||
connector_id=actual_connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
)
|
||||
if kb_result["status"] == "success":
|
||||
kb_message_suffix = (
|
||||
" Your knowledge base has also been updated."
|
||||
)
|
||||
else:
|
||||
kb_message_suffix = " This email will be added to your knowledge base in the next scheduled sync."
|
||||
except Exception as kb_err:
|
||||
logger.warning(f"KB sync after send failed: {kb_err}")
|
||||
kb_message_suffix = " This email will be added to your knowledge base in the next scheduled sync."
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message_id": sent.get("id"),
|
||||
"thread_id": sent.get("threadId"),
|
||||
"message": f"Successfully sent email to '{final_to}' with subject '{final_subject}'.{kb_message_suffix}",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
|
||||
logger.error(f"Error sending Gmail email: {e}", exc_info=True)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Something went wrong while sending the email. Please try again.",
|
||||
}
|
||||
|
||||
return send_gmail_email
|
||||
344
surfsense_backend/app/agents/shared/tools/gmail/trash_email.py
Normal file
344
surfsense_backend/app/agents/shared/tools/gmail/trash_email.py
Normal file
|
|
@ -0,0 +1,344 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.agents.shared.tools.hitl import request_approval
|
||||
from app.db import async_session_maker
|
||||
from app.services.gmail import GmailToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_trash_gmail_email_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
):
|
||||
"""
|
||||
Factory function to create the trash_gmail_email tool.
|
||||
|
||||
The tool acquires its own short-lived ``AsyncSession`` per call via
|
||||
:data:`async_session_maker` so the closure is safe to share across
|
||||
HTTP requests by the compiled-agent cache. Capturing a per-request
|
||||
session here would surface stale/closed sessions on cache hits.
|
||||
|
||||
Args:
|
||||
db_session: Reserved for registry compatibility. Per-call sessions
|
||||
are opened via :data:`async_session_maker` inside the tool body.
|
||||
|
||||
Returns:
|
||||
Configured trash_gmail_email tool
|
||||
"""
|
||||
del db_session # per-call session — see docstring
|
||||
|
||||
@tool
|
||||
async def trash_gmail_email(
|
||||
email_subject_or_id: str,
|
||||
delete_from_kb: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
"""Move an email or draft to trash in Gmail.
|
||||
|
||||
Use when the user asks to delete, remove, or trash an email or draft.
|
||||
|
||||
Args:
|
||||
email_subject_or_id: The exact subject line or message ID of the
|
||||
email to trash (as it appears in the inbox).
|
||||
delete_from_kb: Whether to also remove the email from the knowledge base.
|
||||
Default is False.
|
||||
Set to True to remove from both Gmail and knowledge base.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- status: "success", "rejected", "not_found", or "error"
|
||||
- message_id: Gmail message ID (if success)
|
||||
- deleted_from_kb: whether the document was removed from the knowledge base
|
||||
- message: Result message
|
||||
|
||||
IMPORTANT:
|
||||
- If status is "rejected", the user explicitly declined. Respond with a brief
|
||||
acknowledgment and do NOT retry or suggest alternatives.
|
||||
- If status is "not_found", relay the exact message to the user and ask them
|
||||
to verify the email subject or check if it has been indexed.
|
||||
- If status is "insufficient_permissions", the connector lacks the required OAuth scope.
|
||||
Inform the user they need to re-authenticate and do NOT retry this tool.
|
||||
Examples:
|
||||
- "Delete the email about 'Meeting Cancelled'"
|
||||
- "Trash the email from Bob about the project"
|
||||
"""
|
||||
logger.info(
|
||||
f"trash_gmail_email called: email_subject_or_id='{email_subject_or_id}', delete_from_kb={delete_from_kb}"
|
||||
)
|
||||
|
||||
if search_space_id is None or user_id is None:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Gmail tool not properly configured. Please contact support.",
|
||||
}
|
||||
|
||||
try:
|
||||
async with async_session_maker() as db_session:
|
||||
metadata_service = GmailToolMetadataService(db_session)
|
||||
context = await metadata_service.get_trash_context(
|
||||
search_space_id, user_id, email_subject_or_id
|
||||
)
|
||||
|
||||
if "error" in context:
|
||||
error_msg = context["error"]
|
||||
if "not found" in error_msg.lower():
|
||||
logger.warning(f"Email not found: {error_msg}")
|
||||
return {"status": "not_found", "message": error_msg}
|
||||
logger.error(f"Failed to fetch trash context: {error_msg}")
|
||||
return {"status": "error", "message": error_msg}
|
||||
|
||||
account = context.get("account", {})
|
||||
if account.get("auth_expired"):
|
||||
logger.warning(
|
||||
"Gmail account %s has expired authentication",
|
||||
account.get("id"),
|
||||
)
|
||||
return {
|
||||
"status": "auth_error",
|
||||
"message": "The Gmail account for this email needs re-authentication. Please re-authenticate in your connector settings.",
|
||||
"connector_type": "gmail",
|
||||
}
|
||||
|
||||
email = context["email"]
|
||||
message_id = email["message_id"]
|
||||
document_id = email.get("document_id")
|
||||
connector_id_from_context = context["account"]["id"]
|
||||
|
||||
if not message_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Message ID is missing from the indexed document. Please re-index the email and try again.",
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"Requesting approval for trashing Gmail email: '{email_subject_or_id}' (message_id={message_id}, delete_from_kb={delete_from_kb})"
|
||||
)
|
||||
result = request_approval(
|
||||
action_type="gmail_email_trash",
|
||||
tool_name="trash_gmail_email",
|
||||
params={
|
||||
"message_id": message_id,
|
||||
"connector_id": connector_id_from_context,
|
||||
"delete_from_kb": delete_from_kb,
|
||||
},
|
||||
context=context,
|
||||
)
|
||||
|
||||
if result.rejected:
|
||||
return {
|
||||
"status": "rejected",
|
||||
"message": "User declined. The email was not trashed. Do not ask again or suggest alternatives.",
|
||||
}
|
||||
|
||||
final_message_id = result.params.get("message_id", message_id)
|
||||
final_connector_id = result.params.get(
|
||||
"connector_id", connector_id_from_context
|
||||
)
|
||||
final_delete_from_kb = result.params.get(
|
||||
"delete_from_kb", delete_from_kb
|
||||
)
|
||||
|
||||
if not final_connector_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No connector found for this email.",
|
||||
}
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
_gmail_types = [
|
||||
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
|
||||
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
|
||||
]
|
||||
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == final_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type.in_(_gmail_types),
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected Gmail connector is invalid or has been disconnected.",
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"Trashing Gmail email: message_id='{final_message_id}', connector={final_connector_id}"
|
||||
)
|
||||
|
||||
is_composio_gmail = (
|
||||
connector.connector_type
|
||||
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
|
||||
)
|
||||
if is_composio_gmail:
|
||||
cca_id = connector.config.get("composio_connected_account_id")
|
||||
if not cca_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Composio connected account ID not found for this Gmail connector.",
|
||||
}
|
||||
else:
|
||||
from google.oauth2.credentials import Credentials
|
||||
|
||||
from app.config import config
|
||||
from app.utils.oauth_security import TokenEncryption
|
||||
|
||||
config_data = dict(connector.config)
|
||||
token_encrypted = config_data.get("_token_encrypted", False)
|
||||
if token_encrypted and config.SECRET_KEY:
|
||||
token_encryption = TokenEncryption(config.SECRET_KEY)
|
||||
if config_data.get("token"):
|
||||
config_data["token"] = token_encryption.decrypt_token(
|
||||
config_data["token"]
|
||||
)
|
||||
if config_data.get("refresh_token"):
|
||||
config_data["refresh_token"] = (
|
||||
token_encryption.decrypt_token(
|
||||
config_data["refresh_token"]
|
||||
)
|
||||
)
|
||||
if config_data.get("client_secret"):
|
||||
config_data["client_secret"] = (
|
||||
token_encryption.decrypt_token(
|
||||
config_data["client_secret"]
|
||||
)
|
||||
)
|
||||
|
||||
exp = config_data.get("expiry", "")
|
||||
if exp:
|
||||
exp = exp.replace("Z", "")
|
||||
|
||||
creds = Credentials(
|
||||
token=config_data.get("token"),
|
||||
refresh_token=config_data.get("refresh_token"),
|
||||
token_uri=config_data.get("token_uri"),
|
||||
client_id=config_data.get("client_id"),
|
||||
client_secret=config_data.get("client_secret"),
|
||||
scopes=config_data.get("scopes", []),
|
||||
expiry=datetime.fromisoformat(exp) if exp else None,
|
||||
)
|
||||
|
||||
try:
|
||||
if is_composio_gmail:
|
||||
from app.agents.shared.tools.gmail.composio_helpers import (
|
||||
execute_composio_gmail_tool,
|
||||
)
|
||||
|
||||
_trashed, error = await execute_composio_gmail_tool(
|
||||
connector,
|
||||
user_id,
|
||||
"GMAIL_MOVE_TO_TRASH",
|
||||
{"user_id": "me", "message_id": final_message_id},
|
||||
)
|
||||
if error:
|
||||
raise RuntimeError(error)
|
||||
else:
|
||||
from googleapiclient.discovery import build
|
||||
|
||||
gmail_service = build("gmail", "v1", credentials=creds)
|
||||
await asyncio.get_event_loop().run_in_executor(
|
||||
None,
|
||||
lambda: (
|
||||
gmail_service.users()
|
||||
.messages()
|
||||
.trash(userId="me", id=final_message_id)
|
||||
.execute()
|
||||
),
|
||||
)
|
||||
except Exception as api_err:
|
||||
from googleapiclient.errors import HttpError
|
||||
|
||||
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
|
||||
logger.warning(
|
||||
f"Insufficient permissions for connector {connector.id}: {api_err}"
|
||||
)
|
||||
try:
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
if not connector.config.get("auth_expired"):
|
||||
connector.config = {
|
||||
**connector.config,
|
||||
"auth_expired": True,
|
||||
}
|
||||
flag_modified(connector, "config")
|
||||
await db_session.commit()
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to persist auth_expired for connector %s",
|
||||
connector.id,
|
||||
exc_info=True,
|
||||
)
|
||||
return {
|
||||
"status": "insufficient_permissions",
|
||||
"connector_id": connector.id,
|
||||
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
|
||||
}
|
||||
raise
|
||||
|
||||
logger.info(f"Gmail email trashed: message_id={final_message_id}")
|
||||
|
||||
trash_result: dict[str, Any] = {
|
||||
"status": "success",
|
||||
"message_id": final_message_id,
|
||||
"message": f"Successfully moved email '{email.get('subject', email_subject_or_id)}' to trash.",
|
||||
}
|
||||
|
||||
deleted_from_kb = False
|
||||
if final_delete_from_kb and document_id:
|
||||
try:
|
||||
from app.db import Document
|
||||
|
||||
doc_result = await db_session.execute(
|
||||
select(Document).filter(Document.id == document_id)
|
||||
)
|
||||
document = doc_result.scalars().first()
|
||||
if document:
|
||||
await db_session.delete(document)
|
||||
await db_session.commit()
|
||||
deleted_from_kb = True
|
||||
logger.info(
|
||||
f"Deleted document {document_id} from knowledge base"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Document {document_id} not found in KB")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete document from KB: {e}")
|
||||
await db_session.rollback()
|
||||
trash_result["warning"] = (
|
||||
f"Email trashed, but failed to remove from knowledge base: {e!s}"
|
||||
)
|
||||
|
||||
trash_result["deleted_from_kb"] = deleted_from_kb
|
||||
if deleted_from_kb:
|
||||
trash_result["message"] = (
|
||||
f"{trash_result.get('message', '')} (also removed from knowledge base)"
|
||||
)
|
||||
|
||||
return trash_result
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
|
||||
logger.error(f"Error trashing Gmail email: {e}", exc_info=True)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Something went wrong while trashing the email. Please try again.",
|
||||
}
|
||||
|
||||
return trash_gmail_email
|
||||
495
surfsense_backend/app/agents/shared/tools/gmail/update_draft.py
Normal file
495
surfsense_backend/app/agents/shared/tools/gmail/update_draft.py
Normal file
|
|
@ -0,0 +1,495 @@
|
|||
import asyncio
|
||||
import base64
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from email.mime.text import MIMEText
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.agents.shared.tools.hitl import request_approval
|
||||
from app.db import async_session_maker
|
||||
from app.services.gmail import GmailToolMetadataService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_update_gmail_draft_tool(
|
||||
db_session: AsyncSession | None = None,
|
||||
search_space_id: int | None = None,
|
||||
user_id: str | None = None,
|
||||
):
|
||||
"""
|
||||
Factory function to create the update_gmail_draft tool.
|
||||
|
||||
The tool acquires its own short-lived ``AsyncSession`` per call via
|
||||
:data:`async_session_maker` so the closure is safe to share across
|
||||
HTTP requests by the compiled-agent cache. Capturing a per-request
|
||||
session here would surface stale/closed sessions on cache hits.
|
||||
|
||||
Args:
|
||||
db_session: Reserved for registry compatibility. Per-call sessions
|
||||
are opened via :data:`async_session_maker` inside the tool body.
|
||||
|
||||
Returns:
|
||||
Configured update_gmail_draft tool
|
||||
"""
|
||||
del db_session # per-call session — see docstring
|
||||
|
||||
@tool
|
||||
async def update_gmail_draft(
|
||||
draft_subject_or_id: str,
|
||||
body: str,
|
||||
to: str | None = None,
|
||||
subject: str | None = None,
|
||||
cc: str | None = None,
|
||||
bcc: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Update an existing Gmail draft.
|
||||
|
||||
Use when the user asks to modify, edit, or add content to an existing
|
||||
email draft. This replaces the draft content with the new version.
|
||||
The user will be able to review and edit the content before it is applied.
|
||||
|
||||
If the user simply wants to "edit" a draft without specifying exact changes,
|
||||
generate the body yourself using your best understanding of the conversation
|
||||
context. The user will review and can freely edit the content in the approval
|
||||
card before confirming.
|
||||
|
||||
IMPORTANT: This tool is ONLY for modifying Gmail draft content, NOT for
|
||||
deleting/trashing drafts (use trash_gmail_email instead), Notion pages,
|
||||
calendar events, or any other content type.
|
||||
|
||||
Args:
|
||||
draft_subject_or_id: The exact subject line of the draft to update
|
||||
(as it appears in Gmail drafts).
|
||||
body: The full updated body content for the draft. Generate this
|
||||
yourself based on the user's request and conversation context.
|
||||
to: Optional new recipient email address (keeps original if omitted).
|
||||
subject: Optional new subject line (keeps original if omitted).
|
||||
cc: Optional CC recipient(s), comma-separated.
|
||||
bcc: Optional BCC recipient(s), comma-separated.
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- status: "success", "rejected", "not_found", or "error"
|
||||
- draft_id: Gmail draft ID (if success)
|
||||
- message: Result message
|
||||
|
||||
IMPORTANT:
|
||||
- If status is "rejected", the user explicitly declined the action.
|
||||
Respond with a brief acknowledgment and do NOT retry or suggest alternatives.
|
||||
- If status is "not_found", relay the exact message to the user and ask them
|
||||
to verify the draft subject or check if it has been indexed.
|
||||
- If status is "insufficient_permissions", the connector lacks the required OAuth scope.
|
||||
Inform the user they need to re-authenticate and do NOT retry the action.
|
||||
|
||||
Examples:
|
||||
- "Update the Kurseong Plan draft with the new itinerary details"
|
||||
- "Edit my draft about the project proposal and change the recipient"
|
||||
- "Let me edit the meeting notes draft" (call with current body content so user can edit in the approval card)
|
||||
"""
|
||||
logger.info(
|
||||
f"update_gmail_draft called: draft_subject_or_id='{draft_subject_or_id}'"
|
||||
)
|
||||
|
||||
if search_space_id is None or user_id is None:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Gmail tool not properly configured. Please contact support.",
|
||||
}
|
||||
|
||||
try:
|
||||
async with async_session_maker() as db_session:
|
||||
metadata_service = GmailToolMetadataService(db_session)
|
||||
context = await metadata_service.get_update_context(
|
||||
search_space_id, user_id, draft_subject_or_id
|
||||
)
|
||||
|
||||
if "error" in context:
|
||||
error_msg = context["error"]
|
||||
if "not found" in error_msg.lower():
|
||||
logger.warning(f"Draft not found: {error_msg}")
|
||||
return {"status": "not_found", "message": error_msg}
|
||||
logger.error(f"Failed to fetch update context: {error_msg}")
|
||||
return {"status": "error", "message": error_msg}
|
||||
|
||||
account = context.get("account", {})
|
||||
if account.get("auth_expired"):
|
||||
logger.warning(
|
||||
"Gmail account %s has expired authentication",
|
||||
account.get("id"),
|
||||
)
|
||||
return {
|
||||
"status": "auth_error",
|
||||
"message": "The Gmail account for this draft needs re-authentication. Please re-authenticate in your connector settings.",
|
||||
"connector_type": "gmail",
|
||||
}
|
||||
|
||||
email = context["email"]
|
||||
message_id = email["message_id"]
|
||||
document_id = email.get("document_id")
|
||||
connector_id_from_context = account["id"]
|
||||
draft_id_from_context = context.get("draft_id")
|
||||
|
||||
original_subject = email.get("subject", draft_subject_or_id)
|
||||
final_subject_default = subject if subject else original_subject
|
||||
final_to_default = to if to else ""
|
||||
|
||||
logger.info(
|
||||
f"Requesting approval for updating Gmail draft: '{original_subject}' "
|
||||
f"(message_id={message_id}, draft_id={draft_id_from_context})"
|
||||
)
|
||||
result = request_approval(
|
||||
action_type="gmail_draft_update",
|
||||
tool_name="update_gmail_draft",
|
||||
params={
|
||||
"message_id": message_id,
|
||||
"draft_id": draft_id_from_context,
|
||||
"to": final_to_default,
|
||||
"subject": final_subject_default,
|
||||
"body": body,
|
||||
"cc": cc,
|
||||
"bcc": bcc,
|
||||
"connector_id": connector_id_from_context,
|
||||
},
|
||||
context=context,
|
||||
)
|
||||
|
||||
if result.rejected:
|
||||
return {
|
||||
"status": "rejected",
|
||||
"message": "User declined. The draft was not updated. Do not ask again or suggest alternatives.",
|
||||
}
|
||||
|
||||
final_to = result.params.get("to", final_to_default)
|
||||
final_subject = result.params.get("subject", final_subject_default)
|
||||
final_body = result.params.get("body", body)
|
||||
final_cc = result.params.get("cc", cc)
|
||||
final_bcc = result.params.get("bcc", bcc)
|
||||
final_connector_id = result.params.get(
|
||||
"connector_id", connector_id_from_context
|
||||
)
|
||||
final_draft_id = result.params.get("draft_id", draft_id_from_context)
|
||||
|
||||
if not final_connector_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "No connector found for this draft.",
|
||||
}
|
||||
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
_gmail_types = [
|
||||
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
|
||||
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
|
||||
]
|
||||
|
||||
result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == final_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type.in_(_gmail_types),
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected Gmail connector is invalid or has been disconnected.",
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"Updating Gmail draft: subject='{final_subject}', connector={final_connector_id}"
|
||||
)
|
||||
|
||||
is_composio_gmail = (
|
||||
connector.connector_type
|
||||
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
|
||||
)
|
||||
if is_composio_gmail:
|
||||
cca_id = connector.config.get("composio_connected_account_id")
|
||||
if not cca_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Composio connected account ID not found for this Gmail connector.",
|
||||
}
|
||||
else:
|
||||
from google.oauth2.credentials import Credentials
|
||||
|
||||
from app.config import config
|
||||
from app.utils.oauth_security import TokenEncryption
|
||||
|
||||
config_data = dict(connector.config)
|
||||
token_encrypted = config_data.get("_token_encrypted", False)
|
||||
if token_encrypted and config.SECRET_KEY:
|
||||
token_encryption = TokenEncryption(config.SECRET_KEY)
|
||||
if config_data.get("token"):
|
||||
config_data["token"] = token_encryption.decrypt_token(
|
||||
config_data["token"]
|
||||
)
|
||||
if config_data.get("refresh_token"):
|
||||
config_data["refresh_token"] = (
|
||||
token_encryption.decrypt_token(
|
||||
config_data["refresh_token"]
|
||||
)
|
||||
)
|
||||
if config_data.get("client_secret"):
|
||||
config_data["client_secret"] = (
|
||||
token_encryption.decrypt_token(
|
||||
config_data["client_secret"]
|
||||
)
|
||||
)
|
||||
|
||||
exp = config_data.get("expiry", "")
|
||||
if exp:
|
||||
exp = exp.replace("Z", "")
|
||||
|
||||
creds = Credentials(
|
||||
token=config_data.get("token"),
|
||||
refresh_token=config_data.get("refresh_token"),
|
||||
token_uri=config_data.get("token_uri"),
|
||||
client_id=config_data.get("client_id"),
|
||||
client_secret=config_data.get("client_secret"),
|
||||
scopes=config_data.get("scopes", []),
|
||||
expiry=datetime.fromisoformat(exp) if exp else None,
|
||||
)
|
||||
|
||||
# Resolve draft_id if not already available
|
||||
if not final_draft_id:
|
||||
logger.info(
|
||||
f"draft_id not in metadata, looking up via drafts.list for message_id={message_id}"
|
||||
)
|
||||
if is_composio_gmail:
|
||||
final_draft_id = await _find_composio_draft_id_by_message(
|
||||
connector, user_id, message_id
|
||||
)
|
||||
else:
|
||||
from googleapiclient.discovery import build
|
||||
|
||||
gmail_service = build("gmail", "v1", credentials=creds)
|
||||
final_draft_id = await _find_draft_id_by_message(
|
||||
gmail_service, message_id
|
||||
)
|
||||
|
||||
if not final_draft_id:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": (
|
||||
"Could not find this draft in Gmail. "
|
||||
"It may have already been sent or deleted."
|
||||
),
|
||||
}
|
||||
|
||||
message = MIMEText(final_body)
|
||||
if final_to:
|
||||
message["to"] = final_to
|
||||
message["subject"] = final_subject
|
||||
if final_cc:
|
||||
message["cc"] = final_cc
|
||||
if final_bcc:
|
||||
message["bcc"] = final_bcc
|
||||
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
|
||||
|
||||
try:
|
||||
if is_composio_gmail:
|
||||
from app.agents.shared.tools.gmail.composio_helpers import (
|
||||
execute_composio_gmail_tool,
|
||||
split_recipients,
|
||||
)
|
||||
|
||||
updated, error = await execute_composio_gmail_tool(
|
||||
connector,
|
||||
user_id,
|
||||
"GMAIL_UPDATE_DRAFT",
|
||||
{
|
||||
"user_id": "me",
|
||||
"draft_id": final_draft_id,
|
||||
"recipient_email": final_to,
|
||||
"subject": final_subject,
|
||||
"body": final_body,
|
||||
"cc": split_recipients(final_cc),
|
||||
"bcc": split_recipients(final_bcc),
|
||||
"is_html": False,
|
||||
},
|
||||
)
|
||||
if error:
|
||||
raise RuntimeError(error)
|
||||
if not isinstance(updated, dict):
|
||||
updated = {}
|
||||
else:
|
||||
from googleapiclient.discovery import build
|
||||
|
||||
gmail_service = build("gmail", "v1", credentials=creds)
|
||||
updated = await asyncio.get_event_loop().run_in_executor(
|
||||
None,
|
||||
lambda: (
|
||||
gmail_service.users()
|
||||
.drafts()
|
||||
.update(
|
||||
userId="me",
|
||||
id=final_draft_id,
|
||||
body={"message": {"raw": raw}},
|
||||
)
|
||||
.execute()
|
||||
),
|
||||
)
|
||||
except Exception as api_err:
|
||||
from googleapiclient.errors import HttpError
|
||||
|
||||
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
|
||||
logger.warning(
|
||||
f"Insufficient permissions for connector {connector.id}: {api_err}"
|
||||
)
|
||||
try:
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
if not connector.config.get("auth_expired"):
|
||||
connector.config = {
|
||||
**connector.config,
|
||||
"auth_expired": True,
|
||||
}
|
||||
flag_modified(connector, "config")
|
||||
await db_session.commit()
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to persist auth_expired for connector %s",
|
||||
connector.id,
|
||||
exc_info=True,
|
||||
)
|
||||
return {
|
||||
"status": "insufficient_permissions",
|
||||
"connector_id": connector.id,
|
||||
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
|
||||
}
|
||||
if isinstance(api_err, HttpError) and api_err.resp.status == 404:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Draft no longer exists in Gmail. It may have been sent or deleted.",
|
||||
}
|
||||
raise
|
||||
|
||||
logger.info(f"Gmail draft updated: id={updated.get('id')}")
|
||||
|
||||
kb_message_suffix = ""
|
||||
if document_id:
|
||||
try:
|
||||
from sqlalchemy.future import select as sa_select
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
from app.db import Document
|
||||
|
||||
doc_result = await db_session.execute(
|
||||
sa_select(Document).filter(Document.id == document_id)
|
||||
)
|
||||
document = doc_result.scalars().first()
|
||||
if document:
|
||||
document.source_markdown = final_body
|
||||
document.title = final_subject
|
||||
meta = dict(document.document_metadata or {})
|
||||
meta["subject"] = final_subject
|
||||
meta["draft_id"] = updated.get("id", final_draft_id)
|
||||
updated_msg = updated.get("message", {})
|
||||
if updated_msg.get("id"):
|
||||
meta["message_id"] = updated_msg["id"]
|
||||
document.document_metadata = meta
|
||||
flag_modified(document, "document_metadata")
|
||||
await db_session.commit()
|
||||
kb_message_suffix = (
|
||||
" Your knowledge base has also been updated."
|
||||
)
|
||||
logger.info(
|
||||
f"KB document {document_id} updated for draft {final_draft_id}"
|
||||
)
|
||||
else:
|
||||
kb_message_suffix = " This draft will be fully updated in your knowledge base in the next scheduled sync."
|
||||
except Exception as kb_err:
|
||||
logger.warning(f"KB update after draft edit failed: {kb_err}")
|
||||
await db_session.rollback()
|
||||
kb_message_suffix = " This draft will be fully updated in your knowledge base in the next scheduled sync."
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"draft_id": updated.get("id"),
|
||||
"message": f"Successfully updated Gmail draft with subject '{final_subject}'.{kb_message_suffix}",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
from langgraph.errors import GraphInterrupt
|
||||
|
||||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
|
||||
logger.error(f"Error updating Gmail draft: {e}", exc_info=True)
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Something went wrong while updating the draft. Please try again.",
|
||||
}
|
||||
|
||||
return update_gmail_draft
|
||||
|
||||
|
||||
async def _find_draft_id_by_message(gmail_service: Any, message_id: str) -> str | None:
|
||||
"""Look up a draft's ID by its message ID via the Gmail API."""
|
||||
try:
|
||||
page_token = None
|
||||
while True:
|
||||
kwargs: dict[str, Any] = {"userId": "me", "maxResults": 100}
|
||||
if page_token:
|
||||
kwargs["pageToken"] = page_token
|
||||
|
||||
response = await asyncio.get_event_loop().run_in_executor(
|
||||
None,
|
||||
lambda kwargs=kwargs: (
|
||||
gmail_service.users().drafts().list(**kwargs).execute()
|
||||
),
|
||||
)
|
||||
|
||||
for draft in response.get("drafts", []):
|
||||
if draft.get("message", {}).get("id") == message_id:
|
||||
return draft["id"]
|
||||
|
||||
page_token = response.get("nextPageToken")
|
||||
if not page_token:
|
||||
break
|
||||
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to look up draft by message_id: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def _find_composio_draft_id_by_message(
|
||||
connector: Any, user_id: str, message_id: str
|
||||
) -> str | None:
|
||||
from app.agents.shared.tools.gmail.composio_helpers import (
|
||||
execute_composio_gmail_tool,
|
||||
)
|
||||
|
||||
page_token = ""
|
||||
while True:
|
||||
params: dict[str, Any] = {
|
||||
"user_id": "me",
|
||||
"max_results": 100,
|
||||
"verbose": False,
|
||||
}
|
||||
if page_token:
|
||||
params["page_token"] = page_token
|
||||
|
||||
data, error = await execute_composio_gmail_tool(
|
||||
connector, user_id, "GMAIL_LIST_DRAFTS", params
|
||||
)
|
||||
if error or not isinstance(data, dict):
|
||||
return None
|
||||
|
||||
for draft in data.get("drafts", []):
|
||||
if draft.get("message", {}).get("id") == message_id:
|
||||
return draft.get("id")
|
||||
|
||||
page_token = data.get("nextPageToken") or data.get("next_page_token") or ""
|
||||
if not page_token:
|
||||
return None
|
||||
Loading…
Add table
Add a link
Reference in a new issue