diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/agent.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/agent.py new file mode 100644 index 000000000..f95d07010 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/agent.py @@ -0,0 +1,54 @@ +"""`linear` route: ``SubAgent`` spec for deepagents.""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from deepagents import SubAgent +from langchain_core.language_models import BaseChatModel + +from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import ( + read_md_file, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, + merge_tools_permissions, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import ( + pack_subagent, +) + +from .tools.index import load_tools + +NAME = "linear" + + +def build_subagent( + *, + dependencies: dict[str, Any], + model: BaseChatModel | None = None, + extra_middleware: Sequence[Any] | None = None, + extra_tools_bucket: ToolsPermissions | None = None, +) -> SubAgent: + buckets = load_tools(dependencies=dependencies) + merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket) + tools = [ + row["tool"] + for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"]) + if row.get("tool") is not None + ] + interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")} + description = read_md_file(__package__, "description").strip() + if not description: + description = "Handles linear tasks for this workspace." + system_prompt = read_md_file(__package__, "system_prompt").strip() + return pack_subagent( + name=NAME, + description=description, + system_prompt=system_prompt, + tools=tools, + interrupt_on=interrupt_on, + model=model, + extra_middleware=extra_middleware, + ) diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/description.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/description.md new file mode 100644 index 000000000..6ad02c788 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/description.md @@ -0,0 +1 @@ +Use for Linear issue/project work: find/create issues, update status/assignees, review project progress, and inspect cycles. diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/system_prompt.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/system_prompt.md new file mode 100644 index 000000000..ce91cc49f --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/system_prompt.md @@ -0,0 +1,45 @@ +You are the Linear MCP operations sub-agent. +You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis. + + +Execute Linear MCP operations accurately using only available runtime tools. + + + +- Runtime-provided Linear MCP tools for issues/projects/teams/workflows. + + + +- Follow tool descriptions exactly; do not assume unsupported endpoints. +- If required identifiers or context are missing, return `status=blocked` with `missing_fields` and supervisor `next_step`. +- Never invent IDs, statuses, or mutation outcomes. + + + +- Do not execute non-Linear tasks. + + + +- Never claim mutation success without tool confirmation. + + + +- On tool failure, return `status=error` with concise recovery `next_step`. +- On unresolved ambiguity, return `status=blocked` with candidates. + + + +Return **only** one JSON object (no markdown/prose): +{ + "status": "success" | "partial" | "blocked" | "error", + "action_summary": string, + "evidence": { "items": object | null }, + "next_step": string | null, + "missing_fields": string[] | null, + "assumptions": string[] | null +} +Rules: +- `status=success` -> `next_step=null`, `missing_fields=null`. +- `status=partial|blocked|error` -> `next_step` must be non-null. +- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null. + diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/__init__.py new file mode 100644 index 000000000..31acf1e2a --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/__init__.py @@ -0,0 +1,11 @@ +"""Linear tools for creating, updating, and deleting issues.""" + +from .create_issue import create_create_linear_issue_tool +from .delete_issue import create_delete_linear_issue_tool +from .update_issue import create_update_linear_issue_tool + +__all__ = [ + "create_create_linear_issue_tool", + "create_delete_linear_issue_tool", + "create_update_linear_issue_tool", +] diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/create_issue.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/create_issue.py new file mode 100644 index 000000000..ff254e133 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/create_issue.py @@ -0,0 +1,248 @@ +import logging +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.linear_connector import LinearAPIError, LinearConnector +from app.services.linear import LinearToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_create_linear_issue_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + """ + Factory function to create the create_linear_issue tool. + + Args: + db_session: Database session for accessing the Linear connector + search_space_id: Search space ID to find the Linear connector + user_id: User ID for fetching user-specific context + connector_id: Optional specific connector ID (if known) + + Returns: + Configured create_linear_issue tool + """ + + @tool + async def create_linear_issue( + title: str, + description: str | None = None, + ) -> dict[str, Any]: + """Create a new issue in Linear. + + Use this tool when the user explicitly asks to create, add, or file + a new issue / ticket / task in Linear. The user MUST describe the issue + before you call this tool. If the request is vague, ask what the issue + should be about. Never call this tool without a clear topic from the user. + + Args: + title: Short, descriptive issue title. Infer from the user's request. + description: Optional markdown body for the issue. Generate from context. + + Returns: + Dictionary with: + - status: "success", "rejected", or "error" + - issue_id: Linear issue UUID (if success) + - identifier: Human-readable ID like "ENG-42" (if success) + - url: URL to the created issue (if success) + - message: Result message + + IMPORTANT: If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment (e.g., "Understood, I won't create the issue.") + and move on. Do NOT retry, troubleshoot, or suggest alternatives. + + Examples: + - "Create a Linear issue for the login bug" + - "File a ticket about the payment timeout problem" + - "Add an issue for the broken search feature" + """ + logger.info(f"create_linear_issue called: title='{title}'") + + if db_session is None or search_space_id is None or user_id is None: + logger.error( + "Linear tool not properly configured - missing required parameters" + ) + return { + "status": "error", + "message": "Linear tool not properly configured. Please contact support.", + } + + try: + metadata_service = LinearToolMetadataService(db_session) + context = await metadata_service.get_creation_context( + search_space_id, user_id + ) + + if "error" in context: + logger.error(f"Failed to fetch creation context: {context['error']}") + return {"status": "error", "message": context["error"]} + + workspaces = context.get("workspaces", []) + if workspaces and all(w.get("auth_expired") for w in workspaces): + logger.warning("All Linear accounts have expired authentication") + return { + "status": "auth_error", + "message": "All connected Linear accounts need re-authentication. Please re-authenticate in your connector settings.", + "connector_type": "linear", + } + + logger.info(f"Requesting approval for creating Linear issue: '{title}'") + result = request_approval( + action_type="linear_issue_creation", + tool_name="create_linear_issue", + params={ + "title": title, + "description": description, + "team_id": None, + "state_id": None, + "assignee_id": None, + "priority": None, + "label_ids": [], + "connector_id": connector_id, + }, + context=context, + ) + + if result.rejected: + logger.info("Linear issue creation rejected by user") + return { + "status": "rejected", + "message": "User declined. Do not retry or suggest alternatives.", + } + + final_title = result.params.get("title", title) + final_description = result.params.get("description", description) + final_team_id = result.params.get("team_id") + final_state_id = result.params.get("state_id") + final_assignee_id = result.params.get("assignee_id") + final_priority = result.params.get("priority") + final_label_ids = result.params.get("label_ids") or [] + final_connector_id = result.params.get("connector_id", connector_id) + + if not final_title or not final_title.strip(): + logger.error("Title is empty or contains only whitespace") + return {"status": "error", "message": "Issue title cannot be empty."} + if not final_team_id: + return { + "status": "error", + "message": "A team must be selected to create an issue.", + } + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + actual_connector_id = final_connector_id + if actual_connector_id is None: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LINEAR_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "No Linear connector found. Please connect Linear in your workspace settings.", + } + actual_connector_id = connector.id + logger.info(f"Found Linear connector: id={actual_connector_id}") + else: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == actual_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LINEAR_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + return { + "status": "error", + "message": "Selected Linear connector is invalid or has been disconnected.", + } + logger.info(f"Validated Linear connector: id={actual_connector_id}") + + logger.info( + f"Creating Linear issue with final params: title='{final_title}'" + ) + linear_client = LinearConnector( + session=db_session, connector_id=actual_connector_id + ) + result = await linear_client.create_issue( + team_id=final_team_id, + title=final_title, + description=final_description, + state_id=final_state_id, + assignee_id=final_assignee_id, + priority=final_priority, + label_ids=final_label_ids if final_label_ids else None, + ) + + if result.get("status") == "error": + logger.error(f"Failed to create Linear issue: {result.get('message')}") + return {"status": "error", "message": result.get("message")} + + logger.info( + f"Linear issue created: {result.get('identifier')} - {result.get('title')}" + ) + + kb_message_suffix = "" + try: + from app.services.linear import LinearKBSyncService + + kb_service = LinearKBSyncService(db_session) + kb_result = await kb_service.sync_after_create( + issue_id=result.get("id"), + issue_identifier=result.get("identifier", ""), + issue_title=result.get("title", final_title), + issue_url=result.get("url"), + description=final_description, + connector_id=actual_connector_id, + search_space_id=search_space_id, + user_id=user_id, + ) + if kb_result["status"] == "success": + kb_message_suffix = " Your knowledge base has also been updated." + else: + kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync." + except Exception as kb_err: + logger.warning(f"KB sync after create failed: {kb_err}") + kb_message_suffix = " This issue will be added to your knowledge base in the next scheduled sync." + + return { + "status": "success", + "issue_id": result.get("id"), + "identifier": result.get("identifier"), + "url": result.get("url"), + "message": (result.get("message", "") + kb_message_suffix), + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error creating Linear issue: {e}", exc_info=True) + if isinstance(e, ValueError | LinearAPIError): + message = str(e) + else: + message = ( + "Something went wrong while creating the issue. Please try again." + ) + return {"status": "error", "message": message} + + return create_linear_issue diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/delete_issue.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/delete_issue.py new file mode 100644 index 000000000..29ef0cdf2 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/delete_issue.py @@ -0,0 +1,245 @@ +import logging +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.linear_connector import LinearAPIError, LinearConnector +from app.services.linear import LinearToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_delete_linear_issue_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + """ + Factory function to create the delete_linear_issue tool. + + Args: + db_session: Database session for accessing the Linear connector + search_space_id: Search space ID to find the Linear connector + user_id: User ID for finding the correct Linear connector + connector_id: Optional specific connector ID (if known) + + Returns: + Configured delete_linear_issue tool + """ + + @tool + async def delete_linear_issue( + issue_ref: str, + delete_from_kb: bool = False, + ) -> dict[str, Any]: + """Archive (delete) a Linear issue. + + Use this tool when the user asks to delete, remove, or archive a Linear issue. + Note that Linear archives issues rather than permanently deleting them + (they can be restored from the archive). + + + Args: + issue_ref: The issue to delete. Can be the issue title (e.g. "Fix login bug"), + the identifier (e.g. "ENG-42"), or the full document title + (e.g. "ENG-42: Fix login bug"). + delete_from_kb: Whether to also remove the issue from the knowledge base. + Default is False. Set to True to remove from both Linear + and the knowledge base. + + Returns: + Dictionary with: + - status: "success", "rejected", "not_found", or "error" + - identifier: Human-readable ID like "ENG-42" (if success) + - message: Success or error message + - deleted_from_kb: Whether the issue was also removed from the knowledge base (if success) + + IMPORTANT: + - If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment (e.g., "Understood, I won't delete the issue.") + and move on. Do NOT ask for alternatives or troubleshoot. + - If status is "not_found", inform the user conversationally using the exact message + provided. Do NOT treat this as an error. Simply relay the message and ask the user + to verify the issue title or identifier, or check if it has been indexed. + Examples: + - "Delete the 'Fix login bug' Linear issue" + - "Archive ENG-42" + - "Remove the 'Old payment flow' issue from Linear" + """ + logger.info( + f"delete_linear_issue called: issue_ref='{issue_ref}', delete_from_kb={delete_from_kb}" + ) + + if db_session is None or search_space_id is None or user_id is None: + logger.error( + "Linear tool not properly configured - missing required parameters" + ) + return { + "status": "error", + "message": "Linear tool not properly configured. Please contact support.", + } + + try: + metadata_service = LinearToolMetadataService(db_session) + context = await metadata_service.get_delete_context( + search_space_id, user_id, issue_ref + ) + + if "error" in context: + error_msg = context["error"] + if context.get("auth_expired"): + logger.warning(f"Auth expired for delete context: {error_msg}") + return { + "status": "auth_error", + "message": error_msg, + "connector_id": context.get("connector_id"), + "connector_type": "linear", + } + if "not found" in error_msg.lower(): + logger.warning(f"Issue not found: {error_msg}") + return {"status": "not_found", "message": error_msg} + else: + logger.error(f"Failed to fetch delete context: {error_msg}") + return {"status": "error", "message": error_msg} + + issue_id = context["issue"]["id"] + issue_identifier = context["issue"].get("identifier", "") + document_id = context["issue"]["document_id"] + connector_id_from_context = context.get("workspace", {}).get("id") + + logger.info( + f"Requesting approval for deleting Linear issue: '{issue_ref}' " + f"(id={issue_id}, delete_from_kb={delete_from_kb})" + ) + result = request_approval( + action_type="linear_issue_deletion", + tool_name="delete_linear_issue", + params={ + "issue_id": issue_id, + "connector_id": connector_id_from_context, + "delete_from_kb": delete_from_kb, + }, + context=context, + ) + + if result.rejected: + logger.info("Linear issue deletion rejected by user") + return { + "status": "rejected", + "message": "User declined. Do not retry or suggest alternatives.", + } + + final_issue_id = result.params.get("issue_id", issue_id) + final_connector_id = result.params.get( + "connector_id", connector_id_from_context + ) + final_delete_from_kb = result.params.get("delete_from_kb", delete_from_kb) + + logger.info( + f"Deleting Linear issue with final params: issue_id={final_issue_id}, " + f"connector_id={final_connector_id}, delete_from_kb={final_delete_from_kb}" + ) + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + if final_connector_id: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LINEAR_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + logger.error( + f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}" + ) + return { + "status": "error", + "message": "Selected Linear connector is invalid or has been disconnected.", + } + actual_connector_id = connector.id + logger.info(f"Validated Linear connector: id={actual_connector_id}") + else: + logger.error("No connector found for this issue") + return { + "status": "error", + "message": "No connector found for this issue.", + } + + linear_client = LinearConnector( + session=db_session, connector_id=actual_connector_id + ) + + result = await linear_client.archive_issue(issue_id=final_issue_id) + + logger.info( + f"archive_issue result: {result.get('status')} - {result.get('message', '')}" + ) + + deleted_from_kb = False + if ( + result.get("status") == "success" + and final_delete_from_kb + and document_id + ): + try: + from app.db import Document + + doc_result = await db_session.execute( + select(Document).filter(Document.id == document_id) + ) + document = doc_result.scalars().first() + if document: + await db_session.delete(document) + await db_session.commit() + deleted_from_kb = True + logger.info( + f"Deleted document {document_id} from knowledge base" + ) + else: + logger.warning(f"Document {document_id} not found in KB") + except Exception as e: + logger.error(f"Failed to delete document from KB: {e}") + await db_session.rollback() + result["warning"] = ( + f"Issue archived in Linear, but failed to remove from knowledge base: {e!s}" + ) + + if result.get("status") == "success": + result["deleted_from_kb"] = deleted_from_kb + if issue_identifier: + result["message"] = ( + f"Issue {issue_identifier} archived successfully." + ) + if deleted_from_kb: + result["message"] = ( + f"{result.get('message', '')} Also removed from the knowledge base." + ) + + return result + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error deleting Linear issue: {e}", exc_info=True) + if isinstance(e, ValueError | LinearAPIError): + message = str(e) + else: + message = ( + "Something went wrong while deleting the issue. Please try again." + ) + return {"status": "error", "message": message} + + return delete_linear_issue diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/index.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/index.py new file mode 100644 index 000000000..ef668ffb1 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/index.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from typing import Any + +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, +) + +from .create_issue import create_create_linear_issue_tool +from .delete_issue import create_delete_linear_issue_tool +from .update_issue import create_update_linear_issue_tool + + +def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions: + d = {**(dependencies or {}), **kwargs} + common = { + "db_session": d["db_session"], + "search_space_id": d["search_space_id"], + "user_id": d["user_id"], + "connector_id": d.get("connector_id"), + } + create = create_create_linear_issue_tool(**common) + update = create_update_linear_issue_tool(**common) + delete = create_delete_linear_issue_tool(**common) + return { + "allow": [], + "ask": [ + {"name": getattr(create, "name", "") or "", "tool": create}, + {"name": getattr(update, "name", "") or "", "tool": update}, + {"name": getattr(delete, "name", "") or "", "tool": delete}, + ], + } diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/update_issue.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/update_issue.py new file mode 100644 index 000000000..f35d0dddd --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/linear/tools/update_issue.py @@ -0,0 +1,318 @@ +import logging +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.linear_connector import LinearAPIError, LinearConnector +from app.services.linear import LinearKBSyncService, LinearToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_update_linear_issue_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + """ + Factory function to create the update_linear_issue tool. + + Args: + db_session: Database session for accessing the Linear connector + search_space_id: Search space ID to find the Linear connector + user_id: User ID for fetching user-specific context + connector_id: Optional specific connector ID (if known) + + Returns: + Configured update_linear_issue tool + """ + + @tool + async def update_linear_issue( + issue_ref: str, + new_title: str | None = None, + new_description: str | None = None, + new_state_name: str | None = None, + new_assignee_email: str | None = None, + new_priority: int | None = None, + new_label_names: list[str] | None = None, + ) -> dict[str, Any]: + """Update an existing Linear issue that has been indexed in the knowledge base. + + Use this tool when the user asks to modify, change, or update a Linear issue — + for example, changing its status, reassigning it, updating its title or description, + adjusting its priority, or changing its labels. + + Only issues already indexed in the knowledge base can be updated. + + Args: + issue_ref: The issue to update. Can be the issue title (e.g. "Fix login bug"), + the identifier (e.g. "ENG-42"), or the full document title + (e.g. "ENG-42: Fix login bug"). Matched case-insensitively. + new_title: New title for the issue (optional). + new_description: New markdown body for the issue (optional). + new_state_name: New workflow state name (e.g. "In Progress", "Done"). + Matched case-insensitively against the team's states. + new_assignee_email: Email address of the new assignee. + Matched case-insensitively against the team's members. + new_priority: New priority (0 = No Priority, 1 = Urgent, 2 = High, + 3 = Medium, 4 = Low). + new_label_names: New set of label names to apply. + Matched case-insensitively against the team's labels. + Unrecognised names are silently skipped. + + Returns: + Dictionary with: + - status: "success", "rejected", "not_found", or "error" + - identifier: Human-readable ID like "ENG-42" (if success) + - url: URL to the updated issue (if success) + - message: Result message + + IMPORTANT: + - If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment (e.g., "Understood, I didn't update the issue.") + and move on. Do NOT ask for alternatives or troubleshoot. + - If status is "not_found", inform the user conversationally using the exact message + provided. Do NOT treat this as an error. Simply relay the message and ask the user + to verify the issue title or identifier, or check if it has been indexed. + + Examples: + - "Mark the 'Fix login bug' issue as done" + - "Assign ENG-42 to john@company.com" + - "Change the priority of 'Payment timeout' to urgent" + """ + logger.info(f"update_linear_issue called: issue_ref='{issue_ref}'") + + if db_session is None or search_space_id is None or user_id is None: + logger.error( + "Linear tool not properly configured - missing required parameters" + ) + return { + "status": "error", + "message": "Linear tool not properly configured. Please contact support.", + } + + try: + metadata_service = LinearToolMetadataService(db_session) + context = await metadata_service.get_update_context( + search_space_id, user_id, issue_ref + ) + + if "error" in context: + error_msg = context["error"] + if context.get("auth_expired"): + logger.warning(f"Auth expired for update context: {error_msg}") + return { + "status": "auth_error", + "message": error_msg, + "connector_id": context.get("connector_id"), + "connector_type": "linear", + } + if "not found" in error_msg.lower(): + logger.warning(f"Issue not found: {error_msg}") + return {"status": "not_found", "message": error_msg} + else: + logger.error(f"Failed to fetch update context: {error_msg}") + return {"status": "error", "message": error_msg} + + issue_id = context["issue"]["id"] + document_id = context["issue"]["document_id"] + connector_id_from_context = context.get("workspace", {}).get("id") + + team = context.get("team", {}) + new_state_id = _resolve_state(team, new_state_name) + new_assignee_id = _resolve_assignee(team, new_assignee_email) + new_label_ids = _resolve_labels(team, new_label_names) + + logger.info( + f"Requesting approval for updating Linear issue: '{issue_ref}' (id={issue_id})" + ) + result = request_approval( + action_type="linear_issue_update", + tool_name="update_linear_issue", + params={ + "issue_id": issue_id, + "document_id": document_id, + "new_title": new_title, + "new_description": new_description, + "new_state_id": new_state_id, + "new_assignee_id": new_assignee_id, + "new_priority": new_priority, + "new_label_ids": new_label_ids, + "connector_id": connector_id_from_context, + }, + context=context, + ) + + if result.rejected: + logger.info("Linear issue update rejected by user") + return { + "status": "rejected", + "message": "User declined. Do not retry or suggest alternatives.", + } + + final_issue_id = result.params.get("issue_id", issue_id) + final_document_id = result.params.get("document_id", document_id) + final_new_title = result.params.get("new_title", new_title) + final_new_description = result.params.get( + "new_description", new_description + ) + final_new_state_id = result.params.get("new_state_id", new_state_id) + final_new_assignee_id = result.params.get( + "new_assignee_id", new_assignee_id + ) + final_new_priority = result.params.get("new_priority", new_priority) + final_new_label_ids: list[str] | None = result.params.get( + "new_label_ids", new_label_ids + ) + final_connector_id = result.params.get( + "connector_id", connector_id_from_context + ) + + if not final_connector_id: + logger.error("No connector found for this issue") + return { + "status": "error", + "message": "No connector found for this issue.", + } + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LINEAR_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + logger.error( + f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}" + ) + return { + "status": "error", + "message": "Selected Linear connector is invalid or has been disconnected.", + } + logger.info(f"Validated Linear connector: id={final_connector_id}") + + logger.info( + f"Updating Linear issue with final params: issue_id={final_issue_id}" + ) + linear_client = LinearConnector( + session=db_session, connector_id=final_connector_id + ) + updated_issue = await linear_client.update_issue( + issue_id=final_issue_id, + title=final_new_title, + description=final_new_description, + state_id=final_new_state_id, + assignee_id=final_new_assignee_id, + priority=final_new_priority, + label_ids=final_new_label_ids, + ) + + if updated_issue.get("status") == "error": + logger.error( + f"Failed to update Linear issue: {updated_issue.get('message')}" + ) + return { + "status": "error", + "message": updated_issue.get("message"), + } + + logger.info( + f"update_issue result: {updated_issue.get('identifier')} - {updated_issue.get('title')}" + ) + + if final_document_id is not None: + logger.info( + f"Updating knowledge base for document {final_document_id}..." + ) + kb_service = LinearKBSyncService(db_session) + kb_result = await kb_service.sync_after_update( + document_id=final_document_id, + issue_id=final_issue_id, + user_id=user_id, + search_space_id=search_space_id, + ) + if kb_result["status"] == "success": + logger.info( + f"Knowledge base successfully updated for issue {final_issue_id}" + ) + kb_message = " Your knowledge base has also been updated." + elif kb_result["status"] == "not_indexed": + kb_message = " This issue will be added to your knowledge base in the next scheduled sync." + else: + logger.warning( + f"KB update failed for issue {final_issue_id}: {kb_result.get('message')}" + ) + kb_message = " Your knowledge base will be updated in the next scheduled sync." + else: + kb_message = "" + + identifier = updated_issue.get("identifier") + default_msg = f"Issue {identifier} updated successfully." + return { + "status": "success", + "identifier": identifier, + "url": updated_issue.get("url"), + "message": f"{updated_issue.get('message', default_msg)}{kb_message}", + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error updating Linear issue: {e}", exc_info=True) + if isinstance(e, ValueError | LinearAPIError): + message = str(e) + else: + message = ( + "Something went wrong while updating the issue. Please try again." + ) + return {"status": "error", "message": message} + + return update_linear_issue + + +def _resolve_state(team: dict, state_name: str | None) -> str | None: + if not state_name: + return None + name_lower = state_name.lower() + for state in team.get("states", []): + if state.get("name", "").lower() == name_lower: + return state["id"] + return None + + +def _resolve_assignee(team: dict, assignee_email: str | None) -> str | None: + if not assignee_email: + return None + email_lower = assignee_email.lower() + for member in team.get("members", []): + if member.get("email", "").lower() == email_lower: + return member["id"] + return None + + +def _resolve_labels(team: dict, label_names: list[str] | None) -> list[str] | None: + if label_names is None: + return None + if not label_names: + return [] + name_set = {n.lower() for n in label_names} + return [ + label["id"] + for label in team.get("labels", []) + if label.get("name", "").lower() in name_set + ] diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/agent.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/agent.py new file mode 100644 index 000000000..7b53d4edd --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/agent.py @@ -0,0 +1,54 @@ +"""`luma` route: ``SubAgent`` spec for deepagents.""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from deepagents import SubAgent +from langchain_core.language_models import BaseChatModel + +from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import ( + read_md_file, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, + merge_tools_permissions, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import ( + pack_subagent, +) + +from .tools.index import load_tools + +NAME = "luma" + + +def build_subagent( + *, + dependencies: dict[str, Any], + model: BaseChatModel | None = None, + extra_middleware: Sequence[Any] | None = None, + extra_tools_bucket: ToolsPermissions | None = None, +) -> SubAgent: + buckets = load_tools(dependencies=dependencies) + merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket) + tools = [ + row["tool"] + for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"]) + if row.get("tool") is not None + ] + interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")} + description = read_md_file(__package__, "description").strip() + if not description: + description = "Handles luma tasks for this workspace." + system_prompt = read_md_file(__package__, "system_prompt").strip() + return pack_subagent( + name=NAME, + description=description, + system_prompt=system_prompt, + tools=tools, + interrupt_on=interrupt_on, + model=model, + extra_middleware=extra_middleware, + ) diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/description.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/description.md new file mode 100644 index 000000000..9eaae4ac5 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/description.md @@ -0,0 +1 @@ +Use for Luma event operations: list events, inspect event details, and create new events. diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/system_prompt.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/system_prompt.md new file mode 100644 index 000000000..a2b4b7391 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/system_prompt.md @@ -0,0 +1,55 @@ +You are the Luma operations sub-agent. +You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis. + + +Execute Luma event listing, reads, and creation accurately. + + + +- `list_luma_events` +- `read_luma_event` +- `create_luma_event` + + + +- Use only tools in ``. +- Resolve relative dates against runtime timestamp. +- If required event fields are missing, return `status=blocked` with `missing_fields`. +- Never invent event IDs/times or creation outcomes. + + + +- Do not perform non-Luma tasks. + + + +- Never claim event creation success without tool confirmation. + + + +- On tool failure, return `status=error` with concise recovery `next_step`. +- On missing required fields, return `status=blocked` with `missing_fields`. + + + +Return **only** one JSON object (no markdown/prose): +{ + "status": "success" | "partial" | "blocked" | "error", + "action_summary": string, + "evidence": { + "event_id": string | null, + "title": string | null, + "start_at": string (ISO 8601 with timezone) | null, + "matched_candidates": [ + { "event_id": string, "title": string | null, "start_at": string | null } + ] | null + }, + "next_step": string | null, + "missing_fields": string[] | null, + "assumptions": string[] | null +} +Rules: +- `status=success` -> `next_step=null`, `missing_fields=null`. +- `status=partial|blocked|error` -> `next_step` must be non-null. +- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null. + diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/__init__.py new file mode 100644 index 000000000..255119bee --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/__init__.py @@ -0,0 +1,15 @@ +from app.agents.new_chat.tools.luma.create_event import ( + create_create_luma_event_tool, +) +from app.agents.new_chat.tools.luma.list_events import ( + create_list_luma_events_tool, +) +from app.agents.new_chat.tools.luma.read_event import ( + create_read_luma_event_tool, +) + +__all__ = [ + "create_create_luma_event_tool", + "create_list_luma_events_tool", + "create_read_luma_event_tool", +] diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/_auth.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/_auth.py new file mode 100644 index 000000000..c6d1cd148 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/_auth.py @@ -0,0 +1,39 @@ +"""Builds Luma API auth for connector-backed event tools.""" + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.db import SearchSourceConnector, SearchSourceConnectorType + +LUMA_API = "https://public-api.luma.com/v1" + + +async def get_luma_connector( + db_session: AsyncSession, + search_space_id: int, + user_id: str, +) -> SearchSourceConnector | None: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.LUMA_CONNECTOR, + ) + ) + return result.scalars().first() + + +def get_api_key(connector: SearchSourceConnector) -> str: + """Extract the API key from connector config (handles both key names).""" + key = connector.config.get("api_key") or connector.config.get("LUMA_API_KEY") + if not key: + raise ValueError("Luma API key not found in connector config.") + return key + + +def luma_headers(api_key: str) -> dict[str, str]: + return { + "Content-Type": "application/json", + "x-luma-api-key": api_key, + } diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/create_event.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/create_event.py new file mode 100644 index 000000000..0a24a988f --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/create_event.py @@ -0,0 +1,129 @@ +import logging +from typing import Any + +import httpx +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval + +from ._auth import LUMA_API, get_api_key, get_luma_connector, luma_headers + +logger = logging.getLogger(__name__) + + +def create_create_luma_event_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def create_luma_event( + name: str, + start_at: str, + end_at: str, + description: str | None = None, + timezone: str = "UTC", + ) -> dict[str, Any]: + """Create a new event on Luma. + + Args: + name: The event title. + start_at: Start time in ISO 8601 format (e.g. "2026-05-01T18:00:00"). + end_at: End time in ISO 8601 format (e.g. "2026-05-01T20:00:00"). + description: Optional event description (markdown supported). + timezone: Timezone string (default "UTC", e.g. "America/New_York"). + + Returns: + Dictionary with status, event_id on success. + + IMPORTANT: + - If status is "rejected", the user explicitly declined. Do NOT retry. + """ + if db_session is None or search_space_id is None or user_id is None: + return {"status": "error", "message": "Luma tool not properly configured."} + + try: + connector = await get_luma_connector(db_session, search_space_id, user_id) + if not connector: + return {"status": "error", "message": "No Luma connector found."} + + result = request_approval( + action_type="luma_create_event", + tool_name="create_luma_event", + params={ + "name": name, + "start_at": start_at, + "end_at": end_at, + "description": description, + "timezone": timezone, + }, + context={"connector_id": connector.id}, + ) + + if result.rejected: + return { + "status": "rejected", + "message": "User declined. Event was not created.", + } + + final_name = result.params.get("name", name) + final_start = result.params.get("start_at", start_at) + final_end = result.params.get("end_at", end_at) + final_desc = result.params.get("description", description) + final_tz = result.params.get("timezone", timezone) + + api_key = get_api_key(connector) + headers = luma_headers(api_key) + + body: dict[str, Any] = { + "name": final_name, + "start_at": final_start, + "end_at": final_end, + "timezone": final_tz, + } + if final_desc: + body["description_md"] = final_desc + + async with httpx.AsyncClient(timeout=20.0) as client: + resp = await client.post( + f"{LUMA_API}/event/create", + headers=headers, + json=body, + ) + + if resp.status_code == 401: + return { + "status": "auth_error", + "message": "Luma API key is invalid.", + "connector_type": "luma", + } + if resp.status_code == 403: + return { + "status": "error", + "message": "Luma Plus subscription required to create events via API.", + } + if resp.status_code not in (200, 201): + return { + "status": "error", + "message": f"Luma API error: {resp.status_code} — {resp.text[:200]}", + } + + data = resp.json() + event_id = data.get("api_id") or data.get("event", {}).get("api_id") + + return { + "status": "success", + "event_id": event_id, + "message": f"Event '{final_name}' created on Luma.", + } + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + logger.error("Error creating Luma event: %s", e, exc_info=True) + return {"status": "error", "message": "Failed to create Luma event."} + + return create_luma_event diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/index.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/index.py new file mode 100644 index 000000000..2be92a227 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/index.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from typing import Any + +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, +) + +from .create_event import create_create_luma_event_tool +from .list_events import create_list_luma_events_tool +from .read_event import create_read_luma_event_tool + + +def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions: + d = {**(dependencies or {}), **kwargs} + common = { + "db_session": d["db_session"], + "search_space_id": d["search_space_id"], + "user_id": d["user_id"], + } + list_ev = create_list_luma_events_tool(**common) + read_ev = create_read_luma_event_tool(**common) + create = create_create_luma_event_tool(**common) + return { + "allow": [ + {"name": getattr(list_ev, "name", "") or "", "tool": list_ev}, + {"name": getattr(read_ev, "name", "") or "", "tool": read_ev}, + ], + "ask": [{"name": getattr(create, "name", "") or "", "tool": create}], + } diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/list_events.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/list_events.py new file mode 100644 index 000000000..aec5ad220 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/list_events.py @@ -0,0 +1,111 @@ +import logging +from typing import Any + +import httpx +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from ._auth import LUMA_API, get_api_key, get_luma_connector, luma_headers + +logger = logging.getLogger(__name__) + + +def create_list_luma_events_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def list_luma_events( + max_results: int = 25, + ) -> dict[str, Any]: + """List upcoming and recent Luma events. + + Args: + max_results: Maximum events to return (default 25, max 50). + + Returns: + Dictionary with status and a list of events including + event_id, name, start_at, end_at, location, url. + """ + if db_session is None or search_space_id is None or user_id is None: + return {"status": "error", "message": "Luma tool not properly configured."} + + max_results = min(max_results, 50) + + try: + connector = await get_luma_connector(db_session, search_space_id, user_id) + if not connector: + return {"status": "error", "message": "No Luma connector found."} + + api_key = get_api_key(connector) + headers = luma_headers(api_key) + + all_entries: list[dict] = [] + cursor = None + + async with httpx.AsyncClient(timeout=20.0) as client: + while len(all_entries) < max_results: + params: dict[str, Any] = { + "limit": min(100, max_results - len(all_entries)) + } + if cursor: + params["cursor"] = cursor + + resp = await client.get( + f"{LUMA_API}/calendar/list-events", + headers=headers, + params=params, + ) + + if resp.status_code == 401: + return { + "status": "auth_error", + "message": "Luma API key is invalid.", + "connector_type": "luma", + } + if resp.status_code != 200: + return { + "status": "error", + "message": f"Luma API error: {resp.status_code}", + } + + data = resp.json() + entries = data.get("entries", []) + if not entries: + break + all_entries.extend(entries) + + next_cursor = data.get("next_cursor") + if not next_cursor: + break + cursor = next_cursor + + events = [] + for entry in all_entries[:max_results]: + ev = entry.get("event", {}) + geo = ev.get("geo_info", {}) + events.append( + { + "event_id": entry.get("api_id"), + "name": ev.get("name", "Untitled"), + "start_at": ev.get("start_at", ""), + "end_at": ev.get("end_at", ""), + "timezone": ev.get("timezone", ""), + "location": geo.get("name", ""), + "url": ev.get("url", ""), + "visibility": ev.get("visibility", ""), + } + ) + + return {"status": "success", "events": events, "total": len(events)} + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + logger.error("Error listing Luma events: %s", e, exc_info=True) + return {"status": "error", "message": "Failed to list Luma events."} + + return list_luma_events diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/read_event.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/read_event.py new file mode 100644 index 000000000..b37a9d617 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/luma/tools/read_event.py @@ -0,0 +1,92 @@ +import logging +from typing import Any + +import httpx +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from ._auth import LUMA_API, get_api_key, get_luma_connector, luma_headers + +logger = logging.getLogger(__name__) + + +def create_read_luma_event_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, +): + @tool + async def read_luma_event(event_id: str) -> dict[str, Any]: + """Read detailed information about a specific Luma event. + + Args: + event_id: The Luma event API ID (from list_luma_events). + + Returns: + Dictionary with status and full event details including + description, attendees count, meeting URL. + """ + if db_session is None or search_space_id is None or user_id is None: + return {"status": "error", "message": "Luma tool not properly configured."} + + try: + connector = await get_luma_connector(db_session, search_space_id, user_id) + if not connector: + return {"status": "error", "message": "No Luma connector found."} + + api_key = get_api_key(connector) + headers = luma_headers(api_key) + + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get( + f"{LUMA_API}/events/{event_id}", + headers=headers, + ) + + if resp.status_code == 401: + return { + "status": "auth_error", + "message": "Luma API key is invalid.", + "connector_type": "luma", + } + if resp.status_code == 404: + return { + "status": "not_found", + "message": f"Event '{event_id}' not found.", + } + if resp.status_code != 200: + return { + "status": "error", + "message": f"Luma API error: {resp.status_code}", + } + + data = resp.json() + ev = data.get("event", data) + geo = ev.get("geo_info", {}) + + event_detail = { + "event_id": event_id, + "name": ev.get("name", ""), + "description": ev.get("description", ""), + "start_at": ev.get("start_at", ""), + "end_at": ev.get("end_at", ""), + "timezone": ev.get("timezone", ""), + "location_name": geo.get("name", ""), + "address": geo.get("address", ""), + "url": ev.get("url", ""), + "meeting_url": ev.get("meeting_url", ""), + "visibility": ev.get("visibility", ""), + "cover_url": ev.get("cover_url", ""), + } + + return {"status": "success", "event": event_detail} + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + logger.error("Error reading Luma event: %s", e, exc_info=True) + return {"status": "error", "message": "Failed to read Luma event."} + + return read_luma_event diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/agent.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/agent.py new file mode 100644 index 000000000..7d15e5cc0 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/agent.py @@ -0,0 +1,54 @@ +"""`notion` route: ``SubAgent`` spec for deepagents.""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from deepagents import SubAgent +from langchain_core.language_models import BaseChatModel + +from app.agents.multi_agent_with_deepagents.subagents.shared.md_file_reader import ( + read_md_file, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, + merge_tools_permissions, +) +from app.agents.multi_agent_with_deepagents.subagents.shared.subagent_builder import ( + pack_subagent, +) + +from .tools.index import load_tools + +NAME = "notion" + + +def build_subagent( + *, + dependencies: dict[str, Any], + model: BaseChatModel | None = None, + extra_middleware: Sequence[Any] | None = None, + extra_tools_bucket: ToolsPermissions | None = None, +) -> SubAgent: + buckets = load_tools(dependencies=dependencies) + merged_tools_bucket = merge_tools_permissions(buckets, extra_tools_bucket) + tools = [ + row["tool"] + for row in (*merged_tools_bucket["allow"], *merged_tools_bucket["ask"]) + if row.get("tool") is not None + ] + interrupt_on = {r["name"]: True for r in merged_tools_bucket["ask"] if r.get("name")} + description = read_md_file(__package__, "description").strip() + if not description: + description = "Handles notion tasks for this workspace." + system_prompt = read_md_file(__package__, "system_prompt").strip() + return pack_subagent( + name=NAME, + description=description, + system_prompt=system_prompt, + tools=tools, + interrupt_on=interrupt_on, + model=model, + extra_middleware=extra_middleware, + ) diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/description.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/description.md new file mode 100644 index 000000000..f1d51c18a --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/description.md @@ -0,0 +1 @@ +Use for Notion workspace pages: create pages, update page content, and delete pages. diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/system_prompt.md b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/system_prompt.md new file mode 100644 index 000000000..a40e9f4d0 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/system_prompt.md @@ -0,0 +1,56 @@ +You are the Notion operations sub-agent. +You receive delegated instructions from a supervisor agent and return structured results for supervisor synthesis. + + +Execute Notion page operations accurately in the connected workspace. + + + +- `create_notion_page` +- `update_notion_page` +- `delete_notion_page` + + + +- Use only tools in ``. +- If target page context is unclear, do not ask the user directly; return `status=blocked` with candidate options and supervisor `next_step`. +- Never invent page IDs, titles, or mutation outcomes. + + + +- Do not perform non-Notion tasks. + + + +- Before update/delete, ensure the target page match is explicit. +- Never claim mutation success without tool confirmation. + + + +- On tool failure, return `status=error` with concise retry/recovery `next_step`. +- On ambiguous target, return `status=blocked` with candidate options. + + + +Return **only** one JSON object (no markdown/prose): +{ + "status": "success" | "partial" | "blocked" | "error", + "action_summary": string, + "evidence": { + "page_id": string | null, + "page_title": string | null, + "matched_candidates": [ + { "page_id": string, "page_title": string | null } + ] | null + }, + "next_step": string | null, + "missing_fields": string[] | null, + "assumptions": string[] | null +} + +Rules: +- `status=success` -> `next_step=null`, `missing_fields=null`. +- `status=partial|blocked|error` -> `next_step` must be non-null. +- `status=blocked` due to missing required inputs -> `missing_fields` must be non-null. +- On ambiguity, include candidate options in `evidence.matched_candidates`. + diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/__init__.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/__init__.py new file mode 100644 index 000000000..6ce825dca --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/__init__.py @@ -0,0 +1,11 @@ +"""Notion tools for creating, updating, and deleting pages.""" + +from .create_page import create_create_notion_page_tool +from .delete_page import create_delete_notion_page_tool +from .update_page import create_update_notion_page_tool + +__all__ = [ + "create_create_notion_page_tool", + "create_delete_notion_page_tool", + "create_update_notion_page_tool", +] diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/create_page.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/create_page.py new file mode 100644 index 000000000..6efffe960 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/create_page.py @@ -0,0 +1,244 @@ +import logging +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector +from app.services.notion import NotionToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_create_notion_page_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + """ + Factory function to create the create_notion_page tool. + + Args: + db_session: Database session for accessing Notion connector + search_space_id: Search space ID to find the Notion connector + user_id: User ID for fetching user-specific context + connector_id: Optional specific connector ID (if known) + + Returns: + Configured create_notion_page tool + """ + + @tool + async def create_notion_page( + title: str, + content: str | None = None, + ) -> dict[str, Any]: + """Create a new page in Notion with the given title and content. + + Use this tool when the user asks you to create, save, or publish + something to Notion. The page will be created in the user's + configured Notion workspace. The user MUST specify a topic before you + call this tool. If the request does not contain a topic (e.g. "create a + notion page"), ask what the page should be about. Never call this tool + without a clear topic from the user. + + Args: + title: The title of the Notion page. + content: Optional markdown content for the page body (supports headings, lists, paragraphs). + Generate this yourself based on the user's topic. + + Returns: + Dictionary with: + - status: "success", "rejected", or "error" + - page_id: Created page ID (if success) + - url: URL to the created page (if success) + - title: Page title (if success) + - message: Result message + + IMPORTANT: If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment (e.g., "Understood, I didn't create the page.") + and move on. Do NOT troubleshoot or suggest alternatives. + + Examples: + - "Create a Notion page about our Q2 roadmap" + - "Save a summary of today's discussion to Notion" + """ + logger.info(f"create_notion_page called: title='{title}'") + + if db_session is None or search_space_id is None or user_id is None: + logger.error( + "Notion tool not properly configured - missing required parameters" + ) + return { + "status": "error", + "message": "Notion tool not properly configured. Please contact support.", + } + + try: + metadata_service = NotionToolMetadataService(db_session) + context = await metadata_service.get_creation_context( + search_space_id, user_id + ) + + if "error" in context: + logger.error(f"Failed to fetch creation context: {context['error']}") + return { + "status": "error", + "message": context["error"], + } + + accounts = context.get("accounts", []) + if accounts and all(a.get("auth_expired") for a in accounts): + logger.warning("All Notion accounts have expired authentication") + return { + "status": "auth_error", + "message": "All connected Notion accounts need re-authentication. Please re-authenticate in your connector settings.", + "connector_type": "notion", + } + + logger.info(f"Requesting approval for creating Notion page: '{title}'") + result = request_approval( + action_type="notion_page_creation", + tool_name="create_notion_page", + params={ + "title": title, + "content": content, + "parent_page_id": None, + "connector_id": connector_id, + }, + context=context, + ) + + if result.rejected: + logger.info("Notion page creation rejected by user") + return { + "status": "rejected", + "message": "User declined. Do not retry or suggest alternatives.", + } + + final_title = result.params.get("title", title) + final_content = result.params.get("content", content) + final_parent_page_id = result.params.get("parent_page_id") + final_connector_id = result.params.get("connector_id", connector_id) + + if not final_title or not final_title.strip(): + logger.error("Title is empty or contains only whitespace") + return { + "status": "error", + "message": "Page title cannot be empty. Please provide a valid title.", + } + + logger.info( + f"Creating Notion page with final params: title='{final_title}'" + ) + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + actual_connector_id = final_connector_id + if actual_connector_id is None: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.NOTION_CONNECTOR, + ) + ) + connector = result.scalars().first() + + if not connector: + logger.warning( + f"No Notion connector found for search_space_id={search_space_id}" + ) + return { + "status": "error", + "message": "No Notion connector found. Please connect Notion in your workspace settings.", + } + + actual_connector_id = connector.id + logger.info(f"Found Notion connector: id={actual_connector_id}") + else: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == actual_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.NOTION_CONNECTOR, + ) + ) + connector = result.scalars().first() + + if not connector: + logger.error( + f"Invalid connector_id={actual_connector_id} for search_space_id={search_space_id}" + ) + return { + "status": "error", + "message": "Selected Notion account is invalid or has been disconnected. Please select a valid account.", + } + logger.info(f"Validated Notion connector: id={actual_connector_id}") + + notion_connector = NotionHistoryConnector( + session=db_session, + connector_id=actual_connector_id, + ) + + result = await notion_connector.create_page( + title=final_title, + content=final_content, + parent_page_id=final_parent_page_id, + ) + logger.info( + f"create_page result: {result.get('status')} - {result.get('message', '')}" + ) + + if result.get("status") == "success": + kb_message_suffix = "" + try: + from app.services.notion import NotionKBSyncService + + kb_service = NotionKBSyncService(db_session) + kb_result = await kb_service.sync_after_create( + page_id=result.get("page_id"), + page_title=result.get("title", final_title), + page_url=result.get("url"), + content=final_content, + connector_id=actual_connector_id, + search_space_id=search_space_id, + user_id=user_id, + ) + if kb_result["status"] == "success": + kb_message_suffix = ( + " Your knowledge base has also been updated." + ) + else: + kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync." + except Exception as kb_err: + logger.warning(f"KB sync after create failed: {kb_err}") + kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync." + + result["message"] = result.get("message", "") + kb_message_suffix + + return result + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error creating Notion page: {e}", exc_info=True) + if isinstance(e, ValueError | NotionAPIError): + message = str(e) + else: + message = ( + "Something went wrong while creating the page. Please try again." + ) + return {"status": "error", "message": message} + + return create_notion_page diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/delete_page.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/delete_page.py new file mode 100644 index 000000000..07f7583d2 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/delete_page.py @@ -0,0 +1,262 @@ +import logging +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector +from app.services.notion.tool_metadata_service import NotionToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_delete_notion_page_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + """ + Factory function to create the delete_notion_page tool. + + Args: + db_session: Database session for accessing Notion connector + search_space_id: Search space ID to find the Notion connector + user_id: User ID for finding the correct Notion connector + connector_id: Optional specific connector ID (if known) + + Returns: + Configured delete_notion_page tool + """ + + @tool + async def delete_notion_page( + page_title: str, + delete_from_kb: bool = False, + ) -> dict[str, Any]: + """Delete (archive) a Notion page. + + Use this tool when the user asks you to delete, remove, or archive + a Notion page. Note that Notion doesn't permanently delete pages, + it archives them (they can be restored from trash). + + Args: + page_title: The title of the Notion page to delete. + delete_from_kb: Whether to also remove the page from the knowledge base. + Default is False. + Set to True to permanently remove from both Notion and knowledge base. + + Returns: + Dictionary with: + - status: "success", "rejected", "not_found", or "error" + - page_id: Deleted page ID (if success) + - message: Success or error message + - deleted_from_kb: Whether the page was also removed from knowledge base (if success) + + Examples: + - "Delete the 'Meeting Notes' Notion page" + - "Remove the 'Old Project Plan' Notion page" + - "Archive the 'Draft Ideas' Notion page" + """ + logger.info( + f"delete_notion_page called: page_title='{page_title}', delete_from_kb={delete_from_kb}" + ) + + if db_session is None or search_space_id is None or user_id is None: + logger.error( + "Notion tool not properly configured - missing required parameters" + ) + return { + "status": "error", + "message": "Notion tool not properly configured. Please contact support.", + } + + try: + # Get page context (page_id, account, title) from indexed data + metadata_service = NotionToolMetadataService(db_session) + context = await metadata_service.get_delete_context( + search_space_id, user_id, page_title + ) + + if "error" in context: + error_msg = context["error"] + # Check if it's a "not found" error (softer handling for LLM) + if "not found" in error_msg.lower(): + logger.warning(f"Page not found: {error_msg}") + return { + "status": "not_found", + "message": error_msg, + } + else: + logger.error(f"Failed to fetch delete context: {error_msg}") + return { + "status": "error", + "message": error_msg, + } + + account = context.get("account", {}) + if account.get("auth_expired"): + logger.warning( + "Notion account %s has expired authentication", + account.get("id"), + ) + return { + "status": "auth_error", + "message": "The Notion account for this page needs re-authentication. Please re-authenticate in your connector settings.", + } + + page_id = context.get("page_id") + connector_id_from_context = account.get("id") + document_id = context.get("document_id") + + logger.info( + f"Requesting approval for deleting Notion page: '{page_title}' (page_id={page_id}, delete_from_kb={delete_from_kb})" + ) + + result = request_approval( + action_type="notion_page_deletion", + tool_name="delete_notion_page", + params={ + "page_id": page_id, + "connector_id": connector_id_from_context, + "delete_from_kb": delete_from_kb, + }, + context=context, + ) + + if result.rejected: + logger.info("Notion page deletion rejected by user") + return { + "status": "rejected", + "message": "User declined. Do not retry or suggest alternatives.", + } + + final_page_id = result.params.get("page_id", page_id) + final_connector_id = result.params.get( + "connector_id", connector_id_from_context + ) + final_delete_from_kb = result.params.get("delete_from_kb", delete_from_kb) + + logger.info( + f"Deleting Notion page with final params: page_id={final_page_id}, connector_id={final_connector_id}, delete_from_kb={final_delete_from_kb}" + ) + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + # Validate the connector + if final_connector_id: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.NOTION_CONNECTOR, + ) + ) + connector = result.scalars().first() + + if not connector: + logger.error( + f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}" + ) + return { + "status": "error", + "message": "Selected Notion account is invalid or has been disconnected. Please select a valid account.", + } + actual_connector_id = connector.id + logger.info(f"Validated Notion connector: id={actual_connector_id}") + else: + logger.error("No connector found for this page") + return { + "status": "error", + "message": "No connector found for this page.", + } + + # Create connector instance + notion_connector = NotionHistoryConnector( + session=db_session, + connector_id=actual_connector_id, + ) + + # Delete the page from Notion + result = await notion_connector.delete_page(page_id=final_page_id) + logger.info( + f"delete_page result: {result.get('status')} - {result.get('message', '')}" + ) + + # If deletion was successful and user wants to delete from KB + deleted_from_kb = False + if ( + result.get("status") == "success" + and final_delete_from_kb + and document_id + ): + try: + from sqlalchemy.future import select + + from app.db import Document + + # Get the document + doc_result = await db_session.execute( + select(Document).filter(Document.id == document_id) + ) + document = doc_result.scalars().first() + + if document: + await db_session.delete(document) + await db_session.commit() + deleted_from_kb = True + logger.info( + f"Deleted document {document_id} from knowledge base" + ) + else: + logger.warning(f"Document {document_id} not found in KB") + except Exception as e: + logger.error(f"Failed to delete document from KB: {e}") + await db_session.rollback() + result["warning"] = ( + f"Page deleted from Notion, but failed to remove from knowledge base: {e!s}" + ) + + # Update result with KB deletion status + if result.get("status") == "success": + result["deleted_from_kb"] = deleted_from_kb + if deleted_from_kb: + result["message"] = ( + f"{result.get('message', '')} (also removed from knowledge base)" + ) + + return result + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error deleting Notion page: {e}", exc_info=True) + error_str = str(e).lower() + if isinstance(e, NotionAPIError) and ( + "401" in error_str or "unauthorized" in error_str + ): + return { + "status": "auth_error", + "message": str(e), + "connector_id": connector_id_from_context + if "connector_id_from_context" in dir() + else None, + "connector_type": "notion", + } + if isinstance(e, ValueError | NotionAPIError): + message = str(e) + else: + message = ( + "Something went wrong while deleting the page. Please try again." + ) + return {"status": "error", "message": message} + + return delete_notion_page diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/index.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/index.py new file mode 100644 index 000000000..dd1db9031 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/index.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from typing import Any + +from app.agents.multi_agent_with_deepagents.subagents.shared.permissions import ( + ToolsPermissions, +) + +from .create_page import create_create_notion_page_tool +from .delete_page import create_delete_notion_page_tool +from .update_page import create_update_notion_page_tool + + +def load_tools(*, dependencies: dict[str, Any] | None = None, **kwargs: Any) -> ToolsPermissions: + d = {**(dependencies or {}), **kwargs} + common = { + "db_session": d["db_session"], + "search_space_id": d["search_space_id"], + "user_id": d["user_id"], + } + create = create_create_notion_page_tool(**common) + update = create_update_notion_page_tool(**common) + delete = create_delete_notion_page_tool(**common) + return { + "allow": [], + "ask": [ + {"name": getattr(create, "name", "") or "", "tool": create}, + {"name": getattr(update, "name", "") or "", "tool": update}, + {"name": getattr(delete, "name", "") or "", "tool": delete}, + ], + } diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/update_page.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/update_page.py new file mode 100644 index 000000000..85c08177c --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/connectors/notion/tools/update_page.py @@ -0,0 +1,265 @@ +import logging +from typing import Any + +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.new_chat.tools.hitl import request_approval +from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector +from app.services.notion import NotionToolMetadataService + +logger = logging.getLogger(__name__) + + +def create_update_notion_page_tool( + db_session: AsyncSession | None = None, + search_space_id: int | None = None, + user_id: str | None = None, + connector_id: int | None = None, +): + """ + Factory function to create the update_notion_page tool. + + Args: + db_session: Database session for accessing Notion connector + search_space_id: Search space ID to find the Notion connector + user_id: User ID for fetching user-specific context + connector_id: Optional specific connector ID (if known) + + Returns: + Configured update_notion_page tool + """ + + @tool + async def update_notion_page( + page_title: str, + content: str | None = None, + ) -> dict[str, Any]: + """Update an existing Notion page by appending new content. + + Use this tool when the user asks you to add content to, modify, or update + a Notion page. The new content will be appended to the existing page content. + The user MUST specify what to add before you call this tool. If the + request is vague, ask what content they want added. + + Args: + page_title: The title of the Notion page to update. + content: Optional markdown content to append to the page body (supports headings, lists, paragraphs). + Generate this yourself based on the user's request. + + Returns: + Dictionary with: + - status: "success", "rejected", "not_found", or "error" + - page_id: Updated page ID (if success) + - url: URL to the updated page (if success) + - title: Current page title (if success) + - message: Result message + + IMPORTANT: + - If status is "rejected", the user explicitly declined the action. + Respond with a brief acknowledgment (e.g., "Understood, I didn't update the page.") + and move on. Do NOT ask for alternatives or troubleshoot. + - If status is "not_found", inform the user conversationally using the exact message provided. + Example: "I couldn't find the page '[page_title]' in your indexed Notion pages. [message details]" + Do NOT treat this as an error. Do NOT invent information. Simply relay the message and + ask the user to verify the page title or check if it's been indexed. + Examples: + - "Add today's meeting notes to the 'Meeting Notes' Notion page" + - "Update the 'Project Plan' page with a status update on phase 1" + """ + logger.info( + f"update_notion_page called: page_title='{page_title}', content_length={len(content) if content else 0}" + ) + + if db_session is None or search_space_id is None or user_id is None: + logger.error( + "Notion tool not properly configured - missing required parameters" + ) + return { + "status": "error", + "message": "Notion tool not properly configured. Please contact support.", + } + + if not content or not content.strip(): + logger.error(f"Empty content provided for page '{page_title}'") + return { + "status": "error", + "message": "Content is required to update the page. Please provide the actual content you want to add.", + } + + try: + metadata_service = NotionToolMetadataService(db_session) + context = await metadata_service.get_update_context( + search_space_id, user_id, page_title + ) + + if "error" in context: + error_msg = context["error"] + # Check if it's a "not found" error (softer handling for LLM) + if "not found" in error_msg.lower(): + logger.warning(f"Page not found: {error_msg}") + return { + "status": "not_found", + "message": error_msg, + } + else: + logger.error(f"Failed to fetch update context: {error_msg}") + return { + "status": "error", + "message": error_msg, + } + + account = context.get("account", {}) + if account.get("auth_expired"): + logger.warning( + "Notion account %s has expired authentication", + account.get("id"), + ) + return { + "status": "auth_error", + "message": "The Notion account for this page needs re-authentication. Please re-authenticate in your connector settings.", + } + + page_id = context.get("page_id") + document_id = context.get("document_id") + connector_id_from_context = context.get("account", {}).get("id") + + logger.info( + f"Requesting approval for updating Notion page: '{page_title}' (page_id={page_id})" + ) + result = request_approval( + action_type="notion_page_update", + tool_name="update_notion_page", + params={ + "page_id": page_id, + "content": content, + "connector_id": connector_id_from_context, + }, + context=context, + ) + + if result.rejected: + logger.info("Notion page update rejected by user") + return { + "status": "rejected", + "message": "User declined. Do not retry or suggest alternatives.", + } + + final_page_id = result.params.get("page_id", page_id) + final_content = result.params.get("content", content) + final_connector_id = result.params.get( + "connector_id", connector_id_from_context + ) + + logger.info( + f"Updating Notion page with final params: page_id={final_page_id}, has_content={final_content is not None}" + ) + + from sqlalchemy.future import select + + from app.db import SearchSourceConnector, SearchSourceConnectorType + + if final_connector_id: + result = await db_session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == final_connector_id, + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.NOTION_CONNECTOR, + ) + ) + connector = result.scalars().first() + + if not connector: + logger.error( + f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}" + ) + return { + "status": "error", + "message": "Selected Notion account is invalid or has been disconnected. Please select a valid account.", + } + actual_connector_id = connector.id + logger.info(f"Validated Notion connector: id={actual_connector_id}") + else: + logger.error("No connector found for this page") + return { + "status": "error", + "message": "No connector found for this page.", + } + + notion_connector = NotionHistoryConnector( + session=db_session, + connector_id=actual_connector_id, + ) + + result = await notion_connector.update_page( + page_id=final_page_id, + content=final_content, + ) + logger.info( + f"update_page result: {result.get('status')} - {result.get('message', '')}" + ) + + if result.get("status") == "success" and document_id is not None: + from app.services.notion import NotionKBSyncService + + logger.info(f"Updating knowledge base for document {document_id}...") + kb_service = NotionKBSyncService(db_session) + kb_result = await kb_service.sync_after_update( + document_id=document_id, + appended_content=final_content, + user_id=user_id, + search_space_id=search_space_id, + appended_block_ids=result.get("appended_block_ids"), + ) + + if kb_result["status"] == "success": + result["message"] = ( + f"{result['message']}. Your knowledge base has also been updated." + ) + logger.info( + f"Knowledge base successfully updated for page {final_page_id}" + ) + elif kb_result["status"] == "not_indexed": + result["message"] = ( + f"{result['message']}. This page will be added to your knowledge base in the next scheduled sync." + ) + else: + result["message"] = ( + f"{result['message']}. Your knowledge base will be updated in the next scheduled sync." + ) + logger.warning( + f"KB update failed for page {final_page_id}: {kb_result['message']}" + ) + + return result + + except Exception as e: + from langgraph.errors import GraphInterrupt + + if isinstance(e, GraphInterrupt): + raise + + logger.error(f"Error updating Notion page: {e}", exc_info=True) + error_str = str(e).lower() + if isinstance(e, NotionAPIError) and ( + "401" in error_str or "unauthorized" in error_str + ): + return { + "status": "auth_error", + "message": str(e), + "connector_id": connector_id_from_context + if "connector_id_from_context" in dir() + else None, + "connector_type": "notion", + } + if isinstance(e, ValueError | NotionAPIError): + message = str(e) + else: + message = ( + "Something went wrong while updating the page. Please try again." + ) + return {"status": "error", "message": message} + + return update_notion_page