diff --git a/surfsense_backend/app/agents/new_chat/tools/notion_mcp/__init__.py b/surfsense_backend/app/agents/new_chat/tools/notion_mcp/__init__.py deleted file mode 100644 index 1e1515bfb..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/notion_mcp/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""MCP-backed Notion tool factories. - -Drop-in replacements for ``tools/notion/`` that route through -Notion's hosted MCP server instead of direct API calls. -""" diff --git a/surfsense_backend/app/agents/new_chat/tools/notion_mcp/create_page.py b/surfsense_backend/app/agents/new_chat/tools/notion_mcp/create_page.py deleted file mode 100644 index a73363a65..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/notion_mcp/create_page.py +++ /dev/null @@ -1,205 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.new_chat.tools.hitl import request_approval -from app.services.notion import NotionToolMetadataService - -logger = logging.getLogger(__name__) - - -def _find_mcp_connector(connectors): - """Return the first connector with mcp_mode enabled, or None.""" - for c in connectors: - if (c.config or {}).get("mcp_mode"): - return c - return None - - -def create_create_notion_page_mcp_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - @tool - async def create_notion_page( - title: str, - content: str | None = None, - ) -> dict[str, Any]: - """Create a new page in Notion with the given title and content. - - Use this tool when the user asks you to create, save, or publish - something to Notion. The page will be created in the user's - configured Notion workspace. The user MUST specify a topic before you - call this tool. If the request does not contain a topic (e.g. "create a - notion page"), ask what the page should be about. Never call this tool - without a clear topic from the user. - - Args: - title: The title of the Notion page. - content: Optional markdown content for the page body (supports headings, lists, paragraphs). - Generate this yourself based on the user's topic. - - Returns: - Dictionary with: - - status: "success", "rejected", or "error" - - page_id: Created page ID (if success) - - url: URL to the created page (if success) - - title: Page title (if success) - - message: Result message - - IMPORTANT: If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment (e.g., "Understood, I didn't create the page.") - and move on. Do NOT troubleshoot or suggest alternatives. - - Examples: - - "Create a Notion page about our Q2 roadmap" - - "Save a summary of today's discussion to Notion" - """ - logger.info("create_notion_page (MCP) called: title='%s'", title) - - if db_session is None or search_space_id is None or user_id is None: - logger.error("Notion MCP tool not properly configured - missing required parameters") - return { - "status": "error", - "message": "Notion tool not properly configured. Please contact support.", - } - - try: - metadata_service = NotionToolMetadataService(db_session) - context = await metadata_service.get_creation_context(search_space_id, user_id) - - if "error" in context: - logger.error("Failed to fetch creation context: %s", context["error"]) - return {"status": "error", "message": context["error"]} - - accounts = context.get("accounts", []) - if accounts and all(a.get("auth_expired") for a in accounts): - return { - "status": "auth_error", - "message": "All connected Notion accounts need re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "notion", - } - - result = request_approval( - action_type="notion_page_creation", - tool_name="create_notion_page", - params={ - "title": title, - "content": content, - "parent_page_id": None, - "connector_id": connector_id, - }, - context=context, - ) - - if result.rejected: - logger.info("Notion page creation rejected by user") - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_title = result.params.get("title", title) - final_content = result.params.get("content", content) - final_parent_page_id = result.params.get("parent_page_id") - final_connector_id = result.params.get("connector_id", connector_id) - - if not final_title or not final_title.strip(): - return { - "status": "error", - "message": "Page title cannot be empty. Please provide a valid title.", - } - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - actual_connector_id = final_connector_id - if actual_connector_id is None: - query_result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - connectors = query_result.scalars().all() - connector = _find_mcp_connector(connectors) - - if not connector: - return { - "status": "error", - "message": "No Notion MCP connector found. Please connect Notion (MCP) in your workspace settings.", - } - actual_connector_id = connector.id - else: - query_result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == actual_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - connector = query_result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Notion account is invalid or has been disconnected.", - } - - from app.services.notion_mcp.adapter import NotionMCPAdapter - - adapter = NotionMCPAdapter(session=db_session, connector_id=actual_connector_id) - result = await adapter.create_page( - title=final_title, - content=final_content, - parent_page_id=final_parent_page_id, - ) - logger.info("create_page (MCP) result: %s - %s", result.get("status"), result.get("message", "")) - - if result.get("status") == "success": - kb_message_suffix = "" - try: - from app.services.notion import NotionKBSyncService - - kb_service = NotionKBSyncService(db_session) - kb_result = await kb_service.sync_after_create( - page_id=result.get("page_id"), - page_title=result.get("title", final_title), - page_url=result.get("url"), - content=final_content, - connector_id=actual_connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = " Your knowledge base has also been updated." - else: - kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync." - except Exception as kb_err: - logger.warning("KB sync after create failed: %s", kb_err) - kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync." - - result["message"] = result.get("message", "") + kb_message_suffix - - return result - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error("Error creating Notion page (MCP): %s", e, exc_info=True) - if isinstance(e, ValueError): - message = str(e) - else: - message = "Something went wrong while creating the page. Please try again." - return {"status": "error", "message": message} - - return create_notion_page diff --git a/surfsense_backend/app/agents/new_chat/tools/notion_mcp/delete_page.py b/surfsense_backend/app/agents/new_chat/tools/notion_mcp/delete_page.py deleted file mode 100644 index c0cf7642b..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/notion_mcp/delete_page.py +++ /dev/null @@ -1,173 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.new_chat.tools.hitl import request_approval -from app.services.notion.tool_metadata_service import NotionToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_delete_notion_page_mcp_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - @tool - async def delete_notion_page( - page_title: str, - delete_from_kb: bool = False, - ) -> dict[str, Any]: - """Delete (archive) a Notion page. - - Use this tool when the user asks you to delete, remove, or archive - a Notion page. Note that Notion doesn't permanently delete pages, - it archives them (they can be restored from trash). - - Args: - page_title: The title of the Notion page to delete. - delete_from_kb: Whether to also remove the page from the knowledge base. - Default is False. - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", or "error" - - page_id: Deleted page ID (if success) - - message: Success or error message - - deleted_from_kb: Whether the page was also removed from knowledge base (if success) - - Examples: - - "Delete the 'Meeting Notes' Notion page" - - "Remove the 'Old Project Plan' Notion page" - """ - logger.info( - "delete_notion_page (MCP) called: page_title='%s', delete_from_kb=%s", - page_title, - delete_from_kb, - ) - - if db_session is None or search_space_id is None or user_id is None: - logger.error("Notion MCP tool not properly configured - missing required parameters") - return { - "status": "error", - "message": "Notion tool not properly configured. Please contact support.", - } - - try: - metadata_service = NotionToolMetadataService(db_session) - context = await metadata_service.get_delete_context(search_space_id, user_id, page_title) - - if "error" in context: - error_msg = context["error"] - if "not found" in error_msg.lower(): - return {"status": "not_found", "message": error_msg} - return {"status": "error", "message": error_msg} - - account = context.get("account", {}) - if account.get("auth_expired"): - return { - "status": "auth_error", - "message": "The Notion account for this page needs re-authentication. Please re-authenticate in your connector settings.", - } - - page_id = context.get("page_id") - connector_id_from_context = account.get("id") - document_id = context.get("document_id") - - result = request_approval( - action_type="notion_page_deletion", - tool_name="delete_notion_page", - params={ - "page_id": page_id, - "connector_id": connector_id_from_context, - "delete_from_kb": delete_from_kb, - }, - context=context, - ) - - if result.rejected: - logger.info("Notion page deletion rejected by user") - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_page_id = result.params.get("page_id", page_id) - final_connector_id = result.params.get("connector_id", connector_id_from_context) - final_delete_from_kb = result.params.get("delete_from_kb", delete_from_kb) - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - if final_connector_id: - query_result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - connector = query_result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Notion account is invalid or has been disconnected.", - } - actual_connector_id = connector.id - else: - return {"status": "error", "message": "No connector found for this page."} - - from app.services.notion_mcp.adapter import NotionMCPAdapter - - adapter = NotionMCPAdapter(session=db_session, connector_id=actual_connector_id) - result = await adapter.delete_page(page_id=final_page_id) - logger.info("delete_page (MCP) result: %s - %s", result.get("status"), result.get("message", "")) - - deleted_from_kb = False - if result.get("status") == "success" and final_delete_from_kb and document_id: - try: - from sqlalchemy.future import select - - from app.db import Document - - doc_result = await db_session.execute( - select(Document).filter(Document.id == document_id) - ) - document = doc_result.scalars().first() - - if document: - await db_session.delete(document) - await db_session.commit() - deleted_from_kb = True - logger.info("Deleted document %s from knowledge base", document_id) - except Exception as e: - logger.error("Failed to delete document from KB: %s", e) - await db_session.rollback() - result["warning"] = f"Page deleted from Notion, but failed to remove from knowledge base: {e!s}" - - if result.get("status") == "success": - result["deleted_from_kb"] = deleted_from_kb - if deleted_from_kb: - result["message"] = f"{result.get('message', '')} (also removed from knowledge base)" - - return result - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error("Error deleting Notion page (MCP): %s", e, exc_info=True) - if isinstance(e, ValueError): - message = str(e) - else: - message = "Something went wrong while deleting the page. Please try again." - return {"status": "error", "message": message} - - return delete_notion_page diff --git a/surfsense_backend/app/agents/new_chat/tools/notion_mcp/update_page.py b/surfsense_backend/app/agents/new_chat/tools/notion_mcp/update_page.py deleted file mode 100644 index 28599cbae..000000000 --- a/surfsense_backend/app/agents/new_chat/tools/notion_mcp/update_page.py +++ /dev/null @@ -1,179 +0,0 @@ -import logging -from typing import Any - -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.new_chat.tools.hitl import request_approval -from app.services.notion import NotionToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_update_notion_page_mcp_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, - connector_id: int | None = None, -): - @tool - async def update_notion_page( - page_title: str, - content: str | None = None, - ) -> dict[str, Any]: - """Update an existing Notion page by appending new content. - - Use this tool when the user asks you to add content to, modify, or update - a Notion page. The new content will be appended to the existing page content. - The user MUST specify what to add before you call this tool. If the - request is vague, ask what content they want added. - - Args: - page_title: The title of the Notion page to update. - content: Optional markdown content to append to the page body (supports headings, lists, paragraphs). - Generate this yourself based on the user's request. - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", or "error" - - page_id: Updated page ID (if success) - - url: URL to the updated page (if success) - - title: Current page title (if success) - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment (e.g., "Understood, I didn't update the page.") - and move on. Do NOT ask for alternatives or troubleshoot. - - If status is "not_found", inform the user conversationally using the exact message provided. - - Examples: - - "Add today's meeting notes to the 'Meeting Notes' Notion page" - - "Update the 'Project Plan' page with a status update on phase 1" - """ - logger.info( - "update_notion_page (MCP) called: page_title='%s', content_length=%d", - page_title, - len(content) if content else 0, - ) - - if db_session is None or search_space_id is None or user_id is None: - logger.error("Notion MCP tool not properly configured - missing required parameters") - return { - "status": "error", - "message": "Notion tool not properly configured. Please contact support.", - } - - if not content or not content.strip(): - return { - "status": "error", - "message": "Content is required to update the page. Please provide the actual content you want to add.", - } - - try: - metadata_service = NotionToolMetadataService(db_session) - context = await metadata_service.get_update_context(search_space_id, user_id, page_title) - - if "error" in context: - error_msg = context["error"] - if "not found" in error_msg.lower(): - return {"status": "not_found", "message": error_msg} - return {"status": "error", "message": error_msg} - - account = context.get("account", {}) - if account.get("auth_expired"): - return { - "status": "auth_error", - "message": "The Notion account for this page needs re-authentication. Please re-authenticate in your connector settings.", - } - - page_id = context.get("page_id") - document_id = context.get("document_id") - connector_id_from_context = account.get("id") - - result = request_approval( - action_type="notion_page_update", - tool_name="update_notion_page", - params={ - "page_id": page_id, - "content": content, - "connector_id": connector_id_from_context, - }, - context=context, - ) - - if result.rejected: - logger.info("Notion page update rejected by user") - return { - "status": "rejected", - "message": "User declined. Do not retry or suggest alternatives.", - } - - final_page_id = result.params.get("page_id", page_id) - final_content = result.params.get("content", content) - final_connector_id = result.params.get("connector_id", connector_id_from_context) - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - if final_connector_id: - query_result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - connector = query_result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Notion account is invalid or has been disconnected.", - } - actual_connector_id = connector.id - else: - return {"status": "error", "message": "No connector found for this page."} - - from app.services.notion_mcp.adapter import NotionMCPAdapter - - adapter = NotionMCPAdapter(session=db_session, connector_id=actual_connector_id) - result = await adapter.update_page(page_id=final_page_id, content=final_content) - logger.info("update_page (MCP) result: %s - %s", result.get("status"), result.get("message", "")) - - if result.get("status") == "success" and document_id is not None: - from app.services.notion import NotionKBSyncService - - kb_service = NotionKBSyncService(db_session) - kb_result = await kb_service.sync_after_update( - document_id=document_id, - appended_content=final_content, - user_id=user_id, - search_space_id=search_space_id, - appended_block_ids=result.get("appended_block_ids"), - ) - - if kb_result["status"] == "success": - result["message"] = f"{result['message']}. Your knowledge base has also been updated." - elif kb_result["status"] == "not_indexed": - result["message"] = f"{result['message']}. This page will be added to your knowledge base in the next scheduled sync." - else: - result["message"] = f"{result['message']}. Your knowledge base will be updated in the next scheduled sync." - - return result - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error("Error updating Notion page (MCP): %s", e, exc_info=True) - if isinstance(e, ValueError): - message = str(e) - else: - message = "Something went wrong while updating the page. Please try again." - return {"status": "error", "message": message} - - return update_notion_page diff --git a/surfsense_backend/app/routes/notion_mcp_connector_route.py b/surfsense_backend/app/routes/notion_mcp_connector_route.py deleted file mode 100644 index b9305cd74..000000000 --- a/surfsense_backend/app/routes/notion_mcp_connector_route.py +++ /dev/null @@ -1,486 +0,0 @@ -"""Notion MCP Connector OAuth Routes. - -Handles OAuth 2.0 + PKCE authentication for Notion's hosted MCP server. -Based on: https://developers.notion.com/guides/mcp/build-mcp-client - -This creates connectors with the same ``NOTION_CONNECTOR`` type as the -existing direct-API connector, but with ``mcp_mode: True`` in the config -so the adapter layer knows to route through MCP. -""" - -import logging -from uuid import UUID - -from fastapi import APIRouter, Depends, HTTPException, Request -from fastapi.responses import RedirectResponse -from sqlalchemy import select -from sqlalchemy.exc import IntegrityError -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm.attributes import flag_modified - -from app.config import config -from app.db import ( - SearchSourceConnector, - SearchSourceConnectorType, - User, - get_async_session, -) -from app.services.notion_mcp.oauth import ( - ClientCredentials, - OAuthMetadata, - build_authorization_url, - discover_oauth_metadata, - exchange_code_for_tokens, - refresh_access_token, - register_client, -) -from app.users import current_active_user -from app.utils.connector_naming import ( - check_duplicate_connector, - extract_identifier_from_credentials, - generate_unique_connector_name, -) -from app.utils.oauth_security import OAuthStateManager, TokenEncryption, generate_pkce_pair - -logger = logging.getLogger(__name__) - -router = APIRouter() - -_state_manager: OAuthStateManager | None = None -_token_encryption: TokenEncryption | None = None -_oauth_metadata: OAuthMetadata | None = None - - -def _get_state_manager() -> OAuthStateManager: - global _state_manager - if _state_manager is None: - if not config.SECRET_KEY: - raise ValueError("SECRET_KEY must be set for OAuth security") - _state_manager = OAuthStateManager(config.SECRET_KEY) - return _state_manager - - -def _get_token_encryption() -> TokenEncryption: - global _token_encryption - if _token_encryption is None: - if not config.SECRET_KEY: - raise ValueError("SECRET_KEY must be set for token encryption") - _token_encryption = TokenEncryption(config.SECRET_KEY) - return _token_encryption - - -async def _get_oauth_metadata() -> OAuthMetadata: - global _oauth_metadata - if _oauth_metadata is None: - _oauth_metadata = await discover_oauth_metadata() - return _oauth_metadata - - -async def _fetch_workspace_info(access_token: str) -> dict: - """Fetch workspace metadata using the Notion API with the fresh token. - - The ``/v1/users/me`` endpoint returns bot info including workspace_name. - This populates connector config fields so naming and metadata services - work correctly. - """ - try: - import httpx - - async with httpx.AsyncClient(timeout=15.0) as client: - resp = await client.get( - "https://api.notion.com/v1/users/me", - headers={ - "Authorization": f"Bearer {access_token}", - "Notion-Version": "2022-06-28", - }, - ) - if resp.is_success: - data = resp.json() - bot_info = data.get("bot", {}) - return { - "bot_id": data.get("id"), - "workspace_name": bot_info.get("workspace_name", "Notion Workspace"), - "workspace_icon": data.get("avatar_url") or "📄", - } - except Exception as e: - logger.warning("Failed to fetch workspace info: %s", e) - return {} - - -NOTION_MCP_REDIRECT_URI = None - - -def _get_redirect_uri() -> str: - global NOTION_MCP_REDIRECT_URI - if NOTION_MCP_REDIRECT_URI is None: - backend = config.BACKEND_URL or "http://localhost:8000" - NOTION_MCP_REDIRECT_URI = f"{backend}/api/v1/auth/notion-mcp/connector/callback" - return NOTION_MCP_REDIRECT_URI - - -# --------------------------------------------------------------------------- -# Route: initiate OAuth -# --------------------------------------------------------------------------- - - -@router.get("/auth/notion-mcp/connector/add") -async def connect_notion_mcp( - space_id: int, - user: User = Depends(current_active_user), -): - """Initiate Notion MCP OAuth + PKCE flow.""" - if not config.SECRET_KEY: - raise HTTPException(status_code=500, detail="SECRET_KEY not configured.") - - try: - metadata = await _get_oauth_metadata() - - redirect_uri = _get_redirect_uri() - credentials = await register_client(metadata, redirect_uri) - - code_verifier, code_challenge = generate_pkce_pair() - - state_manager = _get_state_manager() - state_encoded = state_manager.generate_secure_state( - space_id, - user.id, - code_verifier=code_verifier, - mcp_client_id=credentials.client_id, - mcp_client_secret=credentials.client_secret or "", - ) - - auth_url = build_authorization_url( - metadata=metadata, - client_id=credentials.client_id, - redirect_uri=redirect_uri, - code_challenge=code_challenge, - state=state_encoded, - ) - - logger.info("Generated Notion MCP OAuth URL for user %s, space %s", user.id, space_id) - return {"auth_url": auth_url} - - except Exception as e: - logger.error("Failed to initiate Notion MCP OAuth: %s", e, exc_info=True) - raise HTTPException( - status_code=500, detail=f"Failed to initiate Notion MCP OAuth: {e!s}" - ) from e - - -# --------------------------------------------------------------------------- -# Route: re-authenticate existing connector -# --------------------------------------------------------------------------- - - -@router.get("/auth/notion-mcp/connector/reauth") -async def reauth_notion_mcp( - space_id: int, - connector_id: int, - return_url: str | None = None, - user: User = Depends(current_active_user), - session: AsyncSession = Depends(get_async_session), -): - """Initiate re-authentication for an existing Notion MCP connector.""" - result = await session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == connector_id, - SearchSourceConnector.user_id == user.id, - SearchSourceConnector.search_space_id == space_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - raise HTTPException(status_code=404, detail="Connector not found or access denied") - - if not config.SECRET_KEY: - raise HTTPException(status_code=500, detail="SECRET_KEY not configured.") - - try: - metadata = await _get_oauth_metadata() - redirect_uri = _get_redirect_uri() - credentials = await register_client(metadata, redirect_uri) - - code_verifier, code_challenge = generate_pkce_pair() - - extra: dict = { - "connector_id": connector_id, - "code_verifier": code_verifier, - "mcp_client_id": credentials.client_id, - "mcp_client_secret": credentials.client_secret or "", - } - if return_url and return_url.startswith("/"): - extra["return_url"] = return_url - - state_manager = _get_state_manager() - state_encoded = state_manager.generate_secure_state(space_id, user.id, **extra) - - auth_url = build_authorization_url( - metadata=metadata, - client_id=credentials.client_id, - redirect_uri=redirect_uri, - code_challenge=code_challenge, - state=state_encoded, - ) - - logger.info("Initiating Notion MCP re-auth for user %s, connector %s", user.id, connector_id) - return {"auth_url": auth_url} - - except HTTPException: - raise - except Exception as e: - logger.error("Failed to initiate Notion MCP re-auth: %s", e, exc_info=True) - raise HTTPException( - status_code=500, detail=f"Failed to initiate Notion MCP re-auth: {e!s}" - ) from e - - -# --------------------------------------------------------------------------- -# Route: OAuth callback -# --------------------------------------------------------------------------- - - -@router.get("/auth/notion-mcp/connector/callback") -async def notion_mcp_callback( - request: Request, - code: str | None = None, - error: str | None = None, - state: str | None = None, - session: AsyncSession = Depends(get_async_session), -): - """Handle the OAuth callback from Notion's MCP authorization server.""" - if error: - logger.warning("Notion MCP OAuth error: %s", error) - space_id = None - if state: - try: - data = _get_state_manager().validate_state(state) - space_id = data.get("space_id") - except Exception: - pass - if space_id: - return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=notion_mcp_oauth_denied" - ) - return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=notion_mcp_oauth_denied" - ) - - if not code: - raise HTTPException(status_code=400, detail="Missing authorization code") - if not state: - raise HTTPException(status_code=400, detail="Missing state parameter") - - state_manager = _get_state_manager() - try: - data = state_manager.validate_state(state) - except HTTPException: - raise - except Exception as e: - raise HTTPException(status_code=400, detail=f"Invalid state: {e!s}") from e - - user_id = UUID(data["user_id"]) - space_id = data["space_id"] - code_verifier = data.get("code_verifier") - mcp_client_id = data.get("mcp_client_id") - mcp_client_secret = data.get("mcp_client_secret") or None - - if not code_verifier or not mcp_client_id: - raise HTTPException(status_code=400, detail="Missing PKCE or client data in state") - - try: - metadata = await _get_oauth_metadata() - redirect_uri = _get_redirect_uri() - - token_set = await exchange_code_for_tokens( - code=code, - code_verifier=code_verifier, - metadata=metadata, - client_id=mcp_client_id, - redirect_uri=redirect_uri, - client_secret=mcp_client_secret, - ) - except Exception as e: - logger.error("Notion MCP token exchange failed: %s", e, exc_info=True) - raise HTTPException(status_code=400, detail=f"Token exchange failed: {e!s}") from e - - token_encryption = _get_token_encryption() - - workspace_info = await _fetch_workspace_info(token_set.access_token) - - connector_config = { - "access_token": token_encryption.encrypt_token(token_set.access_token), - "refresh_token": token_encryption.encrypt_token(token_set.refresh_token) - if token_set.refresh_token - else None, - "expires_in": token_set.expires_in, - "expires_at": token_set.expires_at.isoformat() if token_set.expires_at else None, - "workspace_id": workspace_info.get("workspace_id"), - "workspace_name": workspace_info.get("workspace_name", "Notion Workspace"), - "workspace_icon": workspace_info.get("workspace_icon", "📄"), - "bot_id": workspace_info.get("bot_id"), - "mcp_mode": True, - "mcp_client_id": mcp_client_id, - "mcp_client_secret": token_encryption.encrypt_token(mcp_client_secret) - if mcp_client_secret - else None, - "_token_encrypted": True, - } - - reauth_connector_id = data.get("connector_id") - reauth_return_url = data.get("return_url") - - # --- Re-auth path --- - if reauth_connector_id: - result = await session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == reauth_connector_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.search_space_id == space_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - db_connector = result.scalars().first() - if not db_connector: - raise HTTPException(status_code=404, detail="Connector not found during re-auth") - - db_connector.config = connector_config - flag_modified(db_connector, "config") - await session.commit() - await session.refresh(db_connector) - - logger.info("Re-authenticated Notion MCP connector %s for user %s", db_connector.id, user_id) - if reauth_return_url and reauth_return_url.startswith("/"): - return RedirectResponse(url=f"{config.NEXT_FRONTEND_URL}{reauth_return_url}") - return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=notion-connector&connectorId={db_connector.id}" - ) - - # --- New connector path --- - connector_identifier = extract_identifier_from_credentials( - SearchSourceConnectorType.NOTION_CONNECTOR, connector_config - ) - - is_duplicate = await check_duplicate_connector( - session, - SearchSourceConnectorType.NOTION_CONNECTOR, - space_id, - user_id, - connector_identifier, - ) - if is_duplicate: - logger.warning("Duplicate Notion MCP connector for user %s", user_id) - return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=duplicate_account&connector=notion-connector" - ) - - connector_name = await generate_unique_connector_name( - session, - SearchSourceConnectorType.NOTION_CONNECTOR, - space_id, - user_id, - connector_identifier, - ) - - new_connector = SearchSourceConnector( - name=connector_name, - connector_type=SearchSourceConnectorType.NOTION_CONNECTOR, - is_indexable=True, - config=connector_config, - search_space_id=space_id, - user_id=user_id, - ) - session.add(new_connector) - - try: - await session.commit() - logger.info("Created Notion MCP connector for user %s in space %s", user_id, space_id) - return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=notion-connector&connectorId={new_connector.id}" - ) - except IntegrityError as e: - await session.rollback() - raise HTTPException(status_code=409, detail=f"Database integrity error: {e!s}") from e - except Exception as e: - await session.rollback() - raise HTTPException( - status_code=500, detail=f"Failed to create connector: {e!s}" - ) from e - - -# --------------------------------------------------------------------------- -# Token refresh helper (used by the adapter) -# --------------------------------------------------------------------------- - - -async def refresh_notion_mcp_token( - session: AsyncSession, - connector: SearchSourceConnector, -) -> SearchSourceConnector: - """Refresh the MCP access token for a connector. - - Handles refresh-token rotation: persists both new access_token - and new refresh_token atomically. - """ - token_encryption = _get_token_encryption() - - cfg = connector.config or {} - encrypted_refresh = cfg.get("refresh_token") - if not encrypted_refresh: - raise HTTPException(status_code=400, detail="No refresh token available. Please re-authenticate.") - - try: - refresh_token = token_encryption.decrypt_token(encrypted_refresh) - except Exception as e: - raise HTTPException(status_code=500, detail=f"Failed to decrypt refresh token: {e!s}") from e - - mcp_client_id = cfg.get("mcp_client_id") - mcp_client_secret_encrypted = cfg.get("mcp_client_secret") - mcp_client_secret = ( - token_encryption.decrypt_token(mcp_client_secret_encrypted) - if mcp_client_secret_encrypted - else None - ) - - if not mcp_client_id: - raise HTTPException(status_code=400, detail="Missing MCP client_id. Please re-authenticate.") - - metadata = await _get_oauth_metadata() - - try: - token_set = await refresh_access_token( - refresh_token=refresh_token, - metadata=metadata, - client_id=mcp_client_id, - client_secret=mcp_client_secret, - ) - except ValueError as e: - if "REAUTH_REQUIRED" in str(e): - connector.config = {**connector.config, "auth_expired": True} - flag_modified(connector, "config") - await session.commit() - await session.refresh(connector) - raise HTTPException( - status_code=401, detail="Notion MCP authentication expired. Please re-authenticate." - ) from e - raise HTTPException(status_code=400, detail=f"Token refresh failed: {e!s}") from e - - updated_config = { - **connector.config, - "access_token": token_encryption.encrypt_token(token_set.access_token), - "refresh_token": token_encryption.encrypt_token(token_set.refresh_token) - if token_set.refresh_token - else connector.config.get("refresh_token"), - "expires_in": token_set.expires_in, - "expires_at": token_set.expires_at.isoformat() if token_set.expires_at else None, - "_token_encrypted": True, - } - updated_config.pop("auth_expired", None) - - connector.config = updated_config - flag_modified(connector, "config") - await session.commit() - await session.refresh(connector) - - logger.info("Refreshed Notion MCP token for connector %s", connector.id) - return connector diff --git a/surfsense_backend/app/services/notion_mcp/__init__.py b/surfsense_backend/app/services/notion_mcp/__init__.py deleted file mode 100644 index 6a57500b6..000000000 --- a/surfsense_backend/app/services/notion_mcp/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Notion MCP integration. - -Routes Notion operations through Notion's hosted MCP server -at https://mcp.notion.com/mcp instead of direct API calls. -""" - -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession - -from app.db import SearchSourceConnector, SearchSourceConnectorType - - -async def has_mcp_notion_connector( - session: AsyncSession, - search_space_id: int, -) -> bool: - """Check whether the search space has at least one MCP-mode Notion connector.""" - result = await session.execute( - select(SearchSourceConnector.id, SearchSourceConnector.config).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR, - ) - ) - for _, config in result.all(): - if isinstance(config, dict) and config.get("mcp_mode"): - return True - return False diff --git a/surfsense_backend/app/services/notion_mcp/adapter.py b/surfsense_backend/app/services/notion_mcp/adapter.py deleted file mode 100644 index 76eac6305..000000000 --- a/surfsense_backend/app/services/notion_mcp/adapter.py +++ /dev/null @@ -1,253 +0,0 @@ -"""Notion MCP Adapter. - -Connects to Notion's hosted MCP server at ``https://mcp.notion.com/mcp`` -and exposes the same method signatures as ``NotionHistoryConnector``'s -write operations so that tool factories can swap with a one-line change. - -Includes an optional fallback to ``NotionHistoryConnector`` when the MCP -server returns known serialization errors (GitHub issues #215, #216). -""" - -import logging -from datetime import UTC, datetime -from typing import Any - -from mcp import ClientSession -from mcp.client.streamable_http import streamablehttp_client -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession - -from app.config import config -from app.db import SearchSourceConnector -from app.schemas.notion_auth_credentials import NotionAuthCredentialsBase -from app.utils.oauth_security import TokenEncryption - -from .response_parser import ( - extract_text_from_mcp_response, - is_mcp_serialization_error, - parse_create_page_response, - parse_delete_page_response, - parse_fetch_page_response, - parse_health_check_response, - parse_update_page_response, -) - -logger = logging.getLogger(__name__) - -NOTION_MCP_URL = "https://mcp.notion.com/mcp" - - -class NotionMCPAdapter: - """Routes Notion operations through the hosted MCP server. - - Drop-in replacement for ``NotionHistoryConnector`` write methods. - Returns the same dict structure so KB sync works unchanged. - """ - - def __init__(self, session: AsyncSession, connector_id: int): - self._session = session - self._connector_id = connector_id - self._access_token: str | None = None - - async def _get_valid_token(self) -> str: - """Get a valid MCP access token, refreshing if expired.""" - result = await self._session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == self._connector_id - ) - ) - connector = result.scalars().first() - if not connector: - raise ValueError(f"Connector {self._connector_id} not found") - - cfg = connector.config or {} - - if not cfg.get("mcp_mode"): - raise ValueError( - f"Connector {self._connector_id} is not an MCP connector" - ) - - access_token = cfg.get("access_token") - if not access_token: - raise ValueError("No access token in MCP connector config") - - is_encrypted = cfg.get("_token_encrypted", False) - if is_encrypted and config.SECRET_KEY: - token_encryption = TokenEncryption(config.SECRET_KEY) - access_token = token_encryption.decrypt_token(access_token) - - expires_at_str = cfg.get("expires_at") - if expires_at_str: - expires_at = datetime.fromisoformat(expires_at_str) - if expires_at.tzinfo is None: - expires_at = expires_at.replace(tzinfo=UTC) - if expires_at <= datetime.now(UTC): - from app.routes.notion_mcp_connector_route import refresh_notion_mcp_token - - connector = await refresh_notion_mcp_token(self._session, connector) - cfg = connector.config or {} - access_token = cfg.get("access_token", "") - if is_encrypted and config.SECRET_KEY: - token_encryption = TokenEncryption(config.SECRET_KEY) - access_token = token_encryption.decrypt_token(access_token) - - self._access_token = access_token - return access_token - - async def _call_mcp_tool( - self, tool_name: str, arguments: dict[str, Any] - ) -> str: - """Connect to Notion MCP server and call a tool. Returns raw text.""" - token = await self._get_valid_token() - headers = {"Authorization": f"Bearer {token}"} - - async with ( - streamablehttp_client(NOTION_MCP_URL, headers=headers) as (read, write, _), - ClientSession(read, write) as session, - ): - await session.initialize() - response = await session.call_tool(tool_name, arguments=arguments) - return extract_text_from_mcp_response(response) - - async def _call_with_fallback( - self, - tool_name: str, - arguments: dict[str, Any], - parser, - fallback_method: str | None = None, - fallback_kwargs: dict[str, Any] | None = None, - ) -> dict[str, Any]: - """Call MCP tool, parse response, and fall back on serialization errors.""" - try: - raw_text = await self._call_mcp_tool(tool_name, arguments) - result = parser(raw_text) - - if result.get("mcp_serialization_error") and fallback_method: - logger.warning( - "MCP tool '%s' hit serialization bug, falling back to direct API", - tool_name, - ) - return await self._fallback(fallback_method, fallback_kwargs or {}) - - return result - - except Exception as e: - error_str = str(e) - if is_mcp_serialization_error(error_str) and fallback_method: - logger.warning( - "MCP tool '%s' raised serialization error, falling back: %s", - tool_name, - error_str, - ) - return await self._fallback(fallback_method, fallback_kwargs or {}) - - logger.error("MCP tool '%s' failed: %s", tool_name, e, exc_info=True) - return {"status": "error", "message": f"MCP call failed: {e!s}"} - - async def _fallback( - self, method_name: str, kwargs: dict[str, Any] - ) -> dict[str, Any]: - """Fall back to NotionHistoryConnector for the given method. - - Uses the already-refreshed MCP access token directly with the - Notion SDK, bypassing the connector's config-based token loading. - """ - from app.connectors.notion_history import NotionHistoryConnector - from app.schemas.notion_auth_credentials import NotionAuthCredentialsBase - - token = self._access_token - if not token: - token = await self._get_valid_token() - - connector = NotionHistoryConnector( - session=self._session, - connector_id=self._connector_id, - ) - connector._credentials = NotionAuthCredentialsBase(access_token=token) - connector._using_legacy_token = True - - method = getattr(connector, method_name) - return await method(**kwargs) - - # ------------------------------------------------------------------ - # Public API — same signatures as NotionHistoryConnector - # ------------------------------------------------------------------ - - async def create_page( - self, - title: str, - content: str, - parent_page_id: str | None = None, - ) -> dict[str, Any]: - arguments: dict[str, Any] = { - "pages": [ - { - "title": title, - "content": content, - } - ] - } - if parent_page_id: - arguments["pages"][0]["parent_page_url"] = parent_page_id - - return await self._call_with_fallback( - tool_name="notion-create-pages", - arguments=arguments, - parser=parse_create_page_response, - fallback_method="create_page", - fallback_kwargs={ - "title": title, - "content": content, - "parent_page_id": parent_page_id, - }, - ) - - async def update_page( - self, - page_id: str, - content: str | None = None, - ) -> dict[str, Any]: - arguments: dict[str, Any] = { - "page_id": page_id, - "command": "replace_content", - } - if content: - arguments["new_str"] = content - - return await self._call_with_fallback( - tool_name="notion-update-page", - arguments=arguments, - parser=parse_update_page_response, - fallback_method="update_page", - fallback_kwargs={"page_id": page_id, "content": content}, - ) - - async def delete_page(self, page_id: str) -> dict[str, Any]: - arguments: dict[str, Any] = { - "page_id": page_id, - "command": "update_properties", - "archived": True, - } - - return await self._call_with_fallback( - tool_name="notion-update-page", - arguments=arguments, - parser=parse_delete_page_response, - fallback_method="delete_page", - fallback_kwargs={"page_id": page_id}, - ) - - async def fetch_page(self, page_url_or_id: str) -> dict[str, Any]: - """Fetch page content via ``notion-fetch``.""" - raw_text = await self._call_mcp_tool( - "notion-fetch", {"url": page_url_or_id} - ) - return parse_fetch_page_response(raw_text) - - async def health_check(self) -> dict[str, Any]: - """Check MCP connection via ``notion-get-self``.""" - try: - raw_text = await self._call_mcp_tool("notion-get-self", {}) - return parse_health_check_response(raw_text) - except Exception as e: - return {"status": "error", "message": str(e)} diff --git a/surfsense_backend/app/services/notion_mcp/oauth.py b/surfsense_backend/app/services/notion_mcp/oauth.py deleted file mode 100644 index cfa6ad3e0..000000000 --- a/surfsense_backend/app/services/notion_mcp/oauth.py +++ /dev/null @@ -1,298 +0,0 @@ -"""OAuth 2.0 + PKCE utilities for Notion's remote MCP server. - -Implements the flow described in the official guide: -https://developers.notion.com/guides/mcp/build-mcp-client - -Steps: - 1. Discover OAuth metadata (RFC 9470 → RFC 8414) - 2. Dynamic client registration (RFC 7591) - 3. Build authorization URL with PKCE code_challenge - 4. Exchange authorization code + code_verifier for tokens - 5. Refresh access tokens (with refresh-token rotation) - -All functions are stateless — callers (route handlers) manage storage. -""" - -import logging -from dataclasses import dataclass -from datetime import UTC, datetime, timedelta -from typing import Any - -import httpx - -logger = logging.getLogger(__name__) - -NOTION_MCP_SERVER_URL = "https://mcp.notion.com/mcp" -_HTTP_TIMEOUT = 30.0 - - -@dataclass(frozen=True) -class OAuthMetadata: - issuer: str - authorization_endpoint: str - token_endpoint: str - registration_endpoint: str | None - code_challenge_methods_supported: list[str] - - -@dataclass(frozen=True) -class ClientCredentials: - client_id: str - client_secret: str | None = None - client_id_issued_at: int | None = None - client_secret_expires_at: int | None = None - - -@dataclass(frozen=True) -class TokenSet: - access_token: str - refresh_token: str | None - token_type: str - expires_in: int | None - expires_at: datetime | None - scope: str | None - - -# --------------------------------------------------------------------------- -# Step 1 — OAuth discovery -# --------------------------------------------------------------------------- - - -async def discover_oauth_metadata( - mcp_server_url: str = NOTION_MCP_SERVER_URL, -) -> OAuthMetadata: - """Discover OAuth endpoints via RFC 9470 + RFC 8414. - - 1. Fetch protected-resource metadata to find the authorization server. - 2. Fetch authorization-server metadata to get OAuth endpoints. - """ - from urllib.parse import urlparse - - parsed = urlparse(mcp_server_url) - origin = f"{parsed.scheme}://{parsed.netloc}" - path = parsed.path.rstrip("/") - - async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client: - # RFC 9470 — Protected Resource Metadata - # URL format: {origin}/.well-known/oauth-protected-resource{path} - pr_url = f"{origin}/.well-known/oauth-protected-resource{path}" - pr_resp = await client.get(pr_url) - pr_resp.raise_for_status() - pr_data = pr_resp.json() - - auth_servers = pr_data.get("authorization_servers", []) - if not auth_servers: - raise ValueError("No authorization_servers in protected resource metadata") - auth_server_url = auth_servers[0] - - # RFC 8414 — Authorization Server Metadata - as_url = f"{auth_server_url}/.well-known/oauth-authorization-server" - as_resp = await client.get(as_url) - as_resp.raise_for_status() - as_data = as_resp.json() - - if not as_data.get("authorization_endpoint") or not as_data.get("token_endpoint"): - raise ValueError("Missing required OAuth endpoints in server metadata") - - return OAuthMetadata( - issuer=as_data.get("issuer", auth_server_url), - authorization_endpoint=as_data["authorization_endpoint"], - token_endpoint=as_data["token_endpoint"], - registration_endpoint=as_data.get("registration_endpoint"), - code_challenge_methods_supported=as_data.get( - "code_challenge_methods_supported", [] - ), - ) - - -# --------------------------------------------------------------------------- -# Step 2 — Dynamic client registration (RFC 7591) -# --------------------------------------------------------------------------- - - -async def register_client( - metadata: OAuthMetadata, - redirect_uri: str, - client_name: str = "SurfSense", -) -> ClientCredentials: - """Dynamically register an OAuth client with the Notion MCP server.""" - if not metadata.registration_endpoint: - raise ValueError("Server does not support dynamic client registration") - - payload = { - "client_name": client_name, - "redirect_uris": [redirect_uri], - "grant_types": ["authorization_code", "refresh_token"], - "response_types": ["code"], - "token_endpoint_auth_method": "none", - } - - async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client: - resp = await client.post( - metadata.registration_endpoint, - json=payload, - headers={"Content-Type": "application/json", "Accept": "application/json"}, - ) - if not resp.is_success: - logger.error( - "Dynamic client registration failed (%s): %s", - resp.status_code, - resp.text, - ) - resp.raise_for_status() - data = resp.json() - - return ClientCredentials( - client_id=data["client_id"], - client_secret=data.get("client_secret"), - client_id_issued_at=data.get("client_id_issued_at"), - client_secret_expires_at=data.get("client_secret_expires_at"), - ) - - -# --------------------------------------------------------------------------- -# Step 3 — Build authorization URL -# --------------------------------------------------------------------------- - - -def build_authorization_url( - metadata: OAuthMetadata, - client_id: str, - redirect_uri: str, - code_challenge: str, - state: str, -) -> str: - """Build the OAuth authorization URL with PKCE parameters.""" - from urllib.parse import urlencode - - params = { - "response_type": "code", - "client_id": client_id, - "redirect_uri": redirect_uri, - "code_challenge": code_challenge, - "code_challenge_method": "S256", - "state": state, - "prompt": "consent", - } - return f"{metadata.authorization_endpoint}?{urlencode(params)}" - - -# --------------------------------------------------------------------------- -# Step 4 — Exchange authorization code for tokens -# --------------------------------------------------------------------------- - - -async def exchange_code_for_tokens( - code: str, - code_verifier: str, - metadata: OAuthMetadata, - client_id: str, - redirect_uri: str, - client_secret: str | None = None, -) -> TokenSet: - """Exchange an authorization code + PKCE verifier for tokens.""" - form_data: dict[str, Any] = { - "grant_type": "authorization_code", - "code": code, - "client_id": client_id, - "redirect_uri": redirect_uri, - "code_verifier": code_verifier, - } - if client_secret: - form_data["client_secret"] = client_secret - - async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client: - resp = await client.post( - metadata.token_endpoint, - data=form_data, - headers={ - "Content-Type": "application/x-www-form-urlencoded", - "Accept": "application/json", - }, - ) - if not resp.is_success: - body = resp.text - raise ValueError(f"Token exchange failed ({resp.status_code}): {body}") - tokens = resp.json() - - if not tokens.get("access_token"): - raise ValueError("No access_token in token response") - - expires_at = None - if tokens.get("expires_in"): - expires_at = datetime.now(UTC) + timedelta(seconds=int(tokens["expires_in"])) - - return TokenSet( - access_token=tokens["access_token"], - refresh_token=tokens.get("refresh_token"), - token_type=tokens.get("token_type", "Bearer"), - expires_in=tokens.get("expires_in"), - expires_at=expires_at, - scope=tokens.get("scope"), - ) - - -# --------------------------------------------------------------------------- -# Step 5 — Refresh access token -# --------------------------------------------------------------------------- - - -async def refresh_access_token( - refresh_token: str, - metadata: OAuthMetadata, - client_id: str, - client_secret: str | None = None, -) -> TokenSet: - """Refresh an access token. - - Notion MCP uses refresh-token rotation: each refresh returns a new - refresh_token and invalidates the old one. Callers MUST persist the - new refresh_token atomically with the new access_token. - """ - form_data: dict[str, Any] = { - "grant_type": "refresh_token", - "refresh_token": refresh_token, - "client_id": client_id, - } - if client_secret: - form_data["client_secret"] = client_secret - - async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client: - resp = await client.post( - metadata.token_endpoint, - data=form_data, - headers={ - "Content-Type": "application/x-www-form-urlencoded", - "Accept": "application/json", - }, - ) - - if not resp.is_success: - body = resp.text - try: - error_data = resp.json() - error_code = error_data.get("error", "") - if error_code == "invalid_grant": - raise ValueError("REAUTH_REQUIRED") - except ValueError: - if "REAUTH_REQUIRED" in str(resp.text) or resp.status_code == 401: - raise - raise ValueError(f"Token refresh failed ({resp.status_code}): {body}") - - tokens = resp.json() - - if not tokens.get("access_token"): - raise ValueError("No access_token in refresh response") - - expires_at = None - if tokens.get("expires_in"): - expires_at = datetime.now(UTC) + timedelta(seconds=int(tokens["expires_in"])) - - return TokenSet( - access_token=tokens["access_token"], - refresh_token=tokens.get("refresh_token"), - token_type=tokens.get("token_type", "Bearer"), - expires_in=tokens.get("expires_in"), - expires_at=expires_at, - scope=tokens.get("scope"), - ) diff --git a/surfsense_backend/app/services/notion_mcp/response_parser.py b/surfsense_backend/app/services/notion_mcp/response_parser.py deleted file mode 100644 index 34d5ef332..000000000 --- a/surfsense_backend/app/services/notion_mcp/response_parser.py +++ /dev/null @@ -1,212 +0,0 @@ -"""Parse Notion MCP tool responses into structured dicts. - -The Notion MCP server returns responses as MCP TextContent where the -``text`` field contains JSON-stringified Notion API response data. -See: https://deepwiki.com/makenotion/notion-mcp-server/4.3-request-and-response-handling - -This module extracts that JSON and normalises it into the same dict -format that ``NotionHistoryConnector`` methods return, so downstream -code (KB sync, tool factories) works unchanged. -""" - -import json -import logging -from typing import Any - -logger = logging.getLogger(__name__) - -MCP_SERIALIZATION_ERROR_MARKERS = [ - "Expected array, received string", - "Expected object, received string", - "should be defined, instead was `undefined`", -] - - -def is_mcp_serialization_error(text: str) -> bool: - """Return True if the MCP error text matches a known serialization bug.""" - return any(marker in text for marker in MCP_SERIALIZATION_ERROR_MARKERS) - - -def extract_text_from_mcp_response(response) -> str: - """Pull the concatenated text out of an MCP ``CallToolResult``. - - Args: - response: The ``CallToolResult`` returned by ``session.call_tool()``. - - Returns: - Concatenated text content from the response. - """ - parts: list[str] = [] - for content in response.content: - if hasattr(content, "text"): - parts.append(content.text) - elif hasattr(content, "data"): - parts.append(str(content.data)) - else: - parts.append(str(content)) - return "\n".join(parts) if parts else "" - - -def _try_parse_json(text: str) -> dict[str, Any] | None: - """Attempt to parse *text* as JSON, returning None on failure.""" - try: - parsed = json.loads(text) - if isinstance(parsed, dict): - return parsed - except (json.JSONDecodeError, TypeError): - pass - return None - - -def _extract_page_title(page_data: dict[str, Any]) -> str: - """Best-effort extraction of the page title from a Notion page object.""" - props = page_data.get("properties", {}) - for prop in props.values(): - if prop.get("type") == "title": - title_parts = prop.get("title", []) - if title_parts: - return " ".join(t.get("plain_text", "") for t in title_parts) - return page_data.get("id", "Untitled") - - -def parse_create_page_response(raw_text: str) -> dict[str, Any]: - """Parse a ``notion-create-pages`` MCP response. - - Returns a dict compatible with ``NotionHistoryConnector.create_page()``: - ``{status, page_id, url, title, message}`` - """ - data = _try_parse_json(raw_text) - - if data is None: - if is_mcp_serialization_error(raw_text): - return { - "status": "mcp_error", - "message": raw_text, - "mcp_serialization_error": True, - } - return {"status": "error", "message": f"Unexpected MCP response: {raw_text[:500]}"} - - if data.get("status") == "error" or "error" in data: - return { - "status": "error", - "message": data.get("message", data.get("error", str(data))), - } - - page_id = data.get("id", "") - url = data.get("url", "") - title = _extract_page_title(data) - - return { - "status": "success", - "page_id": page_id, - "url": url, - "title": title, - "message": f"Created Notion page '{title}'", - } - - -def parse_update_page_response(raw_text: str) -> dict[str, Any]: - """Parse a ``notion-update-page`` MCP response. - - Returns a dict compatible with ``NotionHistoryConnector.update_page()``: - ``{status, page_id, url, title, message}`` - """ - data = _try_parse_json(raw_text) - - if data is None: - if is_mcp_serialization_error(raw_text): - return { - "status": "mcp_error", - "message": raw_text, - "mcp_serialization_error": True, - } - return {"status": "error", "message": f"Unexpected MCP response: {raw_text[:500]}"} - - if data.get("status") == "error" or "error" in data: - return { - "status": "error", - "message": data.get("message", data.get("error", str(data))), - } - - page_id = data.get("id", "") - url = data.get("url", "") - title = _extract_page_title(data) - - return { - "status": "success", - "page_id": page_id, - "url": url, - "title": title, - "message": f"Updated Notion page '{title}' (content appended)", - } - - -def parse_delete_page_response(raw_text: str) -> dict[str, Any]: - """Parse an archive (delete) MCP response. - - The Notion API responds to ``pages.update(archived=True)`` with - the archived page object. - - Returns a dict compatible with ``NotionHistoryConnector.delete_page()``: - ``{status, page_id, message}`` - """ - data = _try_parse_json(raw_text) - - if data is None: - if is_mcp_serialization_error(raw_text): - return { - "status": "mcp_error", - "message": raw_text, - "mcp_serialization_error": True, - } - return {"status": "error", "message": f"Unexpected MCP response: {raw_text[:500]}"} - - if data.get("status") == "error" or "error" in data: - return { - "status": "error", - "message": data.get("message", data.get("error", str(data))), - } - - page_id = data.get("id", "") - title = _extract_page_title(data) - - return { - "status": "success", - "page_id": page_id, - "message": f"Deleted Notion page '{title}'", - } - - -def parse_fetch_page_response(raw_text: str) -> dict[str, Any]: - """Parse a ``notion-fetch`` MCP response. - - Returns the raw parsed dict (Notion page/block data) or an error dict. - """ - data = _try_parse_json(raw_text) - - if data is None: - return {"status": "error", "message": f"Unexpected MCP response: {raw_text[:500]}"} - - if data.get("status") == "error" or "error" in data: - return { - "status": "error", - "message": data.get("message", data.get("error", str(data))), - } - - return {"status": "success", "data": data} - - -def parse_health_check_response(raw_text: str) -> dict[str, Any]: - """Parse a ``notion-get-self`` MCP response for health checking.""" - data = _try_parse_json(raw_text) - - if data is None: - return {"status": "error", "message": raw_text[:500]} - - if data.get("status") == "error" or "error" in data: - return { - "status": "error", - "message": data.get("message", data.get("error", str(data))), - } - - return {"status": "success", "data": data}