diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/agent.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/agent.py new file mode 100644 index 000000000..de4971e1c --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/agent.py @@ -0,0 +1,54 @@ +"""`gmail` route: ``SubAgent`` spec for deepagents.""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from deepagents import SubAgent +from langchain_core.language_models import BaseChatModel + +from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import ( + read_md_file, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, + merge_tools_permissions, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import ( + pack_subagent, +) + +from .tools.index import load_tools + +NAME = "gmail" + + +def build_subagent( + *, + dependencies: dict[str, Any], + model: BaseChatModel | None = None, + extra_middleware: Sequence[Any] | None = None, + extra_tools_bucket: ToolsPermissions | None = None, +) -> SubAgent: + buckets = load_tools(dependencies=dependencies) + merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket) + tools = [ + row["tool"] + for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"]) + if row.get("tool") is not None + ] + interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")} + description = read_md_file(__package__, "description").strip() + if not description: + description = "Handles gmail tasks for this workspace." + system_prompt = read_md_file(__package__, "system_prompt").strip() + return pack_subagent( + name=NAME, + description=description, + system_prompt=system_prompt, + tools=tools, + interrupt_on=interrupt_on, + model=model, + extra_middleware=extra_middleware, + ) diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/description.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/description.md new file mode 100644 index 000000000..db5614805 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/description.md @@ -0,0 +1 @@ +Use for Gmail inbox actions: search/read emails, draft or update replies, send messages, and trash emails. diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/system_prompt.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/system_prompt.md new file mode 100644 index 000000000..961100261 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/system_prompt.md @@ -0,0 +1,82 @@ +You are the Gmail operations sub-agent. +You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis. + + +Execute Gmail operations accurately: search/read emails, prepare drafts, send, and trash. + + + +- `search_gmail`: find candidate emails with query constraints. +- `read_gmail_email`: read one message in full detail. +- `create_gmail_draft`: create a new draft. +- `update_gmail_draft`: modify an existing draft. +- `send_gmail_email`: send an email. +- `trash_gmail_email`: move an email to trash. + + + +- Use only tools in ``. +- Build precise search queries using Gmail operators when possible (`from:`, `to:`, `subject:`, `after:`, `before:`, `has:attachment`, `is:unread`, `label:`). +- Resolve relative dates against runtime timestamp; prefer narrower interpretation. +- For reply requests, identify the target thread/email via search + read before drafting. +- If required fields are missing or target selection is ambiguous, return `status=blocked` with `missing_fields` and disambiguation candidates. +- Never invent IDs, recipients, timestamps, quoted text, or tool outcomes. + + + +- Do not perform non-Gmail work. +- Filing operations not represented in `` (archive/label/mark-read/move-folder) are unsupported here. + + + +- For send: verify draft `to`, `subject`, and `body` match delegated instructions. +- If any send-critical field was inferred, do not send; return `status=blocked` with inferred values in `assumptions`. +- For trash: ensure explicit target match before deletion. +- If a destructive action appears already completed this session, do not repeat; return prior evidence. + + + +- On tool failure, return `status=error` with concise recovery `next_step`. +- If search has no strong match, return `status=blocked` with suggested tighter filters. +- If multiple strong candidates remain for risky actions, return `status=blocked` with top options. + + + +Return **only** one JSON object (no markdown/prose): +{ + "status": "success" | "partial" | "blocked" | "error", + "action_summary": string, + "evidence": { + "email_id": string | null, + "thread_id": string | null, + "subject": string | null, + "sender": string | null, + "recipients": string[] | null, + "received_at": string (ISO 8601 with timezone) | null, + "sent_message": { + "id": string, + "to": string[], + "subject": string | null, + "sent_at": string (ISO 8601 with timezone) | null + } | null, + "matched_candidates": [ + { + "email_id": string, + "subject": string | null, + "sender": string | null, + "received_at": string (ISO 8601 with timezone) | null + } + ] | null + }, + "next_step": string | null, + "missing_fields": string[] | null, + "assumptions": string[] | null +} + +Rules: +- `status=success` -> `next_step=null`, `missing_fields=null`. +- `status=partial|blocked|error` -> `next_step` must be non-null. +- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null. +- For blocked ambiguity, include options in `evidence.matched_candidates`. +- For trash actions, `evidence.email_id` is the trashed message. + diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/__init__.py new file mode 100644 index 000000000..294840122 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/__init__.py @@ -0,0 +1,27 @@ +from app.agents.new_chat.tools.gmail.create_draft import ( + create_create_gmail_draft_tool, +) +from app.agents.new_chat.tools.gmail.read_email import ( + create_read_gmail_email_tool, +) +from app.agents.new_chat.tools.gmail.search_emails import ( + create_search_gmail_tool, +) +from app.agents.new_chat.tools.gmail.send_email import ( + create_send_gmail_email_tool, +) +from app.agents.new_chat.tools.gmail.trash_email import ( + create_trash_gmail_email_tool, +) +from app.agents.new_chat.tools.gmail.update_draft import ( + create_update_gmail_draft_tool, +) + +__all__ = [ + "create_create_gmail_draft_tool", + "create_read_gmail_email_tool", + "create_search_gmail_tool", + "create_send_gmail_email_tool", + "create_trash_gmail_email_tool", + "create_update_gmail_draft_tool", +] diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/create_draft.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/create_draft.py new file mode 100644 index 000000000..0bd044695 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/create_draft.py @@ -0,0 +1,313 @@ +import asyncio +import base64 +import logging +from datetime import datetime +from email.mime.text import MIMEText +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.services.gmail import GmailToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_create_gmail_draft_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def create_gmail_draft( + to: str, + subject: str, + body: str, + cc: str | None = None, + bcc: str | None = None, + ) -> dict[str, Any]: + """Create a draft email in Gmail. + + Use when the user asks to draft, compose, or prepare an email without + sending it. + + Args: + to: Recipient email address. + subject: Email subject line. + body: Email body content. + cc: Optional CC recipient(s), comma-separated. + bcc: Optional BCC recipient(s), comma-separated. + + Returns: + Dictionary with: + - status: "success", "rejected", or "error" + - draft_id: Gmail draft ID (if success) + - message: Result message + + IMPORTANT: + - If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment and do NOT retry or suggest alternatives. + - If status is "insufficient_permissions", the connector lacks the required OAuth scope. + Inform the user they need to re-authenticate and do NOT retry the action. + + Examples: + - "Draft an email to alice@example.com about the meeting" + - "Compose a reply to Bob about the project update" + """ + logger.info(f"create_gmail_draft called: to='{to}', subject='{subject}'") + + if db_session is None or search_space_id is None or user_id is None: + return { + "status": "error", + "message": "Gmail tool not properly configured. Please contact support.", + } + + try: + metadata_service = GmailToolMetadataService(db_session) + context = await metadata_service.get_creation_context( + search_space_id, user_id + ) + + if "error" in context: + logger.error(f"Failed to fetch creation context: {context['error']}") + return {"status": "error", "message": context["error"]} + + accounts = context.get("accounts", []) + if accounts and all(a.get("auth_expired") for a in accounts): + logger.warning("All Gmail accounts have expired authentication") + return { + "status": "auth_error", + "message": "All connected Gmail accounts need re-authentication. Please re-authenticate in your connector settings.", + "connector_type": "gmail", + } + + logger.info( + f"Requesting approval for creating Gmail draft: to='{to}', subject='{subject}'" + ) + result = request_approval( + action_type="gmail_draft_creation", + tool_name="create_gmail_draft", + params={ + "to": to, + "subject": subject, + "body": body, + "cc": cc, + "bcc": bcc, + "connector_id": None, + }, + context=context, + ) + + if result.rejected: + return { + "status": "rejected", + "message": "User declined. The draft was not created. Do not ask again or suggest alternatives.", + } + + final_to = result.params.get("to", to) + final_subject = result.params.get("subject", subject) + final_body = result.params.get("body", body) + final_cc = result.params.get("cc", cc) + final_bcc = result.params.get("bcc", bcc) + final_connector_id = result.params.get("connector_id") + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + _gmail_types = [ + SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, + ] + + if final_connector_id is not None: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_gmail_types), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Gmail connector is invalid or has been disconnected.", + } + actual_connector_id = connector.id + else: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_gmail_types), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "No Gmail connector found. Please connect Gmail in your workspace settings.", + } + actual_connector_id = connector.id + + logger.info( + f"Creating Gmail draft: to='{final_to}', subject='{final_subject}', connector={actual_connector_id}" + ) + + if ( + connector.connector_type + == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR + ): + from app.utils.google_credentials import build_composio_credentials + + cca_id = connector.config.get("composio_connected_account_id") + if cca_id: + creds = build_composio_credentials(cca_id) + else: + return { + "status": "error", + "message": "Composio connected account ID not found for this Gmail connector.", + } + else: + from google.oauth2.credentials import Credentials + + from app.config import config + from app.utils.oauth_security import TokenEncryption + + config_data = dict(connector.config) + token_encrypted = config_data.get("_token_encrypted", False) + if token_encrypted and config.SECRET_KEY: + token_encryption = TokenEncryption(config.SECRET_KEY) + if config_data.get("token"): + config_data["token"] = token_encryption.decrypt_token( + config_data["token"] + ) + if config_data.get("refresh_token"): + config_data["refresh_token"] = token_encryption.decrypt_token( + config_data["refresh_token"] + ) + if config_data.get("client_secret"): + config_data["client_secret"] = token_encryption.decrypt_token( + config_data["client_secret"] + ) + + exp = config_data.get("expiry", "") + if exp: + exp = exp.replace("Z", "") + + creds = Credentials( + token=config_data.get("token"), + refresh_token=config_data.get("refresh_token"), + token_uri=config_data.get("token_uri"), + client_id=config_data.get("client_id"), + client_secret=config_data.get("client_secret"), + scopes=config_data.get("scopes", []), + expiry=datetime.fromisoformat(exp) if exp else None, + ) + + from googleapiclient.discovery import build + + gmail_service = build("gmail", "v1", credentials=creds) + + message = MIMEText(final_body) + message["to"] = final_to + message["subject"] = final_subject + if final_cc: + message["cc"] = final_cc + if final_bcc: + message["bcc"] = final_bcc + raw = base64.urlsafe_b64encode(message.as_bytes()).decode() + + try: + created = await asyncio.get_event_loop().run_in_executor( + None, + lambda: ( + gmail_service.users() + .drafts() + .create(userId="me", body={"message": {"raw": raw}}) + .execute() + ), + ) + except Exception as api_err: + from googleapiclient.errors import HttpError + + if isinstance(api_err, HttpError) and api_err.resp.status == 403: + logger.warning( + f"Insufficient permissions for connector {actual_connector_id}: {api_err}" + ) + try: + from sqlalchemy.orm.attributes import flag_modified + + _res = await db_session.execute( + select(SearchSourceConnector).where( + SearchSourceConnector.id == actual_connector_id + ) + ) + _conn = _res.scalar_one_or_none() + if _conn and not _conn.config.get("auth_expired"): + _conn.config = {**_conn.config, "auth_expired": True} + flag_modified(_conn, "config") + await db_session.commit() + except Exception: + logger.warning( + "Failed to persist auth_expired for connector %s", + actual_connector_id, + exc_info=True, + ) + return { + "status": "insufficient_permissions", + "connector_id": actual_connector_id, + "message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.", + } + raise + + logger.info(f"Gmail draft created: id={created.get('id')}") + + kb_message_suffix = "" + try: + from app.services.gmail import GmailKBSyncService + + kb_service = GmailKBSyncService(db_session) + draft_message = created.get("message", {}) + kb_result = await kb_service.sync_after_create( + message_id=draft_message.get("id", ""), + thread_id=draft_message.get("threadId", ""), + subject=final_subject, + sender="me", + date_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + body_text=final_body, + connector_id=actual_connector_id, + search_space_id=search_space_id, + user_id=user_id, + draft_id=created.get("id"), + ) + if kb_result["status"] == "success": + kb_message_suffix = " Your knowledge base has also been updated." + else: + kb_message_suffix = " This draft will be added to your knowledge base in the next scheduled sync." + except Exception as kb_err: + logger.warning(f"KB sync after create failed: {kb_err}") + kb_message_suffix = " This draft will be added to your knowledge base in the next scheduled sync." + + return { + "status": "success", + "draft_id": created.get("id"), + "message": f"Successfully created Gmail draft with subject '{final_subject}'.{kb_message_suffix}", + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error creating Gmail draft: {e}", exc_info=True) + return { + "status": "error", + "message": "Something went wrong while creating the draft. Please try again.", + } + + return create_gmail_draft diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/index.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/index.py new file mode 100644 index 000000000..d382aaf7d --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/index.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from typing import Any + +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, +) + +from .create_draft import create_create_gmail_draft_tool +from .read_email import create_read_gmail_email_tool +from .search_emails import create_search_gmail_tool +from .send_email import create_send_gmail_email_tool +from .trash_email import create_trash_gmail_email_tool +from .update_draft import create_update_gmail_draft_tool + + +def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions: + d = {**(dependencies or {}), **kwargs} + common = { + "db_session": d["db_session"], + "search_space_id": d["search_space_id"], + "user_id": d["user_id"], + } + search = create_search_gmail_tool(**common) + read = create_read_gmail_email_tool(**common) + draft = create_create_gmail_draft_tool(**common) + send = create_send_gmail_email_tool(**common) + trash = create_trash_gmail_email_tool(**common) + updraft = create_update_gmail_draft_tool(**common) + return { + "allow": [ + {"name": getattr(search, "name", "") or "", "tool": search}, + {"name": getattr(read, "name", "") or "", "tool": read}, + ], + "ask": [ + {"name": getattr(draft, "name", "") or "", "tool": draft}, + {"name": getattr(send, "name", "") or "", "tool": send}, + {"name": getattr(trash, "name", "") or "", "tool": trash}, + {"name": getattr(updraft, "name", "") or "", "tool": updraft}, + ], + } diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/read_email.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/read_email.py new file mode 100644 index 000000000..deec1627c --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/read_email.py @@ -0,0 +1,100 @@ +import logging +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.db import SearchSourceConnector, SearchSourceConnectorType + +logger = logging.getLogger(__name__) + +_GMAIL_TYPES = [ + SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, +] + + +def create_read_gmail_email_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def read_gmail_email(message_id: str) -> dict[str, Any]: + """Read the full content of a specific Gmail email by its message ID. + + Use after search_gmail to get the complete body of an email. + + Args: + message_id: The Gmail message ID (from search_gmail results). + + Returns: + Dictionary with status and the full email content formatted as markdown. + """ + if db_session is None or search_space_id is None or user_id is None: + return {"status": "error", "message": "Gmail tool not properly configured."} + + try: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_GMAIL_TYPES), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "No Gmail connector found. Please connect Gmail in your workspace settings.", + } + + from app.agents.new_chat.tools.gmail.search_emails import _build_credentials + + creds = _build_credentials(connector) + + from app.connectors.google_gmail_connector import GoogleGmailConnector + + gmail = GoogleGmailConnector( + credentials=creds, + session=db_session, + user_id=user_id, + connector_id=connector.id, + ) + + detail, error = await gmail.get_message_details(message_id) + if error: + if ( + "re-authenticate" in error.lower() + or "authentication failed" in error.lower() + ): + return { + "status": "auth_error", + "message": error, + "connector_type": "gmail", + } + return {"status": "error", "message": error} + + if not detail: + return { + "status": "not_found", + "message": f"Email with ID '{message_id}' not found.", + } + + content = gmail.format_message_to_markdown(detail) + + return {"status": "success", "message_id": message_id, "content": content} + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + logger.error("Error reading Gmail email: %s", e, exc_info=True) + return { + "status": "error", + "message": "Failed to read email. Please try again.", + } + + return read_gmail_email diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/search_emails.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/search_emails.py new file mode 100644 index 000000000..2e363609e --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/search_emails.py @@ -0,0 +1,182 @@ +import logging +from datetime import datetime +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.db import SearchSourceConnector, SearchSourceConnectorType + +logger = logging.getLogger(__name__) + +_GMAIL_TYPES = [ + SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, +] + +_token_encryption_cache: object | None = None + + +def _get_token_encryption(): + global _token_encryption_cache + if _token_encryption_cache is None: + from app.config import config + from app.utils.oauth_security import TokenEncryption + + if not config.SECRET_KEY: + raise RuntimeError("SECRET_KEY not configured for token decryption.") + _token_encryption_cache = TokenEncryption(config.SECRET_KEY) + return _token_encryption_cache + + +def _build_credentials(connector: SearchSourceConnector): + """Build Google OAuth Credentials from a connector's stored config. + + Handles both native OAuth connectors (with encrypted tokens) and + Composio-backed connectors. Shared by Gmail and Calendar tools. + """ + from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES + + if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: + from app.utils.google_credentials import build_composio_credentials + + cca_id = connector.config.get("composio_connected_account_id") + if not cca_id: + raise ValueError("Composio connected account ID not found.") + return build_composio_credentials(cca_id) + + from google.oauth2.credentials import Credentials + + cfg = dict(connector.config) + if cfg.get("_token_encrypted"): + enc = _get_token_encryption() + for key in ("token", "refresh_token", "client_secret"): + if cfg.get(key): + cfg[key] = enc.decrypt_token(cfg[key]) + + exp = (cfg.get("expiry") or "").replace("Z", "") + return Credentials( + token=cfg.get("token"), + refresh_token=cfg.get("refresh_token"), + token_uri=cfg.get("token_uri"), + client_id=cfg.get("client_id"), + client_secret=cfg.get("client_secret"), + scopes=cfg.get("scopes", []), + expiry=datetime.fromisoformat(exp) if exp else None, + ) + + +def create_search_gmail_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def search_gmail( + query: str, + max_results: int = 10, + ) -> dict[str, Any]: + """Search emails in the user's Gmail inbox using Gmail search syntax. + + Args: + query: Gmail search query, same syntax as the Gmail search bar. + Examples: "from:alice@example.com", "subject:meeting", + "is:unread", "after:2024/01/01 before:2024/02/01", + "has:attachment", "in:sent". + max_results: Number of emails to return (default 10, max 20). + + Returns: + Dictionary with status and a list of email summaries including + message_id, subject, from, date, snippet. + """ + if db_session is None or search_space_id is None or user_id is None: + return {"status": "error", "message": "Gmail tool not properly configured."} + + max_results = min(max_results, 20) + + try: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_GMAIL_TYPES), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "No Gmail connector found. Please connect Gmail in your workspace settings.", + } + + creds = _build_credentials(connector) + + from app.connectors.google_gmail_connector import GoogleGmailConnector + + gmail = GoogleGmailConnector( + credentials=creds, + session=db_session, + user_id=user_id, + connector_id=connector.id, + ) + + messages_list, error = await gmail.get_messages_list( + max_results=max_results, query=query + ) + if error: + if ( + "re-authenticate" in error.lower() + or "authentication failed" in error.lower() + ): + return { + "status": "auth_error", + "message": error, + "connector_type": "gmail", + } + return {"status": "error", "message": error} + + if not messages_list: + return { + "status": "success", + "emails": [], + "total": 0, + "message": "No emails found.", + } + + emails = [] + for msg in messages_list: + detail, err = await gmail.get_message_details(msg["id"]) + if err: + continue + headers = { + h["name"].lower(): h["value"] + for h in detail.get("payload", {}).get("headers", []) + } + emails.append( + { + "message_id": detail.get("id"), + "thread_id": detail.get("threadId"), + "subject": headers.get("subject", "No Subject"), + "from": headers.get("from", "Unknown"), + "to": headers.get("to", ""), + "date": headers.get("date", ""), + "snippet": detail.get("snippet", ""), + "labels": detail.get("labelIds", []), + } + ) + + return {"status": "success", "emails": emails, "total": len(emails)} + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + logger.error("Error searching Gmail: %s", e, exc_info=True) + return { + "status": "error", + "message": "Failed to search Gmail. Please try again.", + } + + return search_gmail diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/send_email.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/send_email.py new file mode 100644 index 000000000..c3f0999f4 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/send_email.py @@ -0,0 +1,315 @@ +import asyncio +import base64 +import logging +from datetime import datetime +from email.mime.text import MIMEText +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.services.gmail import GmailToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_send_gmail_email_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def send_gmail_email( + to: str, + subject: str, + body: str, + cc: str | None = None, + bcc: str | None = None, + ) -> dict[str, Any]: + """Send an email via Gmail. + + Use when the user explicitly asks to send an email. This sends the + email immediately - it cannot be unsent. + + Args: + to: Recipient email address. + subject: Email subject line. + body: Email body content. + cc: Optional CC recipient(s), comma-separated. + bcc: Optional BCC recipient(s), comma-separated. + + Returns: + Dictionary with: + - status: "success", "rejected", or "error" + - message_id: Gmail message ID (if success) + - thread_id: Gmail thread ID (if success) + - message: Result message + + IMPORTANT: + - If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment and do NOT retry or suggest alternatives. + - If status is "insufficient_permissions", the connector lacks the required OAuth scope. + Inform the user they need to re-authenticate and do NOT retry the action. + + Examples: + - "Send an email to alice@example.com about the meeting" + - "Email Bob the project update" + """ + logger.info(f"send_gmail_email called: to='{to}', subject='{subject}'") + + if db_session is None or search_space_id is None or user_id is None: + return { + "status": "error", + "message": "Gmail tool not properly configured. Please contact support.", + } + + try: + metadata_service = GmailToolMetadataService(db_session) + context = await metadata_service.get_creation_context( + search_space_id, user_id + ) + + if "error" in context: + logger.error(f"Failed to fetch creation context: {context['error']}") + return {"status": "error", "message": context["error"]} + + accounts = context.get("accounts", []) + if accounts and all(a.get("auth_expired") for a in accounts): + logger.warning("All Gmail accounts have expired authentication") + return { + "status": "auth_error", + "message": "All connected Gmail accounts need re-authentication. Please re-authenticate in your connector settings.", + "connector_type": "gmail", + } + + logger.info( + f"Requesting approval for sending Gmail email: to='{to}', subject='{subject}'" + ) + result = request_approval( + action_type="gmail_email_send", + tool_name="send_gmail_email", + params={ + "to": to, + "subject": subject, + "body": body, + "cc": cc, + "bcc": bcc, + "connector_id": None, + }, + context=context, + ) + + if result.rejected: + return { + "status": "rejected", + "message": "User declined. The email was not sent. Do not ask again or suggest alternatives.", + } + + final_to = result.params.get("to", to) + final_subject = result.params.get("subject", subject) + final_body = result.params.get("body", body) + final_cc = result.params.get("cc", cc) + final_bcc = result.params.get("bcc", bcc) + final_connector_id = result.params.get("connector_id") + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + _gmail_types = [ + SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, + ] + + if final_connector_id is not None: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_gmail_types), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Gmail connector is invalid or has been disconnected.", + } + actual_connector_id = connector.id + else: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_gmail_types), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "No Gmail connector found. Please connect Gmail in your workspace settings.", + } + actual_connector_id = connector.id + + logger.info( + f"Sending Gmail email: to='{final_to}', subject='{final_subject}', connector={actual_connector_id}" + ) + + if ( + connector.connector_type + == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR + ): + from app.utils.google_credentials import build_composio_credentials + + cca_id = connector.config.get("composio_connected_account_id") + if cca_id: + creds = build_composio_credentials(cca_id) + else: + return { + "status": "error", + "message": "Composio connected account ID not found for this Gmail connector.", + } + else: + from google.oauth2.credentials import Credentials + + from app.config import config + from app.utils.oauth_security import TokenEncryption + + config_data = dict(connector.config) + token_encrypted = config_data.get("_token_encrypted", False) + if token_encrypted and config.SECRET_KEY: + token_encryption = TokenEncryption(config.SECRET_KEY) + if config_data.get("token"): + config_data["token"] = token_encryption.decrypt_token( + config_data["token"] + ) + if config_data.get("refresh_token"): + config_data["refresh_token"] = token_encryption.decrypt_token( + config_data["refresh_token"] + ) + if config_data.get("client_secret"): + config_data["client_secret"] = token_encryption.decrypt_token( + config_data["client_secret"] + ) + + exp = config_data.get("expiry", "") + if exp: + exp = exp.replace("Z", "") + + creds = Credentials( + token=config_data.get("token"), + refresh_token=config_data.get("refresh_token"), + token_uri=config_data.get("token_uri"), + client_id=config_data.get("client_id"), + client_secret=config_data.get("client_secret"), + scopes=config_data.get("scopes", []), + expiry=datetime.fromisoformat(exp) if exp else None, + ) + + from googleapiclient.discovery import build + + gmail_service = build("gmail", "v1", credentials=creds) + + message = MIMEText(final_body) + message["to"] = final_to + message["subject"] = final_subject + if final_cc: + message["cc"] = final_cc + if final_bcc: + message["bcc"] = final_bcc + raw = base64.urlsafe_b64encode(message.as_bytes()).decode() + + try: + sent = await asyncio.get_event_loop().run_in_executor( + None, + lambda: ( + gmail_service.users() + .messages() + .send(userId="me", body={"raw": raw}) + .execute() + ), + ) + except Exception as api_err: + from googleapiclient.errors import HttpError + + if isinstance(api_err, HttpError) and api_err.resp.status == 403: + logger.warning( + f"Insufficient permissions for connector {actual_connector_id}: {api_err}" + ) + try: + from sqlalchemy.orm.attributes import flag_modified + + _res = await db_session.execute( + select(SearchSourceConnector).where( + SearchSourceConnector.id == actual_connector_id + ) + ) + _conn = _res.scalar_one_or_none() + if _conn and not _conn.config.get("auth_expired"): + _conn.config = {**_conn.config, "auth_expired": True} + flag_modified(_conn, "config") + await db_session.commit() + except Exception: + logger.warning( + "Failed to persist auth_expired for connector %s", + actual_connector_id, + exc_info=True, + ) + return { + "status": "insufficient_permissions", + "connector_id": actual_connector_id, + "message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.", + } + raise + + logger.info( + f"Gmail email sent: id={sent.get('id')}, threadId={sent.get('threadId')}" + ) + + kb_message_suffix = "" + try: + from app.services.gmail import GmailKBSyncService + + kb_service = GmailKBSyncService(db_session) + kb_result = await kb_service.sync_after_create( + message_id=sent.get("id", ""), + thread_id=sent.get("threadId", ""), + subject=final_subject, + sender="me", + date_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + body_text=final_body, + connector_id=actual_connector_id, + search_space_id=search_space_id, + user_id=user_id, + ) + if kb_result["status"] == "success": + kb_message_suffix = " Your knowledge base has also been updated." + else: + kb_message_suffix = " This email will be added to your knowledge base in the next scheduled sync." + except Exception as kb_err: + logger.warning(f"KB sync after send failed: {kb_err}") + kb_message_suffix = " This email will be added to your knowledge base in the next scheduled sync." + + return { + "status": "success", + "message_id": sent.get("id"), + "thread_id": sent.get("threadId"), + "message": f"Successfully sent email to '{final_to}' with subject '{final_subject}'.{kb_message_suffix}", + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error sending Gmail email: {e}", exc_info=True) + return { + "status": "error", + "message": "Something went wrong while sending the email. Please try again.", + } + + return send_gmail_email diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/trash_email.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/trash_email.py new file mode 100644 index 000000000..1f1f6227a --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/trash_email.py @@ -0,0 +1,309 @@ +import asyncio +import logging +from datetime import datetime +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.services.gmail import GmailToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_trash_gmail_email_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def trash_gmail_email( + email_subject_or_id: str, + delete_from_kb: bool = False, + ) -> dict[str, Any]: + """Move an email or draft to trash in Gmail. + + Use when the user asks to delete, remove, or trash an email or draft. + + Args: + email_subject_or_id: The exact subject line or message ID of the + email to trash (as it appears in the inbox). + delete_from_kb: Whether to also remove the email from the knowledge base. + Default is False. + Set to True to remove from both Gmail and knowledge base. + + Returns: + Dictionary with: + - status: "success", "rejected", "not_found", or "error" + - message_id: Gmail message ID (if success) + - deleted_from_kb: whether the document was removed from the knowledge base + - message: Result message + + IMPORTANT: + - If status is "rejected", the user explicitly declined. Respond with a brief + acknowledgment and do NOT retry or suggest alternatives. + - If status is "not_found", relay the exact message to the user and ask them + to verify the email subject or check if it has been indexed. + - If status is "insufficient_permissions", the connector lacks the required OAuth scope. + Inform the user they need to re-authenticate and do NOT retry this tool. + Examples: + - "Delete the email about 'Meeting Cancelled'" + - "Trash the email from Bob about the project" + """ + logger.info( + f"trash_gmail_email called: email_subject_or_id='{email_subject_or_id}', delete_from_kb={delete_from_kb}" + ) + + if db_session is None or search_space_id is None or user_id is None: + return { + "status": "error", + "message": "Gmail tool not properly configured. Please contact support.", + } + + try: + metadata_service = GmailToolMetadataService(db_session) + context = await metadata_service.get_trash_context( + search_space_id, user_id, email_subject_or_id + ) + + if "error" in context: + error_msg = context["error"] + if "not found" in error_msg.lower(): + logger.warning(f"Email not found: {error_msg}") + return {"status": "not_found", "message": error_msg} + logger.error(f"Failed to fetch trash context: {error_msg}") + return {"status": "error", "message": error_msg} + + account = context.get("account", {}) + if account.get("auth_expired"): + logger.warning( + "Gmail account %s has expired authentication", + account.get("id"), + ) + return { + "status": "auth_error", + "message": "The Gmail account for this email needs re-authentication. Please re-authenticate in your connector settings.", + "connector_type": "gmail", + } + + email = context["email"] + message_id = email["message_id"] + document_id = email.get("document_id") + connector_id_from_context = context["account"]["id"] + + if not message_id: + return { + "status": "error", + "message": "Message ID is missing from the indexed document. Please re-index the email and try again.", + } + + logger.info( + f"Requesting approval for trashing Gmail email: '{email_subject_or_id}' (message_id={message_id}, delete_from_kb={delete_from_kb})" + ) + result = request_approval( + action_type="gmail_email_trash", + tool_name="trash_gmail_email", + params={ + "message_id": message_id, + "connector_id": connector_id_from_context, + "delete_from_kb": delete_from_kb, + }, + context=context, + ) + + if result.rejected: + return { + "status": "rejected", + "message": "User declined. The email was not trashed. Do not ask again or suggest alternatives.", + } + + final_message_id = result.params.get("message_id", message_id) + final_connector_id = result.params.get( + "connector_id", connector_id_from_context + ) + final_delete_from_kb = result.params.get("delete_from_kb", delete_from_kb) + + if not final_connector_id: + return { + "status": "error", + "message": "No connector found for this email.", + } + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + _gmail_types = [ + SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, + ] + + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_gmail_types), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Gmail connector is invalid or has been disconnected.", + } + + logger.info( + f"Trashing Gmail email: message_id='{final_message_id}', connector={final_connector_id}" + ) + + if ( + connector.connector_type + == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR + ): + from app.utils.google_credentials import build_composio_credentials + + cca_id = connector.config.get("composio_connected_account_id") + if cca_id: + creds = build_composio_credentials(cca_id) + else: + return { + "status": "error", + "message": "Composio connected account ID not found for this Gmail connector.", + } + else: + from google.oauth2.credentials import Credentials + + from app.config import config + from app.utils.oauth_security import TokenEncryption + + config_data = dict(connector.config) + token_encrypted = config_data.get("_token_encrypted", False) + if token_encrypted and config.SECRET_KEY: + token_encryption = TokenEncryption(config.SECRET_KEY) + if config_data.get("token"): + config_data["token"] = token_encryption.decrypt_token( + config_data["token"] + ) + if config_data.get("refresh_token"): + config_data["refresh_token"] = token_encryption.decrypt_token( + config_data["refresh_token"] + ) + if config_data.get("client_secret"): + config_data["client_secret"] = token_encryption.decrypt_token( + config_data["client_secret"] + ) + + exp = config_data.get("expiry", "") + if exp: + exp = exp.replace("Z", "") + + creds = Credentials( + token=config_data.get("token"), + refresh_token=config_data.get("refresh_token"), + token_uri=config_data.get("token_uri"), + client_id=config_data.get("client_id"), + client_secret=config_data.get("client_secret"), + scopes=config_data.get("scopes", []), + expiry=datetime.fromisoformat(exp) if exp else None, + ) + + from googleapiclient.discovery import build + + gmail_service = build("gmail", "v1", credentials=creds) + + try: + await asyncio.get_event_loop().run_in_executor( + None, + lambda: ( + gmail_service.users() + .messages() + .trash(userId="me", id=final_message_id) + .execute() + ), + ) + except Exception as api_err: + from googleapiclient.errors import HttpError + + if isinstance(api_err, HttpError) and api_err.resp.status == 403: + logger.warning( + f"Insufficient permissions for connector {connector.id}: {api_err}" + ) + try: + from sqlalchemy.orm.attributes import flag_modified + + if not connector.config.get("auth_expired"): + connector.config = { + **connector.config, + "auth_expired": True, + } + flag_modified(connector, "config") + await db_session.commit() + except Exception: + logger.warning( + "Failed to persist auth_expired for connector %s", + connector.id, + exc_info=True, + ) + return { + "status": "insufficient_permissions", + "connector_id": connector.id, + "message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.", + } + raise + + logger.info(f"Gmail email trashed: message_id={final_message_id}") + + trash_result: dict[str, Any] = { + "status": "success", + "message_id": final_message_id, + "message": f"Successfully moved email '{email.get('subject', email_subject_or_id)}' to trash.", + } + + deleted_from_kb = False + if final_delete_from_kb and document_id: + try: + from app.db import Document + + doc_result = await db_session.execute( + select(Document).filter(Document.id == document_id) + ) + document = doc_result.scalars().first() + if document: + await db_session.delete(document) + await db_session.commit() + deleted_from_kb = True + logger.info( + f"Deleted document {document_id} from knowledge base" + ) + else: + logger.warning(f"Document {document_id} not found in KB") + except Exception as e: + logger.error(f"Failed to delete document from KB: {e}") + await db_session.rollback() + trash_result["warning"] = ( + f"Email trashed, but failed to remove from knowledge base: {e!s}" + ) + + trash_result["deleted_from_kb"] = deleted_from_kb + if deleted_from_kb: + trash_result["message"] = ( + f"{trash_result.get('message', '')} (also removed from knowledge base)" + ) + + return trash_result + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error trashing Gmail email: {e}", exc_info=True) + return { + "status": "error", + "message": "Something went wrong while trashing the email. Please try again.", + } + + return trash_gmail_email diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/update_draft.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/update_draft.py new file mode 100644 index 000000000..91178cd21 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/gmail/tools/update_draft.py @@ -0,0 +1,410 @@ +import asyncio +import base64 +import logging +from datetime import datetime +from email.mime.text import MIMEText +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.services.gmail import GmailToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_update_gmail_draft_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def update_gmail_draft( + draft_subject_or_id: str, + body: str, + to: str | None = None, + subject: str | None = None, + cc: str | None = None, + bcc: str | None = None, + ) -> dict[str, Any]: + """Update an existing Gmail draft. + + Use when the user asks to modify, edit, or add content to an existing + email draft. This replaces the draft content with the new version. + The user will be able to review and edit the content before it is applied. + + If the user simply wants to "edit" a draft without specifying exact changes, + generate the body yourself using your best understanding of the conversation + context. The user will review and can freely edit the content in the approval + card before confirming. + + IMPORTANT: This tool is ONLY for modifying Gmail draft content, NOT for + deleting/trashing drafts (use trash_gmail_email instead), Notion pages, + calendar events, or any other content type. + + Args: + draft_subject_or_id: The exact subject line of the draft to update + (as it appears in Gmail drafts). + body: The full updated body content for the draft. Generate this + yourself based on the user's request and conversation context. + to: Optional new recipient email address (keeps original if omitted). + subject: Optional new subject line (keeps original if omitted). + cc: Optional CC recipient(s), comma-separated. + bcc: Optional BCC recipient(s), comma-separated. + + Returns: + Dictionary with: + - status: "success", "rejected", "not_found", or "error" + - draft_id: Gmail draft ID (if success) + - message: Result message + + IMPORTANT: + - If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment and do NOT retry or suggest alternatives. + - If status is "not_found", relay the exact message to the user and ask them + to verify the draft subject or check if it has been indexed. + - If status is "insufficient_permissions", the connector lacks the required OAuth scope. + Inform the user they need to re-authenticate and do NOT retry the action. + + Examples: + - "Update the Kurseong Plan draft with the new itinerary details" + - "Edit my draft about the project proposal and change the recipient" + - "Let me edit the meeting notes draft" (call with current body content so user can edit in the approval card) + """ + logger.info( + f"update_gmail_draft called: draft_subject_or_id='{draft_subject_or_id}'" + ) + + if db_session is None or search_space_id is None or user_id is None: + return { + "status": "error", + "message": "Gmail tool not properly configured. Please contact support.", + } + + try: + metadata_service = GmailToolMetadataService(db_session) + context = await metadata_service.get_update_context( + search_space_id, user_id, draft_subject_or_id + ) + + if "error" in context: + error_msg = context["error"] + if "not found" in error_msg.lower(): + logger.warning(f"Draft not found: {error_msg}") + return {"status": "not_found", "message": error_msg} + logger.error(f"Failed to fetch update context: {error_msg}") + return {"status": "error", "message": error_msg} + + account = context.get("account", {}) + if account.get("auth_expired"): + logger.warning( + "Gmail account %s has expired authentication", + account.get("id"), + ) + return { + "status": "auth_error", + "message": "The Gmail account for this draft needs re-authentication. Please re-authenticate in your connector settings.", + "connector_type": "gmail", + } + + email = context["email"] + message_id = email["message_id"] + document_id = email.get("document_id") + connector_id_from_context = account["id"] + draft_id_from_context = context.get("draft_id") + + original_subject = email.get("subject", draft_subject_or_id) + final_subject_default = subject if subject else original_subject + final_to_default = to if to else "" + + logger.info( + f"Requesting approval for updating Gmail draft: '{original_subject}' " + f"(message_id={message_id}, draft_id={draft_id_from_context})" + ) + result = request_approval( + action_type="gmail_draft_update", + tool_name="update_gmail_draft", + params={ + "message_id": message_id, + "draft_id": draft_id_from_context, + "to": final_to_default, + "subject": final_subject_default, + "body": body, + "cc": cc, + "bcc": bcc, + "connector_id": connector_id_from_context, + }, + context=context, + ) + + if result.rejected: + return { + "status": "rejected", + "message": "User declined. The draft was not updated. Do not ask again or suggest alternatives.", + } + + final_to = result.params.get("to", final_to_default) + final_subject = result.params.get("subject", final_subject_default) + final_body = result.params.get("body", body) + final_cc = result.params.get("cc", cc) + final_bcc = result.params.get("bcc", bcc) + final_connector_id = result.params.get( + "connector_id", connector_id_from_context + ) + final_draft_id = result.params.get("draft_id", draft_id_from_context) + + if not final_connector_id: + return { + "status": "error", + "message": "No connector found for this draft.", + } + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + _gmail_types = [ + SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, + ] + + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_gmail_types), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Gmail connector is invalid or has been disconnected.", + } + + logger.info( + f"Updating Gmail draft: subject='{final_subject}', connector={final_connector_id}" + ) + + if ( + connector.connector_type + == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR + ): + from app.utils.google_credentials import build_composio_credentials + + cca_id = connector.config.get("composio_connected_account_id") + if cca_id: + creds = build_composio_credentials(cca_id) + else: + return { + "status": "error", + "message": "Composio connected account ID not found for this Gmail connector.", + } + else: + from google.oauth2.credentials import Credentials + + from app.config import config + from app.utils.oauth_security import TokenEncryption + + config_data = dict(connector.config) + token_encrypted = config_data.get("_token_encrypted", False) + if token_encrypted and config.SECRET_KEY: + token_encryption = TokenEncryption(config.SECRET_KEY) + if config_data.get("token"): + config_data["token"] = token_encryption.decrypt_token( + config_data["token"] + ) + if config_data.get("refresh_token"): + config_data["refresh_token"] = token_encryption.decrypt_token( + config_data["refresh_token"] + ) + if config_data.get("client_secret"): + config_data["client_secret"] = token_encryption.decrypt_token( + config_data["client_secret"] + ) + + exp = config_data.get("expiry", "") + if exp: + exp = exp.replace("Z", "") + + creds = Credentials( + token=config_data.get("token"), + refresh_token=config_data.get("refresh_token"), + token_uri=config_data.get("token_uri"), + client_id=config_data.get("client_id"), + client_secret=config_data.get("client_secret"), + scopes=config_data.get("scopes", []), + expiry=datetime.fromisoformat(exp) if exp else None, + ) + + from googleapiclient.discovery import build + + gmail_service = build("gmail", "v1", credentials=creds) + + # Resolve draft_id if not already available + if not final_draft_id: + logger.info( + f"draft_id not in metadata, looking up via drafts.list for message_id={message_id}" + ) + final_draft_id = await _find_draft_id_by_message( + gmail_service, message_id + ) + + if not final_draft_id: + return { + "status": "error", + "message": ( + "Could not find this draft in Gmail. " + "It may have already been sent or deleted." + ), + } + + message = MIMEText(final_body) + if final_to: + message["to"] = final_to + message["subject"] = final_subject + if final_cc: + message["cc"] = final_cc + if final_bcc: + message["bcc"] = final_bcc + raw = base64.urlsafe_b64encode(message.as_bytes()).decode() + + try: + updated = await asyncio.get_event_loop().run_in_executor( + None, + lambda: ( + gmail_service.users() + .drafts() + .update( + userId="me", + id=final_draft_id, + body={"message": {"raw": raw}}, + ) + .execute() + ), + ) + except Exception as api_err: + from googleapiclient.errors import HttpError + + if isinstance(api_err, HttpError) and api_err.resp.status == 403: + logger.warning( + f"Insufficient permissions for connector {connector.id}: {api_err}" + ) + try: + from sqlalchemy.orm.attributes import flag_modified + + if not connector.config.get("auth_expired"): + connector.config = { + **connector.config, + "auth_expired": True, + } + flag_modified(connector, "config") + await db_session.commit() + except Exception: + logger.warning( + "Failed to persist auth_expired for connector %s", + connector.id, + exc_info=True, + ) + return { + "status": "insufficient_permissions", + "connector_id": connector.id, + "message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.", + } + if isinstance(api_err, HttpError) and api_err.resp.status == 404: + return { + "status": "error", + "message": "Draft no longer exists in Gmail. It may have been sent or deleted.", + } + raise + + logger.info(f"Gmail draft updated: id={updated.get('id')}") + + kb_message_suffix = "" + if document_id: + try: + from sqlalchemy.future import select as sa_select + from sqlalchemy.orm.attributes import flag_modified + + from app.db import Document + + doc_result = await db_session.execute( + sa_select(Document).filter(Document.id == document_id) + ) + document = doc_result.scalars().first() + if document: + document.source_markdown = final_body + document.title = final_subject + meta = dict(document.document_metadata or {}) + meta["subject"] = final_subject + meta["draft_id"] = updated.get("id", final_draft_id) + updated_msg = updated.get("message", {}) + if updated_msg.get("id"): + meta["message_id"] = updated_msg["id"] + document.document_metadata = meta + flag_modified(document, "document_metadata") + await db_session.commit() + kb_message_suffix = ( + " Your knowledge base has also been updated." + ) + logger.info( + f"KB document {document_id} updated for draft {final_draft_id}" + ) + else: + kb_message_suffix = " This draft will be fully updated in your knowledge base in the next scheduled sync." + except Exception as kb_err: + logger.warning(f"KB update after draft edit failed: {kb_err}") + await db_session.rollback() + kb_message_suffix = " This draft will be fully updated in your knowledge base in the next scheduled sync." + + return { + "status": "success", + "draft_id": updated.get("id"), + "message": f"Successfully updated Gmail draft with subject '{final_subject}'.{kb_message_suffix}", + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error updating Gmail draft: {e}", exc_info=True) + return { + "status": "error", + "message": "Something went wrong while updating the draft. Please try again.", + } + + return update_gmail_draft + + +async def _find_draft_id_by_message(gmail_service: Any, message_id: str) -> str | None: + """Look up a draft's ID by its message ID via the Gmail API.""" + try: + page_token = None + while True: + kwargs: dict[str, Any] = {"userId": "me", "maxResults": 100} + if page_token: + kwargs["pageToken"] = page_token + + response = await asyncio.get_event_loop().run_in_executor( + None, + lambda kwargs=kwargs: ( + gmail_service.users().drafts().list(**kwargs).execute() + ), + ) + + for draft in response.get("drafts", []): + if draft.get("message", {}).get("id") == message_id: + return draft["id"] + + page_token = response.get("nextPageToken") + if not page_token: + break + + return None + except Exception as e: + logger.warning(f"Failed to look up draft by message_id: {e}") + return None diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/agent.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/agent.py new file mode 100644 index 000000000..091f431f3 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/agent.py @@ -0,0 +1,54 @@ +"""`google_drive` route: ``SubAgent`` spec for deepagents.""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from deepagents import SubAgent +from langchain_core.language_models import BaseChatModel + +from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import ( + read_md_file, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, + merge_tools_permissions, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import ( + pack_subagent, +) + +from .tools.index import load_tools + +NAME = "google_drive" + + +def build_subagent( + *, + dependencies: dict[str, Any], + model: BaseChatModel | None = None, + extra_middleware: Sequence[Any] | None = None, + extra_tools_bucket: ToolsPermissions | None = None, +) -> SubAgent: + buckets = load_tools(dependencies=dependencies) + merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket) + tools = [ + row["tool"] + for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"]) + if row.get("tool") is not None + ] + interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")} + description = read_md_file(__package__, "description").strip() + if not description: + description = "Handles google drive tasks for this workspace." + system_prompt = read_md_file(__package__, "system_prompt").strip() + return pack_subagent( + name=NAME, + description=description, + system_prompt=system_prompt, + tools=tools, + interrupt_on=interrupt_on, + model=model, + extra_middleware=extra_middleware, + ) diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/description.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/description.md new file mode 100644 index 000000000..3f54ef8f7 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/description.md @@ -0,0 +1 @@ +Use for Google Drive document/file tasks: locate files, inspect content, and manage Drive files or folders. diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/system_prompt.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/system_prompt.md new file mode 100644 index 000000000..09dc0caa2 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/system_prompt.md @@ -0,0 +1,54 @@ +You are the Google Drive operations sub-agent. +You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis. + + +Execute Google Drive file operations accurately in the connected account. + + + +- `create_google_drive_file` +- `delete_google_drive_file` + + + +- Use only tools in ``. +- Ensure target file identity/path is explicit before mutate actions. +- If target is ambiguous, return `status=blocked` with candidate files. +- Never invent file IDs/names or mutation outcomes. + + + +- Do not perform non-Google-Drive tasks. + + + +- Never claim file mutation success without tool confirmation. + + + +- On tool failure, return `status=error` with concise recovery `next_step`. +- On target ambiguity, return `status=blocked` with candidate files. + + + +Return **only** one JSON object (no markdown/prose): +{ + "status": "success" | "partial" | "blocked" | "error", + "action_summary": string, + "evidence": { + "file_id": string | null, + "file_name": string | null, + "operation": "create" | "delete" | null, + "matched_candidates": [ + { "file_id": string, "file_name": string | null } + ] | null + }, + "next_step": string | null, + "missing_fields": string[] | null, + "assumptions": string[] | null +} +Rules: +- `status=success` -> `next_step=null`, `missing_fields=null`. +- `status=partial|blocked|error` -> `next_step` must be non-null. +- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null. + diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/__init__.py new file mode 100644 index 000000000..9c63bceb1 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/__init__.py @@ -0,0 +1,11 @@ +from app.agents.new_chat.tools.google_drive.create_file import ( + create_create_google_drive_file_tool, +) +from app.agents.new_chat.tools.google_drive.trash_file import ( + create_delete_google_drive_file_tool, +) + +__all__ = [ + "create_create_google_drive_file_tool", + "create_delete_google_drive_file_tool", +] diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/create_file.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/create_file.py new file mode 100644 index 000000000..f36db8f3f --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/create_file.py @@ -0,0 +1,283 @@ +import logging +from typing import Any, Literal + +from googleapiclient.errors import HttpError +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.google_drive.client import GoogleDriveClient +from app.connectors.google_drive.file_types import GOOGLE_DOC, GOOGLE_SHEET +from app.services.google_drive import GoogleDriveToolMetadataService + +logger = logging.getLogger(__name__) + +_MIME_MAP: dict[str, str] = { + "google_doc": GOOGLE_DOC, + "google_sheet": GOOGLE_SHEET, +} + + +def create_create_google_drive_file_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def create_google_drive_file( + name: str, + file_type: Literal["google_doc", "google_sheet"], + content: str | None = None, + ) -> dict[str, Any]: + """Create a new Google Doc or Google Sheet in Google Drive. + + Use this tool when the user explicitly asks to create a new document + or spreadsheet in Google Drive. The user MUST specify a topic before + you call this tool. If the request does not contain a topic (e.g. + "create a drive doc" or "make a Google Sheet"), ask what the file + should be about. Never call this tool without a clear topic from the user. + + Args: + name: The file name (without extension). + file_type: Either "google_doc" or "google_sheet". + content: Optional initial content. Generate from the user's topic. + For google_doc, provide markdown text. For google_sheet, provide CSV-formatted text. + + Returns: + Dictionary with: + - status: "success", "rejected", or "error" + - file_id: Google Drive file ID (if success) + - name: File name (if success) + - web_view_link: URL to open the file (if success) + - message: Result message + + IMPORTANT: + - If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment and do NOT retry or suggest alternatives. + - If status is "insufficient_permissions", the connector lacks the required OAuth scope. + Inform the user they need to re-authenticate and do NOT retry the action. + + Examples: + - "Create a Google Doc with today's meeting notes" + - "Create a spreadsheet for the 2026 budget" + """ + logger.info( + f"create_google_drive_file called: name='{name}', type='{file_type}'" + ) + + if db_session is None or search_space_id is None or user_id is None: + return { + "status": "error", + "message": "Google Drive tool not properly configured. Please contact support.", + } + + if file_type not in _MIME_MAP: + return { + "status": "error", + "message": f"Unsupported file type '{file_type}'. Use 'google_doc' or 'google_sheet'.", + } + + try: + metadata_service = GoogleDriveToolMetadataService(db_session) + context = await metadata_service.get_creation_context( + search_space_id, user_id + ) + + if "error" in context: + logger.error(f"Failed to fetch creation context: {context['error']}") + return {"status": "error", "message": context["error"]} + + accounts = context.get("accounts", []) + if accounts and all(a.get("auth_expired") for a in accounts): + logger.warning("All Google Drive accounts have expired authentication") + return { + "status": "auth_error", + "message": "All connected Google Drive accounts need re-authentication. Please re-authenticate in your connector settings.", + "connector_type": "google_drive", + } + + logger.info( + f"Requesting approval for creating Google Drive file: name='{name}', type='{file_type}'" + ) + result = request_approval( + action_type="google_drive_file_creation", + tool_name="create_google_drive_file", + params={ + "name": name, + "file_type": file_type, + "content": content, + "connector_id": None, + "parent_folder_id": None, + }, + context=context, + ) + + if result.rejected: + return { + "status": "rejected", + "message": "User declined. The file was not created. Do not ask again or suggest alternatives.", + } + + final_name = result.params.get("name", name) + final_file_type = result.params.get("file_type", file_type) + final_content = result.params.get("content", content) + final_connector_id = result.params.get("connector_id") + final_parent_folder_id = result.params.get("parent_folder_id") + + if not final_name or not final_name.strip(): + return {"status": "error", "message": "File name cannot be empty."} + + mime_type = _MIME_MAP.get(final_file_type) + if not mime_type: + return { + "status": "error", + "message": f"Unsupported file type '{final_file_type}'.", + } + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + _drive_types = [ + SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, + ] + + if final_connector_id is not None: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_drive_types), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Google Drive connector is invalid or has been disconnected.", + } + actual_connector_id = connector.id + else: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_drive_types), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "No Google Drive connector found. Please connect Google Drive in your workspace settings.", + } + actual_connector_id = connector.id + + logger.info( + f"Creating Google Drive file: name='{final_name}', type='{final_file_type}', connector={actual_connector_id}" + ) + + pre_built_creds = None + if ( + connector.connector_type + == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR + ): + from app.utils.google_credentials import build_composio_credentials + + cca_id = connector.config.get("composio_connected_account_id") + if cca_id: + pre_built_creds = build_composio_credentials(cca_id) + + client = GoogleDriveClient( + session=db_session, + connector_id=actual_connector_id, + credentials=pre_built_creds, + ) + try: + created = await client.create_file( + name=final_name, + mime_type=mime_type, + parent_folder_id=final_parent_folder_id, + content=final_content, + ) + except HttpError as http_err: + if http_err.resp.status == 403: + logger.warning( + f"Insufficient permissions for connector {actual_connector_id}: {http_err}" + ) + try: + from sqlalchemy.orm.attributes import flag_modified + + _res = await db_session.execute( + select(SearchSourceConnector).where( + SearchSourceConnector.id == actual_connector_id + ) + ) + _conn = _res.scalar_one_or_none() + if _conn and not _conn.config.get("auth_expired"): + _conn.config = {**_conn.config, "auth_expired": True} + flag_modified(_conn, "config") + await db_session.commit() + except Exception: + logger.warning( + "Failed to persist auth_expired for connector %s", + actual_connector_id, + exc_info=True, + ) + return { + "status": "insufficient_permissions", + "connector_id": actual_connector_id, + "message": "This Google Drive account needs additional permissions. Please re-authenticate in connector settings.", + } + raise + + logger.info( + f"Google Drive file created: id={created.get('id')}, name={created.get('name')}" + ) + + kb_message_suffix = "" + try: + from app.services.google_drive import GoogleDriveKBSyncService + + kb_service = GoogleDriveKBSyncService(db_session) + kb_result = await kb_service.sync_after_create( + file_id=created.get("id"), + file_name=created.get("name", final_name), + mime_type=mime_type, + web_view_link=created.get("webViewLink"), + content=final_content, + connector_id=actual_connector_id, + search_space_id=search_space_id, + user_id=user_id, + ) + if kb_result["status"] == "success": + kb_message_suffix = " Your knowledge base has also been updated." + else: + kb_message_suffix = " This file will be added to your knowledge base in the next scheduled sync." + except Exception as kb_err: + logger.warning(f"KB sync after create failed: {kb_err}") + kb_message_suffix = " This file will be added to your knowledge base in the next scheduled sync." + + return { + "status": "success", + "file_id": created.get("id"), + "name": created.get("name"), + "web_view_link": created.get("webViewLink"), + "message": f"Successfully created '{created.get('name')}' in Google Drive.{kb_message_suffix}", + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error creating Google Drive file: {e}", exc_info=True) + return { + "status": "error", + "message": "Something went wrong while creating the file. Please try again.", + } + + return create_google_drive_file diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/index.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/index.py new file mode 100644 index 000000000..074cba74c --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/index.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from typing import Any + +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, +) + +from .create_file import create_create_google_drive_file_tool +from .trash_file import create_delete_google_drive_file_tool + + +def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions: + d = {**(dependencies or {}), **kwargs} + common = { + "db_session": d["db_session"], + "search_space_id": d["search_space_id"], + "user_id": d["user_id"], + } + create = create_create_google_drive_file_tool(**common) + delete = create_delete_google_drive_file_tool(**common) + return { + "allow": [], + "ask": [ + {"name": getattr(create, "name", "") or "", "tool": create}, + {"name": getattr(delete, "name", "") or "", "tool": delete}, + ], + } diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/trash_file.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/trash_file.py new file mode 100644 index 000000000..832afff0d --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/google_drive/tools/trash_file.py @@ -0,0 +1,262 @@ +import logging +from typing import Any + +from googleapiclient.errors import HttpError +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.google_drive.client import GoogleDriveClient +from app.services.google_drive import GoogleDriveToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_delete_google_drive_file_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def delete_google_drive_file( + file_name: str, + delete_from_kb: bool = False, + ) -> dict[str, Any]: + """Move a Google Drive file to trash. + + Use this tool when the user explicitly asks to delete, remove, or trash + a file in Google Drive. + + Args: + file_name: The exact name of the file to trash (as it appears in Drive). + delete_from_kb: Whether to also remove the file from the knowledge base. + Default is False. + Set to True to remove from both Google Drive and knowledge base. + + Returns: + Dictionary with: + - status: "success", "rejected", "not_found", or "error" + - file_id: Google Drive file ID (if success) + - deleted_from_kb: whether the document was removed from the knowledge base + - message: Result message + + IMPORTANT: + - If status is "rejected", the user explicitly declined. Respond with a brief + acknowledgment and do NOT retry or suggest alternatives. + - If status is "not_found", relay the exact message to the user and ask them + to verify the file name or check if it has been indexed. + - If status is "insufficient_permissions", the connector lacks the required OAuth scope. + Inform the user they need to re-authenticate and do NOT retry this tool. + Examples: + - "Delete the 'Meeting Notes' file from Google Drive" + - "Trash the 'Old Budget' spreadsheet" + """ + logger.info( + f"delete_google_drive_file called: file_name='{file_name}', delete_from_kb={delete_from_kb}" + ) + + if db_session is None or search_space_id is None or user_id is None: + return { + "status": "error", + "message": "Google Drive tool not properly configured. Please contact support.", + } + + try: + metadata_service = GoogleDriveToolMetadataService(db_session) + context = await metadata_service.get_trash_context( + search_space_id, user_id, file_name + ) + + if "error" in context: + error_msg = context["error"] + if "not found" in error_msg.lower(): + logger.warning(f"File not found: {error_msg}") + return {"status": "not_found", "message": error_msg} + logger.error(f"Failed to fetch trash context: {error_msg}") + return {"status": "error", "message": error_msg} + + account = context.get("account", {}) + if account.get("auth_expired"): + logger.warning( + "Google Drive account %s has expired authentication", + account.get("id"), + ) + return { + "status": "auth_error", + "message": "The Google Drive account for this file needs re-authentication. Please re-authenticate in your connector settings.", + "connector_type": "google_drive", + } + + file = context["file"] + file_id = file["file_id"] + document_id = file.get("document_id") + connector_id_from_context = context["account"]["id"] + + if not file_id: + return { + "status": "error", + "message": "File ID is missing from the indexed document. Please re-index the file and try again.", + } + + logger.info( + f"Requesting approval for deleting Google Drive file: '{file_name}' (file_id={file_id}, delete_from_kb={delete_from_kb})" + ) + result = request_approval( + action_type="google_drive_file_trash", + tool_name="delete_google_drive_file", + params={ + "file_id": file_id, + "connector_id": connector_id_from_context, + "delete_from_kb": delete_from_kb, + }, + context=context, + ) + + if result.rejected: + return { + "status": "rejected", + "message": "User declined. The file was not trashed. Do not ask again or suggest alternatives.", + } + + final_file_id = result.params.get("file_id", file_id) + final_connector_id = result.params.get( + "connector_id", connector_id_from_context + ) + final_delete_from_kb = result.params.get("delete_from_kb", delete_from_kb) + + if not final_connector_id: + return { + "status": "error", + "message": "No connector found for this file.", + } + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + _drive_types = [ + SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, + ] + + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type.in_(_drive_types), + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Google Drive connector is invalid or has been disconnected.", + } + + logger.info( + f"Deleting Google Drive file: file_id='{final_file_id}', connector={final_connector_id}" + ) + + pre_built_creds = None + if ( + connector.connector_type + == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR + ): + from app.utils.google_credentials import build_composio_credentials + + cca_id = connector.config.get("composio_connected_account_id") + if cca_id: + pre_built_creds = build_composio_credentials(cca_id) + + client = GoogleDriveClient( + session=db_session, + connector_id=connector.id, + credentials=pre_built_creds, + ) + try: + await client.trash_file(file_id=final_file_id) + except HttpError as http_err: + if http_err.resp.status == 403: + logger.warning( + f"Insufficient permissions for connector {connector.id}: {http_err}" + ) + try: + from sqlalchemy.orm.attributes import flag_modified + + if not connector.config.get("auth_expired"): + connector.config = { + **connector.config, + "auth_expired": True, + } + flag_modified(connector, "config") + await db_session.commit() + except Exception: + logger.warning( + "Failed to persist auth_expired for connector %s", + connector.id, + exc_info=True, + ) + return { + "status": "insufficient_permissions", + "connector_id": connector.id, + "message": "This Google Drive account needs additional permissions. Please re-authenticate in connector settings.", + } + raise + + logger.info( + f"Google Drive file deleted (moved to trash): file_id={final_file_id}" + ) + + trash_result: dict[str, Any] = { + "status": "success", + "file_id": final_file_id, + "message": f"Successfully moved '{file['name']}' to trash.", + } + + deleted_from_kb = False + if final_delete_from_kb and document_id: + try: + from app.db import Document + + doc_result = await db_session.execute( + select(Document).filter(Document.id == document_id) + ) + document = doc_result.scalars().first() + if document: + await db_session.delete(document) + await db_session.commit() + deleted_from_kb = True + logger.info( + f"Deleted document {document_id} from knowledge base" + ) + else: + logger.warning(f"Document {document_id} not found in KB") + except Exception as e: + logger.error(f"Failed to delete document from KB: {e}") + await db_session.rollback() + trash_result["warning"] = ( + f"File moved to trash, but failed to remove from knowledge base: {e!s}" + ) + + trash_result["deleted_from_kb"] = deleted_from_kb + if deleted_from_kb: + trash_result["message"] = ( + f"{trash_result.get('message', '')} (also removed from knowledge base)" + ) + + return trash_result + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error deleting Google Drive file: {e}", exc_info=True) + return { + "status": "error", + "message": "Something went wrong while trashing the file. Please try again.", + } + + return delete_google_drive_file diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/agent.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/agent.py new file mode 100644 index 000000000..8e606a129 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/agent.py @@ -0,0 +1,54 @@ +"""`jira` route: ``SubAgent`` spec for deepagents.""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from deepagents import SubAgent +from langchain_core.language_models import BaseChatModel + +from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import ( + read_md_file, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, + merge_tools_permissions, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import ( + pack_subagent, +) + +from .tools.index import load_tools + +NAME = "jira" + + +def build_subagent( + *, + dependencies: dict[str, Any], + model: BaseChatModel | None = None, + extra_middleware: Sequence[Any] | None = None, + extra_tools_bucket: ToolsPermissions | None = None, +) -> SubAgent: + buckets = load_tools(dependencies=dependencies) + merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket) + tools = [ + row["tool"] + for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"]) + if row.get("tool") is not None + ] + interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")} + description = read_md_file(__package__, "description").strip() + if not description: + description = "Handles jira tasks for this workspace." + system_prompt = read_md_file(__package__, "system_prompt").strip() + return pack_subagent( + name=NAME, + description=description, + system_prompt=system_prompt, + tools=tools, + interrupt_on=interrupt_on, + model=model, + extra_middleware=extra_middleware, + ) diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/description.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/description.md new file mode 100644 index 000000000..2cd7e082a --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/description.md @@ -0,0 +1 @@ +Use for Jira issue/project workflows: search issues, inspect fields, update tickets, and move work through workflow states. diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/system_prompt.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/system_prompt.md new file mode 100644 index 000000000..4f4ae8a66 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/system_prompt.md @@ -0,0 +1,46 @@ +You are the Jira MCP operations sub-agent. +You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis. + + +Execute Jira MCP operations accurately, including discovery and issue mutation flows. + + + +- Runtime-provided Jira MCP tools for site/project discovery, issue search, create, and update. + + + +- Respect discovery dependencies (site/project/issue-type) before mutate calls. +- If required fields are missing or targets are ambiguous, return `status=blocked` with `missing_fields`. +- Do not guess keys/IDs. +- Never claim create/update success without tool confirmation. + + + +- Do not execute non-Jira tasks. + + + +- Never perform destructive/mutating actions without explicit target resolution. + + + +- On tool failure, return `status=error` with concise recovery `next_step`. +- On unresolved ambiguity, return `status=blocked` with candidates or missing fields. + + + +Return **only** one JSON object (no markdown/prose): +{ + "status": "success" | "partial" | "blocked" | "error", + "action_summary": string, + "evidence": { "items": object | null }, + "next_step": string | null, + "missing_fields": string[] | null, + "assumptions": string[] | null +} +Rules: +- `status=success` -> `next_step=null`, `missing_fields=null`. +- `status=partial|blocked|error` -> `next_step` must be non-null. +- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null. + diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/__init__.py new file mode 100644 index 000000000..768738118 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/__init__.py @@ -0,0 +1,11 @@ +"""Jira tools for creating, updating, and deleting issues.""" + +from .create_issue import create_create_jira_issue_tool +from .delete_issue import create_delete_jira_issue_tool +from .update_issue import create_update_jira_issue_tool + +__all__ = [ + "create_create_jira_issue_tool", + "create_delete_jira_issue_tool", + "create_update_jira_issue_tool", +] diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/create_issue.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/create_issue.py new file mode 100644 index 000000000..8b40dde65 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/create_issue.py @@ -0,0 +1,216 @@ +import asyncio +import logging +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm.attributes import flag_modified + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.jira_history import JiraHistoryConnector +from app.services.jira import JiraToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_create_jira_issue_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + @tool + async def create_jira_issue( + project_key: str, + summary: str, + issue_type: str = "Task", + description: str | None = None, + priority: str | None = None, + ) -> dict[str, Any]: + """Create a new issue in Jira. + + Use this tool when the user explicitly asks to create a new Jira issue/ticket. + + Args: + project_key: The Jira project key (e.g. "PROJ", "ENG"). + summary: Short, descriptive issue title. + issue_type: Issue type (default "Task"). Others: "Bug", "Story", "Epic". + description: Optional description body for the issue. + priority: Optional priority name (e.g. "High", "Medium", "Low"). + + Returns: + Dictionary with status, issue_key, and message. + + IMPORTANT: + - If status is "rejected", the user declined. Do NOT retry. + - If status is "insufficient_permissions", inform user to re-authenticate. + """ + logger.info( + f"create_jira_issue called: project_key='{project_key}', summary='{summary}'" + ) + + if db_session is None or search_space_id is None or user_id is None: + return {"status": "error", "message": "Jira tool not properly configured."} + + try: + metadata_service = JiraToolMetadataService(db_session) + context = await metadata_service.get_creation_context( + search_space_id, user_id + ) + + if "error" in context: + return {"status": "error", "message": context["error"]} + + accounts = context.get("accounts", []) + if accounts and all(a.get("auth_expired") for a in accounts): + return { + "status": "auth_error", + "message": "All connected Jira accounts need re-authentication.", + "connector_type": "jira", + } + + result = request_approval( + action_type="jira_issue_creation", + tool_name="create_jira_issue", + params={ + "project_key": project_key, + "summary": summary, + "issue_type": issue_type, + "description": description, + "priority": priority, + "connector_id": connector_id, + }, + context=context, + ) + + if result.rejected: + return { + "status": "rejected", + "message": "User declined. Do not retry or suggest alternatives.", + } + + final_project_key = result.params.get("project_key", project_key) + final_summary = result.params.get("summary", summary) + final_issue_type = result.params.get("issue_type", issue_type) + final_description = result.params.get("description", description) + final_priority = result.params.get("priority", priority) + final_connector_id = result.params.get("connector_id", connector_id) + + if not final_summary or not final_summary.strip(): + return {"status": "error", "message": "Issue summary cannot be empty."} + if not final_project_key: + return {"status": "error", "message": "A project must be selected."} + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + actual_connector_id = final_connector_id + if actual_connector_id is None: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.JIRA_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + return {"status": "error", "message": "No Jira connector found."} + actual_connector_id = connector.id + else: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == actual_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.JIRA_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Jira connector is invalid.", + } + + try: + jira_history = JiraHistoryConnector( + session=db_session, connector_id=actual_connector_id + ) + jira_client = await jira_history._get_jira_client() + api_result = await asyncio.to_thread( + jira_client.create_issue, + project_key=final_project_key, + summary=final_summary, + issue_type=final_issue_type, + description=final_description, + priority=final_priority, + ) + except Exception as api_err: + if "status code 403" in str(api_err).lower(): + try: + _conn = connector + _conn.config = {**_conn.config, "auth_expired": True} + flag_modified(_conn, "config") + await db_session.commit() + except Exception: + pass + return { + "status": "insufficient_permissions", + "connector_id": actual_connector_id, + "message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.", + } + raise + + issue_key = api_result.get("key", "") + issue_url = ( + f"{jira_history._base_url}/browse/{issue_key}" + if jira_history._base_url and issue_key + else "" + ) + + kb_message_suffix = "" + try: + from app.services.jira import JiraKBSyncService + + kb_service = JiraKBSyncService(db_session) + kb_result = await kb_service.sync_after_create( + issue_id=issue_key, + issue_identifier=issue_key, + issue_title=final_summary, + description=final_description, + state="To Do", + connector_id=actual_connector_id, + search_space_id=search_space_id, + user_id=user_id, + ) + if kb_result["status"] == "success": + kb_message_suffix = " Your knowledge base has also been updated." + else: + kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync." + except Exception as kb_err: + logger.warning(f"KB sync after create failed: {kb_err}") + kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync." + + return { + "status": "success", + "issue_key": issue_key, + "issue_url": issue_url, + "message": f"Jira issue {issue_key} created successfully.{kb_message_suffix}", + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + logger.error(f"Error creating Jira issue: {e}", exc_info=True) + return { + "status": "error", + "message": "Something went wrong while creating the issue.", + } + + return create_jira_issue diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/delete_issue.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/delete_issue.py new file mode 100644 index 000000000..6466c80ea --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/delete_issue.py @@ -0,0 +1,183 @@ +import asyncio +import logging +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm.attributes import flag_modified + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.jira_history import JiraHistoryConnector +from app.services.jira import JiraToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_delete_jira_issue_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + @tool + async def delete_jira_issue( + issue_title_or_key: str, + delete_from_kb: bool = False, + ) -> dict[str, Any]: + """Delete a Jira issue. + + Use this tool when the user asks to delete or remove a Jira issue. + + Args: + issue_title_or_key: The issue key (e.g. "PROJ-42") or title. + delete_from_kb: Whether to also remove from the knowledge base. + + Returns: + Dictionary with status, message, and deleted_from_kb. + + IMPORTANT: + - If status is "rejected", do NOT retry. + - If status is "not_found", relay the message to the user. + - If status is "insufficient_permissions", inform user to re-authenticate. + """ + logger.info( + f"delete_jira_issue called: issue_title_or_key='{issue_title_or_key}'" + ) + + if db_session is None or search_space_id is None or user_id is None: + return {"status": "error", "message": "Jira tool not properly configured."} + + try: + metadata_service = JiraToolMetadataService(db_session) + context = await metadata_service.get_deletion_context( + search_space_id, user_id, issue_title_or_key + ) + + if "error" in context: + error_msg = context["error"] + if context.get("auth_expired"): + return { + "status": "auth_error", + "message": error_msg, + "connector_id": context.get("connector_id"), + "connector_type": "jira", + } + if "not found" in error_msg.lower(): + return {"status": "not_found", "message": error_msg} + return {"status": "error", "message": error_msg} + + issue_data = context["issue"] + issue_key = issue_data["issue_id"] + document_id = issue_data["document_id"] + connector_id_from_context = context.get("account", {}).get("id") + + result = request_approval( + action_type="jira_issue_deletion", + tool_name="delete_jira_issue", + params={ + "issue_key": issue_key, + "connector_id": connector_id_from_context, + "delete_from_kb": delete_from_kb, + }, + context=context, + ) + + if result.rejected: + return { + "status": "rejected", + "message": "User declined. Do not retry or suggest alternatives.", + } + + final_issue_key = result.params.get("issue_key", issue_key) + final_connector_id = result.params.get( + "connector_id", connector_id_from_context + ) + final_delete_from_kb = result.params.get("delete_from_kb", delete_from_kb) + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + if not final_connector_id: + return { + "status": "error", + "message": "No connector found for this issue.", + } + + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.JIRA_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Jira connector is invalid.", + } + + try: + jira_history = JiraHistoryConnector( + session=db_session, connector_id=final_connector_id + ) + jira_client = await jira_history._get_jira_client() + await asyncio.to_thread(jira_client.delete_issue, final_issue_key) + except Exception as api_err: + if "status code 403" in str(api_err).lower(): + try: + connector.config = {**connector.config, "auth_expired": True} + flag_modified(connector, "config") + await db_session.commit() + except Exception: + pass + return { + "status": "insufficient_permissions", + "connector_id": final_connector_id, + "message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.", + } + raise + + deleted_from_kb = False + if final_delete_from_kb and document_id: + try: + from app.db import Document + + doc_result = await db_session.execute( + select(Document).filter(Document.id == document_id) + ) + document = doc_result.scalars().first() + if document: + await db_session.delete(document) + await db_session.commit() + deleted_from_kb = True + except Exception as e: + logger.error(f"Failed to delete document from KB: {e}") + await db_session.rollback() + + message = f"Jira issue {final_issue_key} deleted successfully." + if deleted_from_kb: + message += " Also removed from the knowledge base." + + return { + "status": "success", + "issue_key": final_issue_key, + "deleted_from_kb": deleted_from_kb, + "message": message, + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + logger.error(f"Error deleting Jira issue: {e}", exc_info=True) + return { + "status": "error", + "message": "Something went wrong while deleting the issue.", + } + + return delete_jira_issue diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/index.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/index.py new file mode 100644 index 000000000..c08909fcf --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/index.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from typing import Any + +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, +) + +from .create_issue import create_create_jira_issue_tool +from .delete_issue import create_delete_jira_issue_tool +from .update_issue import create_update_jira_issue_tool + + +def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions: + d = {**(dependencies or {}), **kwargs} + common = { + "db_session": d["db_session"], + "search_space_id": d["search_space_id"], + "user_id": d["user_id"], + "connector_id": d.get("connector_id"), + } + create = create_create_jira_issue_tool(**common) + update = create_update_jira_issue_tool(**common) + delete = create_delete_jira_issue_tool(**common) + return { + "allow": [], + "ask": [ + {"name": getattr(create, "name", "") or "", "tool": create}, + {"name": getattr(update, "name", "") or "", "tool": update}, + {"name": getattr(delete, "name", "") or "", "tool": delete}, + ], + } diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/update_issue.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/update_issue.py new file mode 100644 index 000000000..f6e586a2e --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/jira/tools/update_issue.py @@ -0,0 +1,226 @@ +import asyncio +import logging +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm.attributes import flag_modified + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.jira_history import JiraHistoryConnector +from app.services.jira import JiraToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_update_jira_issue_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + @tool + async def update_jira_issue( + issue_title_or_key: str, + new_summary: str | None = None, + new_description: str | None = None, + new_priority: str | None = None, + ) -> dict[str, Any]: + """Update an existing Jira issue. + + Use this tool when the user asks to modify, edit, or update a Jira issue. + + Args: + issue_title_or_key: The issue key (e.g. "PROJ-42") or title to identify the issue. + new_summary: Optional new title/summary for the issue. + new_description: Optional new description. + new_priority: Optional new priority name. + + Returns: + Dictionary with status and message. + + IMPORTANT: + - If status is "rejected", do NOT retry. + - If status is "not_found", relay the message and ask user to verify. + - If status is "insufficient_permissions", inform user to re-authenticate. + """ + logger.info( + f"update_jira_issue called: issue_title_or_key='{issue_title_or_key}'" + ) + + if db_session is None or search_space_id is None or user_id is None: + return {"status": "error", "message": "Jira tool not properly configured."} + + try: + metadata_service = JiraToolMetadataService(db_session) + context = await metadata_service.get_update_context( + search_space_id, user_id, issue_title_or_key + ) + + if "error" in context: + error_msg = context["error"] + if context.get("auth_expired"): + return { + "status": "auth_error", + "message": error_msg, + "connector_id": context.get("connector_id"), + "connector_type": "jira", + } + if "not found" in error_msg.lower(): + return {"status": "not_found", "message": error_msg} + return {"status": "error", "message": error_msg} + + issue_data = context["issue"] + issue_key = issue_data["issue_id"] + document_id = issue_data.get("document_id") + connector_id_from_context = context.get("account", {}).get("id") + + result = request_approval( + action_type="jira_issue_update", + tool_name="update_jira_issue", + params={ + "issue_key": issue_key, + "document_id": document_id, + "new_summary": new_summary, + "new_description": new_description, + "new_priority": new_priority, + "connector_id": connector_id_from_context, + }, + context=context, + ) + + if result.rejected: + return { + "status": "rejected", + "message": "User declined. Do not retry or suggest alternatives.", + } + + final_issue_key = result.params.get("issue_key", issue_key) + final_summary = result.params.get("new_summary", new_summary) + final_description = result.params.get("new_description", new_description) + final_priority = result.params.get("new_priority", new_priority) + final_connector_id = result.params.get( + "connector_id", connector_id_from_context + ) + final_document_id = result.params.get("document_id", document_id) + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + if not final_connector_id: + return { + "status": "error", + "message": "No connector found for this issue.", + } + + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.JIRA_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Jira connector is invalid.", + } + + fields: dict[str, Any] = {} + if final_summary: + fields["summary"] = final_summary + if final_description is not None: + fields["description"] = { + "type": "doc", + "version": 1, + "content": [ + { + "type": "paragraph", + "content": [{"type": "text", "text": final_description}], + } + ], + } + if final_priority: + fields["priority"] = {"name": final_priority} + + if not fields: + return {"status": "error", "message": "No changes specified."} + + try: + jira_history = JiraHistoryConnector( + session=db_session, connector_id=final_connector_id + ) + jira_client = await jira_history._get_jira_client() + await asyncio.to_thread( + jira_client.update_issue, final_issue_key, fields + ) + except Exception as api_err: + if "status code 403" in str(api_err).lower(): + try: + connector.config = {**connector.config, "auth_expired": True} + flag_modified(connector, "config") + await db_session.commit() + except Exception: + pass + return { + "status": "insufficient_permissions", + "connector_id": final_connector_id, + "message": "This Jira account needs additional permissions. Please re-authenticate in connector settings.", + } + raise + + issue_url = ( + f"{jira_history._base_url}/browse/{final_issue_key}" + if jira_history._base_url and final_issue_key + else "" + ) + + kb_message_suffix = "" + if final_document_id: + try: + from app.services.jira import JiraKBSyncService + + kb_service = JiraKBSyncService(db_session) + kb_result = await kb_service.sync_after_update( + document_id=final_document_id, + issue_id=final_issue_key, + user_id=user_id, + search_space_id=search_space_id, + ) + if kb_result["status"] == "success": + kb_message_suffix = ( + " Your knowledge base has also been updated." + ) + else: + kb_message_suffix = ( + " The knowledge base will be updated in the next sync." + ) + except Exception as kb_err: + logger.warning(f"KB sync after update failed: {kb_err}") + kb_message_suffix = ( + " The knowledge base will be updated in the next sync." + ) + + return { + "status": "success", + "issue_key": final_issue_key, + "issue_url": issue_url, + "message": f"Jira issue {final_issue_key} updated successfully.{kb_message_suffix}", + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + logger.error(f"Error updating Jira issue: {e}", exc_info=True) + return { + "status": "error", + "message": "Something went wrong while updating the issue.", + } + + return update_jira_issue