diff --git a/surfsense_backend/app/agents/shared/tools/catalog.py b/surfsense_backend/app/agents/shared/tools/catalog.py index b8be4109a..73a4070ab 100644 --- a/surfsense_backend/app/agents/shared/tools/catalog.py +++ b/surfsense_backend/app/agents/shared/tools/catalog.py @@ -47,7 +47,6 @@ TOOL_CATALOG: list[ToolMetadata] = [ ToolMetadata(name="generate_image", description="Generate images from text descriptions using AI image models"), ToolMetadata(name="scrape_webpage", description="Scrape and extract the main content from a webpage"), ToolMetadata(name="web_search", description="Search the web for real-time information using configured search engines"), - ToolMetadata(name="get_connected_accounts", description="Discover connected accounts for a service and their metadata"), ToolMetadata(name="create_automation", description="Draft an automation from an NL intent; user approves the card; tool saves"), ToolMetadata(name="update_memory", description="Save important long-term facts, preferences, and instructions to the (personal or team) memory"), ToolMetadata(name="create_notion_page", description="Create a new page in the user's Notion workspace"), diff --git a/surfsense_backend/app/agents/shared/tools/confluence/__init__.py b/surfsense_backend/app/agents/shared/tools/confluence/__init__.py deleted file mode 100644 index 3bf80b61b..000000000 --- a/surfsense_backend/app/agents/shared/tools/confluence/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -"""Confluence tools for creating, updating, and deleting pages.""" - -from .create_page import create_create_confluence_page_tool -from .delete_page import create_delete_confluence_page_tool -from .update_page import create_update_confluence_page_tool - -__all__ = [ - "create_create_confluence_page_tool", - "create_delete_confluence_page_tool", - "create_update_confluence_page_tool", -] diff --git a/surfsense_backend/app/agents/shared/tools/confluence/create_page.py b/surfsense_backend/app/agents/shared/tools/confluence/create_page.py deleted file mode 100644 index 95e2308e3..000000000 --- a/surfsense_backend/app/agents/shared/tools/confluence/create_page.py +++ /dev/null @@ -1,232 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm.attributes import flag_modified - -from app.agents.shared.tools.hitl import request_approval -from app.connectors.confluence_history import ConfluenceHistoryConnector -from app.db import async_session_maker -from app.services.confluence import ConfluenceToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_create_confluence_page_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """ - Factory function to create the create_confluence_page tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured create_confluence_page tool - """ - del db_session # per-call session — see docstring - - @tool - async def create_confluence_page( - title: str, - content: str | None = None, - space_id: str | None = None, - ) -> dict[str, Any]: - """Create a new page in Confluence. - - Use this tool when the user explicitly asks to create a new Confluence page. - - Args: - title: Title of the page. - content: Optional HTML/storage format content for the page body. - space_id: Optional Confluence space ID to create the page in. - - Returns: - Dictionary with status, page_id, and message. - - IMPORTANT: - - If status is "rejected", do NOT retry. - - If status is "insufficient_permissions", inform user to re-authenticate. - """ - logger.info(f"create_confluence_page called: title='{title}'") - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Confluence tool not properly configured.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = ConfluenceToolMetadataService(db_session) - context = await metadata_service.get_creation_context( - search_space_id, user_id - ) - - if "error" in context: - return {"status": "error", "message": context["error"]} - - accounts = context.get("accounts", []) - if accounts and all(a.get("auth_expired") for a in accounts): - return { - "status": "auth_error", - "message": "All connected Confluence accounts need re-authentication.", - "connector_type": "confluence", - } - - result = request_approval( - action_type="confluence_page_creation", - tool_name="create_confluence_page", - params={ - "title": title, - "content": content, - "space_id": space_id, - "connector_id": connector_id, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_title = result.params.get("title", title) - final_content = result.params.get("content", content) or "" - final_space_id = result.params.get("space_id", space_id) - final_connector_id = result.params.get("connector_id", connector_id) - - if not final_title or not final_title.strip(): - return {"status": "error", "message": "Page title cannot be empty."} - if not final_space_id: - return {"status": "error", "message": "A space must be selected."} - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - actual_connector_id = final_connector_id - if actual_connector_id is None: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.CONFLUENCE_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "No Confluence connector found.", - } - actual_connector_id = connector.id - else: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == actual_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.CONFLUENCE_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Confluence connector is invalid.", - } - - try: - client = ConfluenceHistoryConnector( - session=db_session, connector_id=actual_connector_id - ) - api_result = await client.create_page( - space_id=final_space_id, - title=final_title, - body=final_content, - ) - await client.close() - except Exception as api_err: - if ( - "http 403" in str(api_err).lower() - or "status code 403" in str(api_err).lower() - ): - try: - _conn = connector - _conn.config = {**_conn.config, "auth_expired": True} - flag_modified(_conn, "config") - await db_session.commit() - except Exception: - pass - return { - "status": "insufficient_permissions", - "connector_id": actual_connector_id, - "message": "This Confluence account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - page_id = str(api_result.get("id", "")) - page_links = ( - api_result.get("_links", {}) if isinstance(api_result, dict) else {} - ) - page_url = "" - if page_links.get("base") and page_links.get("webui"): - page_url = f"{page_links['base']}{page_links['webui']}" - - kb_message_suffix = "" - try: - from app.services.confluence import ConfluenceKBSyncService - - kb_service = ConfluenceKBSyncService(db_session) - kb_result = await kb_service.sync_after_create( - page_id=page_id, - page_title=final_title, - space_id=final_space_id, - body_content=final_content, - connector_id=actual_connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync." - except Exception as kb_err: - logger.warning(f"KB sync after create failed: {kb_err}") - kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync." - - return { - "status": "success", - "page_id": page_id, - "page_url": page_url, - "message": f"Confluence page '{final_title}' created successfully.{kb_message_suffix}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - logger.error(f"Error creating Confluence page: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while creating the page.", - } - - return create_confluence_page diff --git a/surfsense_backend/app/agents/shared/tools/confluence/delete_page.py b/surfsense_backend/app/agents/shared/tools/confluence/delete_page.py deleted file mode 100644 index dd1ee326e..000000000 --- a/surfsense_backend/app/agents/shared/tools/confluence/delete_page.py +++ /dev/null @@ -1,213 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm.attributes import flag_modified - -from app.agents.shared.tools.hitl import request_approval -from app.connectors.confluence_history import ConfluenceHistoryConnector -from app.db import async_session_maker -from app.services.confluence import ConfluenceToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_delete_confluence_page_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """ - Factory function to create the delete_confluence_page tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured delete_confluence_page tool - """ - del db_session # per-call session — see docstring - - @tool - async def delete_confluence_page( - page_title_or_id: str, - delete_from_kb: bool = False, - ) -> dict[str, Any]: - """Delete a Confluence page. - - Use this tool when the user asks to delete or remove a Confluence page. - - Args: - page_title_or_id: The page title or ID to identify the page. - delete_from_kb: Whether to also remove from the knowledge base. - - Returns: - Dictionary with status, message, and deleted_from_kb. - - IMPORTANT: - - If status is "rejected", do NOT retry. - - If status is "not_found", relay the message to the user. - - If status is "insufficient_permissions", inform user to re-authenticate. - """ - logger.info( - f"delete_confluence_page called: page_title_or_id='{page_title_or_id}'" - ) - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Confluence tool not properly configured.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = ConfluenceToolMetadataService(db_session) - context = await metadata_service.get_deletion_context( - search_space_id, user_id, page_title_or_id - ) - - if "error" in context: - error_msg = context["error"] - if context.get("auth_expired"): - return { - "status": "auth_error", - "message": error_msg, - "connector_id": context.get("connector_id"), - "connector_type": "confluence", - } - if "not found" in error_msg.lower(): - return {"status": "not_found", "message": error_msg} - return {"status": "error", "message": error_msg} - - page_data = context["page"] - page_id = page_data["page_id"] - page_title = page_data.get("page_title", "") - document_id = page_data["document_id"] - connector_id_from_context = context.get("account", {}).get("id") - - result = request_approval( - action_type="confluence_page_deletion", - tool_name="delete_confluence_page", - params={ - "page_id": page_id, - "connector_id": connector_id_from_context, - "delete_from_kb": delete_from_kb, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_page_id = result.params.get("page_id", page_id) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_delete_from_kb = result.params.get( - "delete_from_kb", delete_from_kb - ) - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - if not final_connector_id: - return { - "status": "error", - "message": "No connector found for this page.", - } - - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.CONFLUENCE_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Confluence connector is invalid.", - } - - try: - client = ConfluenceHistoryConnector( - session=db_session, connector_id=final_connector_id - ) - await client.delete_page(final_page_id) - await client.close() - except Exception as api_err: - if ( - "http 403" in str(api_err).lower() - or "status code 403" in str(api_err).lower() - ): - try: - connector.config = { - **connector.config, - "auth_expired": True, - } - flag_modified(connector, "config") - await db_session.commit() - except Exception: - pass - return { - "status": "insufficient_permissions", - "connector_id": final_connector_id, - "message": "This Confluence account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - deleted_from_kb = False - if final_delete_from_kb and document_id: - try: - from app.db import Document - - doc_result = await db_session.execute( - select(Document).filter(Document.id == document_id) - ) - document = doc_result.scalars().first() - if document: - await db_session.delete(document) - await db_session.commit() - deleted_from_kb = True - except Exception as e: - logger.error(f"Failed to delete document from KB: {e}") - await db_session.rollback() - - message = f"Confluence page '{page_title}' deleted successfully." - if deleted_from_kb: - message += " Also removed from the knowledge base." - - return { - "status": "success", - "page_id": final_page_id, - "deleted_from_kb": deleted_from_kb, - "message": message, - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - logger.error(f"Error deleting Confluence page: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while deleting the page.", - } - - return delete_confluence_page diff --git a/surfsense_backend/app/agents/shared/tools/confluence/update_page.py b/surfsense_backend/app/agents/shared/tools/confluence/update_page.py deleted file mode 100644 index 1368f41b8..000000000 --- a/surfsense_backend/app/agents/shared/tools/confluence/update_page.py +++ /dev/null @@ -1,240 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm.attributes import flag_modified - -from app.agents.shared.tools.hitl import request_approval -from app.connectors.confluence_history import ConfluenceHistoryConnector -from app.db import async_session_maker -from app.services.confluence import ConfluenceToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_update_confluence_page_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """ - Factory function to create the update_confluence_page tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - - Returns: - Configured update_confluence_page tool - """ - del db_session # per-call session — see docstring - - @tool - async def update_confluence_page( - page_title_or_id: str, - new_title: str | None = None, - new_content: str | None = None, - ) -> dict[str, Any]: - """Update an existing Confluence page. - - Use this tool when the user asks to modify or edit a Confluence page. - - Args: - page_title_or_id: The page title or ID to identify the page. - new_title: Optional new title for the page. - new_content: Optional new HTML/storage format content. - - Returns: - Dictionary with status and message. - - IMPORTANT: - - If status is "rejected", do NOT retry. - - If status is "not_found", relay the message to the user. - - If status is "insufficient_permissions", inform user to re-authenticate. - """ - logger.info( - f"update_confluence_page called: page_title_or_id='{page_title_or_id}'" - ) - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Confluence tool not properly configured.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = ConfluenceToolMetadataService(db_session) - context = await metadata_service.get_update_context( - search_space_id, user_id, page_title_or_id - ) - - if "error" in context: - error_msg = context["error"] - if context.get("auth_expired"): - return { - "status": "auth_error", - "message": error_msg, - "connector_id": context.get("connector_id"), - "connector_type": "confluence", - } - if "not found" in error_msg.lower(): - return {"status": "not_found", "message": error_msg} - return {"status": "error", "message": error_msg} - - page_data = context["page"] - page_id = page_data["page_id"] - current_title = page_data["page_title"] - current_body = page_data.get("body", "") - current_version = page_data.get("version", 1) - document_id = page_data.get("document_id") - connector_id_from_context = context.get("account", {}).get("id") - - result = request_approval( - action_type="confluence_page_update", - tool_name="update_confluence_page", - params={ - "page_id": page_id, - "document_id": document_id, - "new_title": new_title, - "new_content": new_content, - "version": current_version, - "connector_id": connector_id_from_context, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_page_id = result.params.get("page_id", page_id) - final_title = result.params.get("new_title", new_title) or current_title - final_content = result.params.get("new_content", new_content) - if final_content is None: - final_content = current_body - final_version = result.params.get("version", current_version) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_document_id = result.params.get("document_id", document_id) - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - if not final_connector_id: - return { - "status": "error", - "message": "No connector found for this page.", - } - - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.CONFLUENCE_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Confluence connector is invalid.", - } - - try: - client = ConfluenceHistoryConnector( - session=db_session, connector_id=final_connector_id - ) - api_result = await client.update_page( - page_id=final_page_id, - title=final_title, - body=final_content, - version_number=final_version + 1, - ) - await client.close() - except Exception as api_err: - if ( - "http 403" in str(api_err).lower() - or "status code 403" in str(api_err).lower() - ): - try: - connector.config = { - **connector.config, - "auth_expired": True, - } - flag_modified(connector, "config") - await db_session.commit() - except Exception: - pass - return { - "status": "insufficient_permissions", - "connector_id": final_connector_id, - "message": "This Confluence account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - page_links = ( - api_result.get("_links", {}) if isinstance(api_result, dict) else {} - ) - page_url = "" - if page_links.get("base") and page_links.get("webui"): - page_url = f"{page_links['base']}{page_links['webui']}" - - kb_message_suffix = "" - if final_document_id: - try: - from app.services.confluence import ConfluenceKBSyncService - - kb_service = ConfluenceKBSyncService(db_session) - kb_result = await kb_service.sync_after_update( - document_id=final_document_id, - page_id=final_page_id, - user_id=user_id, - search_space_id=search_space_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = ( - " The knowledge base will be updated in the next sync." - ) - except Exception as kb_err: - logger.warning(f"KB sync after update failed: {kb_err}") - kb_message_suffix = ( - " The knowledge base will be updated in the next sync." - ) - - return { - "status": "success", - "page_id": final_page_id, - "page_url": page_url, - "message": f"Confluence page '{final_title}' updated successfully.{kb_message_suffix}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - logger.error(f"Error updating Confluence page: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while updating the page.", - } - - return update_confluence_page diff --git a/surfsense_backend/app/agents/shared/tools/connected_accounts.py b/surfsense_backend/app/agents/shared/tools/connected_accounts.py deleted file mode 100644 index 6420a90e6..000000000 --- a/surfsense_backend/app/agents/shared/tools/connected_accounts.py +++ /dev/null @@ -1,135 +0,0 @@ -"""Connected-accounts discovery tool. - -Lets the LLM discover which accounts are connected for a given service -(e.g. "jira", "linear", "slack") and retrieve the metadata it needs to -call action tools — such as Jira's ``cloudId``. - -The tool returns **only** non-sensitive fields explicitly listed in the -service's ``account_metadata_keys`` (see ``registry.py``), plus the -always-present ``display_name`` and ``connector_id``. -""" - -import logging -from typing import Any - -from langchain_core.tools import StructuredTool -from pydantic import BaseModel, Field -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select - -from app.db import SearchSourceConnector, SearchSourceConnectorType, async_session_maker -from app.services.mcp_oauth.registry import MCP_SERVICES - -logger = logging.getLogger(__name__) - -_SERVICE_KEY_BY_CONNECTOR_TYPE: dict[str, str] = { - cfg.connector_type: key for key, cfg in MCP_SERVICES.items() -} - - -class GetConnectedAccountsInput(BaseModel): - service: str = Field( - description=( - "Service key to look up connected accounts for. " - "Valid values: " + ", ".join(sorted(MCP_SERVICES.keys())) - ), - ) - - -def _extract_display_name(connector: SearchSourceConnector) -> str: - """Best-effort human-readable label for a connector.""" - cfg = connector.config or {} - if cfg.get("display_name"): - return cfg["display_name"] - if cfg.get("base_url"): - return f"{connector.name} ({cfg['base_url']})" - if cfg.get("organization_name"): - return f"{connector.name} ({cfg['organization_name']})" - return connector.name - - -def create_get_connected_accounts_tool( - db_session: AsyncSession, - search_space_id: int, - user_id: str, -) -> StructuredTool: - """Factory function to create the get_connected_accounts tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to scope account discovery to. - user_id: User ID to scope account discovery to. - - Returns: - Configured StructuredTool for connected-accounts discovery. - """ - del db_session # per-call session — see docstring - - async def _run(service: str) -> list[dict[str, Any]]: - svc_cfg = MCP_SERVICES.get(service) - if not svc_cfg: - return [ - { - "error": f"Unknown service '{service}'. Valid: {', '.join(sorted(MCP_SERVICES.keys()))}" - } - ] - - try: - connector_type = SearchSourceConnectorType(svc_cfg.connector_type) - except ValueError: - return [{"error": f"Connector type '{svc_cfg.connector_type}' not found."}] - - async with async_session_maker() as db_session: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type == connector_type, - ) - ) - connectors = result.scalars().all() - - if not connectors: - return [ - { - "error": f"No {svc_cfg.name} accounts connected. Ask the user to connect one in settings." - } - ] - - is_multi = len(connectors) > 1 - - accounts: list[dict[str, Any]] = [] - for conn in connectors: - cfg = conn.config or {} - entry: dict[str, Any] = { - "connector_id": conn.id, - "display_name": _extract_display_name(conn), - "service": service, - } - if is_multi: - entry["tool_prefix"] = f"{service}_{conn.id}" - for key in svc_cfg.account_metadata_keys: - if key in cfg: - entry[key] = cfg[key] - accounts.append(entry) - - return accounts - - return StructuredTool( - name="get_connected_accounts", - description=( - "Discover which accounts are connected for a service (e.g. jira, linear, slack, clickup, airtable). " - "Returns display names and service-specific metadata the action tools need " - "(e.g. Jira's cloudId). Call this BEFORE using a service's action tools when " - "you need an account identifier or are unsure which account to use." - ), - coroutine=_run, - args_schema=GetConnectedAccountsInput, - metadata={"hitl": False}, - ) diff --git a/surfsense_backend/app/agents/shared/tools/notion/__init__.py b/surfsense_backend/app/agents/shared/tools/notion/__init__.py deleted file mode 100644 index 6ce825dca..000000000 --- a/surfsense_backend/app/agents/shared/tools/notion/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -"""Notion tools for creating, updating, and deleting pages.""" - -from .create_page import create_create_notion_page_tool -from .delete_page import create_delete_notion_page_tool -from .update_page import create_update_notion_page_tool - -__all__ = [ - "create_create_notion_page_tool", - "create_delete_notion_page_tool", - "create_update_notion_page_tool", -] diff --git a/surfsense_backend/app/agents/shared/tools/notion/create_page.py b/surfsense_backend/app/agents/shared/tools/notion/create_page.py deleted file mode 100644 index b9e4d46d3..000000000 --- a/surfsense_backend/app/agents/shared/tools/notion/create_page.py +++ /dev/null @@ -1,258 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector -from app.db import async_session_maker -from app.services.notion import NotionToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_create_notion_page_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """ - Factory function to create the create_notion_page tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker`. This is critical for the compiled-agent - cache: the compiled graph (and therefore this closure) is reused - across HTTP requests, so capturing a per-request session here would - surface stale/closed sessions on cache hits. Per-call sessions also - keep the request's outer transaction free of long-running Notion API - blocking. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Notion connector - user_id: User ID for fetching user-specific context - connector_id: Optional specific connector ID (if known) - - Returns: - Configured create_notion_page tool - """ - del db_session # per-call session — see docstring - - @tool - async def create_notion_page( - title: str, - content: str | None = None, - ) -> dict[str, Any]: - """Create a new page in Notion with the given title and content. - - Use this tool when the user asks you to create, save, or publish - something to Notion. The page will be created in the user's - configured Notion workspace. The user MUST specify a topic before you - call this tool. If the request does not contain a topic (e.g. "create a - notion page"), ask what the page should be about. Never call this tool - without a clear topic from the user. - - Args: - title: The title of the Notion page. - content: Optional markdown content for the page body (supports headings, lists, paragraphs). - Generate this yourself based on the user's topic. - - Returns: - Dictionary with: - - status: "success", "rejected", or "error" - - page_id: Created page ID (if success) - - url: URL to the created page (if success) - - title: Page title (if success) - - message: Result message - - IMPORTANT: If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment (e.g., "Understood, I didn't create the page.") - and move on. Do NOT troubleshoot or suggest alternatives. - - Examples: - - "Create a Notion page about our Q2 roadmap" - - "Save a summary of today's discussion to Notion" - """ - logger.info(f"create_notion_page called: title='{title}'") - - if search_space_id is None or user_id is None: - logger.error( - "Notion tool not properly configured - missing required parameters" - ) - return { - "status": "error", - "message": "Notion tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = NotionToolMetadataService(db_session) - context = await metadata_service.get_creation_context( - search_space_id, user_id - ) - - if "error" in context: - logger.error( - f"Failed to fetch creation context: {context['error']}" - ) - return { - "status": "error", - "message": context["error"], - } - - accounts = context.get("accounts", []) - if accounts and all(a.get("auth_expired") for a in accounts): - logger.warning("All Notion accounts have expired authentication") - return { - "status": "auth_error", - "message": "All connected Notion accounts need re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "notion", - } - - logger.info(f"Requesting approval for creating Notion page: '{title}'") - result = request_approval( - action_type="notion_page_creation", - tool_name="create_notion_page", - params={ - "title": title, - "content": content, - "parent_page_id": None, - "connector_id": connector_id, - }, - context=context, - ) - - if result.rejected: - logger.info("Notion page creation rejected by user") - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_title = result.params.get("title", title) - final_content = result.params.get("content", content) - final_parent_page_id = result.params.get("parent_page_id") - final_connector_id = result.params.get("connector_id", connector_id) - - if not final_title or not final_title.strip(): - logger.error("Title is empty or contains only whitespace") - return { - "status": "error", - "message": "Page title cannot be empty. Please provide a valid title.", - } - - logger.info( - f"Creating Notion page with final params: title='{final_title}'" - ) - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - actual_connector_id = final_connector_id - if actual_connector_id is None: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - connector = result.scalars().first() - - if not connector: - logger.warning( - f"No Notion connector found for search_space_id={search_space_id}" - ) - return { - "status": "error", - "message": "No Notion connector found. Please connect Notion in your workspace settings.", - } - - actual_connector_id = connector.id - logger.info(f"Found Notion connector: id={actual_connector_id}") - else: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == actual_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - connector = result.scalars().first() - - if not connector: - logger.error( - f"Invalid connector_id={actual_connector_id} for search_space_id={search_space_id}" - ) - return { - "status": "error", - "message": "Selected Notion account is invalid or has been disconnected. Please select a valid account.", - } - logger.info(f"Validated Notion connector: id={actual_connector_id}") - - notion_connector = NotionHistoryConnector( - session=db_session, - connector_id=actual_connector_id, - ) - - result = await notion_connector.create_page( - title=final_title, - content=final_content, - parent_page_id=final_parent_page_id, - ) - logger.info( - f"create_page result: {result.get('status')} - {result.get('message', '')}" - ) - - if result.get("status") == "success": - kb_message_suffix = "" - try: - from app.services.notion import NotionKBSyncService - - kb_service = NotionKBSyncService(db_session) - kb_result = await kb_service.sync_after_create( - page_id=result.get("page_id"), - page_title=result.get("title", final_title), - page_url=result.get("url"), - content=final_content, - connector_id=actual_connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync." - except Exception as kb_err: - logger.warning(f"KB sync after create failed: {kb_err}") - kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync." - - result["message"] = result.get("message", "") + kb_message_suffix - - return result - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error creating Notion page: {e}", exc_info=True) - if isinstance(e, ValueError | NotionAPIError): - message = str(e) - else: - message = ( - "Something went wrong while creating the page. Please try again." - ) - return {"status": "error", "message": message} - - return create_notion_page diff --git a/surfsense_backend/app/agents/shared/tools/notion/delete_page.py b/surfsense_backend/app/agents/shared/tools/notion/delete_page.py deleted file mode 100644 index 3fa4af9dc..000000000 --- a/surfsense_backend/app/agents/shared/tools/notion/delete_page.py +++ /dev/null @@ -1,273 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector -from app.db import async_session_maker -from app.services.notion.tool_metadata_service import NotionToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_delete_notion_page_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """ - Factory function to create the delete_notion_page tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Notion connector - user_id: User ID for finding the correct Notion connector - connector_id: Optional specific connector ID (if known) - - Returns: - Configured delete_notion_page tool - """ - del db_session # per-call session — see docstring - - @tool - async def delete_notion_page( - page_title: str, - delete_from_kb: bool = False, - ) -> dict[str, Any]: - """Delete (archive) a Notion page. - - Use this tool when the user asks you to delete, remove, or archive - a Notion page. Note that Notion doesn't permanently delete pages, - it archives them (they can be restored from trash). - - Args: - page_title: The title of the Notion page to delete. - delete_from_kb: Whether to also remove the page from the knowledge base. - Default is False. - Set to True to permanently remove from both Notion and knowledge base. - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", or "error" - - page_id: Deleted page ID (if success) - - message: Success or error message - - deleted_from_kb: Whether the page was also removed from knowledge base (if success) - - Examples: - - "Delete the 'Meeting Notes' Notion page" - - "Remove the 'Old Project Plan' Notion page" - - "Archive the 'Draft Ideas' Notion page" - """ - logger.info( - f"delete_notion_page called: page_title='{page_title}', delete_from_kb={delete_from_kb}" - ) - - if search_space_id is None or user_id is None: - logger.error( - "Notion tool not properly configured - missing required parameters" - ) - return { - "status": "error", - "message": "Notion tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - # Get page context (page_id, account, title) from indexed data - metadata_service = NotionToolMetadataService(db_session) - context = await metadata_service.get_delete_context( - search_space_id, user_id, page_title - ) - - if "error" in context: - error_msg = context["error"] - # Check if it's a "not found" error (softer handling for LLM) - if "not found" in error_msg.lower(): - logger.warning(f"Page not found: {error_msg}") - return { - "status": "not_found", - "message": error_msg, - } - else: - logger.error(f"Failed to fetch delete context: {error_msg}") - return { - "status": "error", - "message": error_msg, - } - - account = context.get("account", {}) - if account.get("auth_expired"): - logger.warning( - "Notion account %s has expired authentication", - account.get("id"), - ) - return { - "status": "auth_error", - "message": "The Notion account for this page needs re-authentication. Please re-authenticate in your connector settings.", - } - - page_id = context.get("page_id") - connector_id_from_context = account.get("id") - document_id = context.get("document_id") - - logger.info( - f"Requesting approval for deleting Notion page: '{page_title}' (page_id={page_id}, delete_from_kb={delete_from_kb})" - ) - - result = request_approval( - action_type="notion_page_deletion", - tool_name="delete_notion_page", - params={ - "page_id": page_id, - "connector_id": connector_id_from_context, - "delete_from_kb": delete_from_kb, - }, - context=context, - ) - - if result.rejected: - logger.info("Notion page deletion rejected by user") - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_page_id = result.params.get("page_id", page_id) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_delete_from_kb = result.params.get( - "delete_from_kb", delete_from_kb - ) - - logger.info( - f"Deleting Notion page with final params: page_id={final_page_id}, connector_id={final_connector_id}, delete_from_kb={final_delete_from_kb}" - ) - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - # Validate the connector - if final_connector_id: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - connector = result.scalars().first() - - if not connector: - logger.error( - f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}" - ) - return { - "status": "error", - "message": "Selected Notion account is invalid or has been disconnected. Please select a valid account.", - } - actual_connector_id = connector.id - logger.info(f"Validated Notion connector: id={actual_connector_id}") - else: - logger.error("No connector found for this page") - return { - "status": "error", - "message": "No connector found for this page.", - } - - # Create connector instance - notion_connector = NotionHistoryConnector( - session=db_session, - connector_id=actual_connector_id, - ) - - # Delete the page from Notion - result = await notion_connector.delete_page(page_id=final_page_id) - logger.info( - f"delete_page result: {result.get('status')} - {result.get('message', '')}" - ) - - # If deletion was successful and user wants to delete from KB - deleted_from_kb = False - if ( - result.get("status") == "success" - and final_delete_from_kb - and document_id - ): - try: - from sqlalchemy.future import select - - from app.db import Document - - # Get the document - doc_result = await db_session.execute( - select(Document).filter(Document.id == document_id) - ) - document = doc_result.scalars().first() - - if document: - await db_session.delete(document) - await db_session.commit() - deleted_from_kb = True - logger.info( - f"Deleted document {document_id} from knowledge base" - ) - else: - logger.warning(f"Document {document_id} not found in KB") - except Exception as e: - logger.error(f"Failed to delete document from KB: {e}") - await db_session.rollback() - result["warning"] = ( - f"Page deleted from Notion, but failed to remove from knowledge base: {e!s}" - ) - - # Update result with KB deletion status - if result.get("status") == "success": - result["deleted_from_kb"] = deleted_from_kb - if deleted_from_kb: - result["message"] = ( - f"{result.get('message', '')} (also removed from knowledge base)" - ) - - return result - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error deleting Notion page: {e}", exc_info=True) - error_str = str(e).lower() - if isinstance(e, NotionAPIError) and ( - "401" in error_str or "unauthorized" in error_str - ): - return { - "status": "auth_error", - "message": str(e), - "connector_id": connector_id_from_context - if "connector_id_from_context" in dir() - else None, - "connector_type": "notion", - } - if isinstance(e, ValueError | NotionAPIError): - message = str(e) - else: - message = ( - "Something went wrong while deleting the page. Please try again." - ) - return {"status": "error", "message": message} - - return delete_notion_page diff --git a/surfsense_backend/app/agents/shared/tools/notion/update_page.py b/surfsense_backend/app/agents/shared/tools/notion/update_page.py deleted file mode 100644 index ed4991052..000000000 --- a/surfsense_backend/app/agents/shared/tools/notion/update_page.py +++ /dev/null @@ -1,276 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.connectors.notion_history import NotionAPIError, NotionHistoryConnector -from app.db import async_session_maker -from app.services.notion import NotionToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_update_notion_page_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - """ - Factory function to create the update_notion_page tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache (see - ``create_create_notion_page_tool`` for the full rationale). - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Notion connector - user_id: User ID for fetching user-specific context - connector_id: Optional specific connector ID (if known) - - Returns: - Configured update_notion_page tool - """ - del db_session # per-call session — see docstring - - @tool - async def update_notion_page( - page_title: str, - content: str | None = None, - ) -> dict[str, Any]: - """Update an existing Notion page by appending new content. - - Use this tool when the user asks you to add content to, modify, or update - a Notion page. The new content will be appended to the existing page content. - The user MUST specify what to add before you call this tool. If the - request is vague, ask what content they want added. - - Args: - page_title: The title of the Notion page to update. - content: Optional markdown content to append to the page body (supports headings, lists, paragraphs). - Generate this yourself based on the user's request. - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", or "error" - - page_id: Updated page ID (if success) - - url: URL to the updated page (if success) - - title: Current page title (if success) - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment (e.g., "Understood, I didn't update the page.") - and move on. Do NOT ask for alternatives or troubleshoot. - - If status is "not_found", inform the user conversationally using the exact message provided. - Example: "I couldn't find the page '[page_title]' in your indexed Notion pages. [message details]" - Do NOT treat this as an error. Do NOT invent information. Simply relay the message and - ask the user to verify the page title or check if it's been indexed. - Examples: - - "Add today's meeting notes to the 'Meeting Notes' Notion page" - - "Update the 'Project Plan' page with a status update on phase 1" - """ - logger.info( - f"update_notion_page called: page_title='{page_title}', content_length={len(content) if content else 0}" - ) - - if search_space_id is None or user_id is None: - logger.error( - "Notion tool not properly configured - missing required parameters" - ) - return { - "status": "error", - "message": "Notion tool not properly configured. Please contact support.", - } - - if not content or not content.strip(): - logger.error(f"Empty content provided for page '{page_title}'") - return { - "status": "error", - "message": "Content is required to update the page. Please provide the actual content you want to add.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = NotionToolMetadataService(db_session) - context = await metadata_service.get_update_context( - search_space_id, user_id, page_title - ) - - if "error" in context: - error_msg = context["error"] - # Check if it's a "not found" error (softer handling for LLM) - if "not found" in error_msg.lower(): - logger.warning(f"Page not found: {error_msg}") - return { - "status": "not_found", - "message": error_msg, - } - else: - logger.error(f"Failed to fetch update context: {error_msg}") - return { - "status": "error", - "message": error_msg, - } - - account = context.get("account", {}) - if account.get("auth_expired"): - logger.warning( - "Notion account %s has expired authentication", - account.get("id"), - ) - return { - "status": "auth_error", - "message": "The Notion account for this page needs re-authentication. Please re-authenticate in your connector settings.", - } - - page_id = context.get("page_id") - document_id = context.get("document_id") - connector_id_from_context = context.get("account", {}).get("id") - - logger.info( - f"Requesting approval for updating Notion page: '{page_title}' (page_id={page_id})" - ) - result = request_approval( - action_type="notion_page_update", - tool_name="update_notion_page", - params={ - "page_id": page_id, - "content": content, - "connector_id": connector_id_from_context, - }, - context=context, - ) - - if result.rejected: - logger.info("Notion page update rejected by user") - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_page_id = result.params.get("page_id", page_id) - final_content = result.params.get("content", content) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - - logger.info( - f"Updating Notion page with final params: page_id={final_page_id}, has_content={final_content is not None}" - ) - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - if final_connector_id: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - connector = result.scalars().first() - - if not connector: - logger.error( - f"Invalid connector_id={final_connector_id} for search_space_id={search_space_id}" - ) - return { - "status": "error", - "message": "Selected Notion account is invalid or has been disconnected. Please select a valid account.", - } - actual_connector_id = connector.id - logger.info(f"Validated Notion connector: id={actual_connector_id}") - else: - logger.error("No connector found for this page") - return { - "status": "error", - "message": "No connector found for this page.", - } - - notion_connector = NotionHistoryConnector( - session=db_session, - connector_id=actual_connector_id, - ) - - result = await notion_connector.update_page( - page_id=final_page_id, - content=final_content, - ) - logger.info( - f"update_page result: {result.get('status')} - {result.get('message', '')}" - ) - - if result.get("status") == "success" and document_id is not None: - from app.services.notion import NotionKBSyncService - - logger.info( - f"Updating knowledge base for document {document_id}..." - ) - kb_service = NotionKBSyncService(db_session) - kb_result = await kb_service.sync_after_update( - document_id=document_id, - appended_content=final_content, - user_id=user_id, - search_space_id=search_space_id, - appended_block_ids=result.get("appended_block_ids"), - ) - - if kb_result["status"] == "success": - result["message"] = ( - f"{result['message']}. Your knowledge base has also been updated." - ) - logger.info( - f"Knowledge base successfully updated for page {final_page_id}" - ) - elif kb_result["status"] == "not_indexed": - result["message"] = ( - f"{result['message']}. This page will be added to your knowledge base in the next scheduled sync." - ) - else: - result["message"] = ( - f"{result['message']}. Your knowledge base will be updated in the next scheduled sync." - ) - logger.warning( - f"KB update failed for page {final_page_id}: {kb_result['message']}" - ) - - return result - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error updating Notion page: {e}", exc_info=True) - error_str = str(e).lower() - if isinstance(e, NotionAPIError) and ( - "401" in error_str or "unauthorized" in error_str - ): - return { - "status": "auth_error", - "message": str(e), - "connector_id": connector_id_from_context - if "connector_id_from_context" in dir() - else None, - "connector_type": "notion", - } - if isinstance(e, ValueError | NotionAPIError): - message = str(e) - else: - message = ( - "Something went wrong while updating the page. Please try again." - ) - return {"status": "error", "message": message} - - return update_notion_page