delete Notion MCP services, tools, and route

This commit is contained in:
CREDO23 2026-04-21 20:33:10 +02:00
parent 2dfe03b9b2
commit ea3bda9ec3
9 changed files with 0 additions and 1838 deletions

View file

@ -1,5 +0,0 @@
"""MCP-backed Notion tool factories.
Drop-in replacements for ``tools/notion/`` that route through
Notion's hosted MCP server instead of direct API calls.
"""

View file

@ -1,205 +0,0 @@
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.new_chat.tools.hitl import request_approval
from app.services.notion import NotionToolMetadataService
logger = logging.getLogger(__name__)
def _find_mcp_connector(connectors):
"""Return the first connector with mcp_mode enabled, or None."""
for c in connectors:
if (c.config or {}).get("mcp_mode"):
return c
return None
def create_create_notion_page_mcp_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
@tool
async def create_notion_page(
title: str,
content: str | None = None,
) -> dict[str, Any]:
"""Create a new page in Notion with the given title and content.
Use this tool when the user asks you to create, save, or publish
something to Notion. The page will be created in the user's
configured Notion workspace. The user MUST specify a topic before you
call this tool. If the request does not contain a topic (e.g. "create a
notion page"), ask what the page should be about. Never call this tool
without a clear topic from the user.
Args:
title: The title of the Notion page.
content: Optional markdown content for the page body (supports headings, lists, paragraphs).
Generate this yourself based on the user's topic.
Returns:
Dictionary with:
- status: "success", "rejected", or "error"
- page_id: Created page ID (if success)
- url: URL to the created page (if success)
- title: Page title (if success)
- message: Result message
IMPORTANT: If status is "rejected", the user explicitly declined the action.
Respond with a brief acknowledgment (e.g., "Understood, I didn't create the page.")
and move on. Do NOT troubleshoot or suggest alternatives.
Examples:
- "Create a Notion page about our Q2 roadmap"
- "Save a summary of today's discussion to Notion"
"""
logger.info("create_notion_page (MCP) called: title='%s'", title)
if db_session is None or search_space_id is None or user_id is None:
logger.error("Notion MCP tool not properly configured - missing required parameters")
return {
"status": "error",
"message": "Notion tool not properly configured. Please contact support.",
}
try:
metadata_service = NotionToolMetadataService(db_session)
context = await metadata_service.get_creation_context(search_space_id, user_id)
if "error" in context:
logger.error("Failed to fetch creation context: %s", context["error"])
return {"status": "error", "message": context["error"]}
accounts = context.get("accounts", [])
if accounts and all(a.get("auth_expired") for a in accounts):
return {
"status": "auth_error",
"message": "All connected Notion accounts need re-authentication. Please re-authenticate in your connector settings.",
"connector_type": "notion",
}
result = request_approval(
action_type="notion_page_creation",
tool_name="create_notion_page",
params={
"title": title,
"content": content,
"parent_page_id": None,
"connector_id": connector_id,
},
context=context,
)
if result.rejected:
logger.info("Notion page creation rejected by user")
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_title = result.params.get("title", title)
final_content = result.params.get("content", content)
final_parent_page_id = result.params.get("parent_page_id")
final_connector_id = result.params.get("connector_id", connector_id)
if not final_title or not final_title.strip():
return {
"status": "error",
"message": "Page title cannot be empty. Please provide a valid title.",
}
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
actual_connector_id = final_connector_id
if actual_connector_id is None:
query_result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
connectors = query_result.scalars().all()
connector = _find_mcp_connector(connectors)
if not connector:
return {
"status": "error",
"message": "No Notion MCP connector found. Please connect Notion (MCP) in your workspace settings.",
}
actual_connector_id = connector.id
else:
query_result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == actual_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
connector = query_result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Notion account is invalid or has been disconnected.",
}
from app.services.notion_mcp.adapter import NotionMCPAdapter
adapter = NotionMCPAdapter(session=db_session, connector_id=actual_connector_id)
result = await adapter.create_page(
title=final_title,
content=final_content,
parent_page_id=final_parent_page_id,
)
logger.info("create_page (MCP) result: %s - %s", result.get("status"), result.get("message", ""))
if result.get("status") == "success":
kb_message_suffix = ""
try:
from app.services.notion import NotionKBSyncService
kb_service = NotionKBSyncService(db_session)
kb_result = await kb_service.sync_after_create(
page_id=result.get("page_id"),
page_title=result.get("title", final_title),
page_url=result.get("url"),
content=final_content,
connector_id=actual_connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
if kb_result["status"] == "success":
kb_message_suffix = " Your knowledge base has also been updated."
else:
kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync."
except Exception as kb_err:
logger.warning("KB sync after create failed: %s", kb_err)
kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync."
result["message"] = result.get("message", "") + kb_message_suffix
return result
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error("Error creating Notion page (MCP): %s", e, exc_info=True)
if isinstance(e, ValueError):
message = str(e)
else:
message = "Something went wrong while creating the page. Please try again."
return {"status": "error", "message": message}
return create_notion_page

View file

@ -1,173 +0,0 @@
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.new_chat.tools.hitl import request_approval
from app.services.notion.tool_metadata_service import NotionToolMetadataService
logger = logging.getLogger(__name__)
def create_delete_notion_page_mcp_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
@tool
async def delete_notion_page(
page_title: str,
delete_from_kb: bool = False,
) -> dict[str, Any]:
"""Delete (archive) a Notion page.
Use this tool when the user asks you to delete, remove, or archive
a Notion page. Note that Notion doesn't permanently delete pages,
it archives them (they can be restored from trash).
Args:
page_title: The title of the Notion page to delete.
delete_from_kb: Whether to also remove the page from the knowledge base.
Default is False.
Returns:
Dictionary with:
- status: "success", "rejected", "not_found", or "error"
- page_id: Deleted page ID (if success)
- message: Success or error message
- deleted_from_kb: Whether the page was also removed from knowledge base (if success)
Examples:
- "Delete the 'Meeting Notes' Notion page"
- "Remove the 'Old Project Plan' Notion page"
"""
logger.info(
"delete_notion_page (MCP) called: page_title='%s', delete_from_kb=%s",
page_title,
delete_from_kb,
)
if db_session is None or search_space_id is None or user_id is None:
logger.error("Notion MCP tool not properly configured - missing required parameters")
return {
"status": "error",
"message": "Notion tool not properly configured. Please contact support.",
}
try:
metadata_service = NotionToolMetadataService(db_session)
context = await metadata_service.get_delete_context(search_space_id, user_id, page_title)
if "error" in context:
error_msg = context["error"]
if "not found" in error_msg.lower():
return {"status": "not_found", "message": error_msg}
return {"status": "error", "message": error_msg}
account = context.get("account", {})
if account.get("auth_expired"):
return {
"status": "auth_error",
"message": "The Notion account for this page needs re-authentication. Please re-authenticate in your connector settings.",
}
page_id = context.get("page_id")
connector_id_from_context = account.get("id")
document_id = context.get("document_id")
result = request_approval(
action_type="notion_page_deletion",
tool_name="delete_notion_page",
params={
"page_id": page_id,
"connector_id": connector_id_from_context,
"delete_from_kb": delete_from_kb,
},
context=context,
)
if result.rejected:
logger.info("Notion page deletion rejected by user")
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_page_id = result.params.get("page_id", page_id)
final_connector_id = result.params.get("connector_id", connector_id_from_context)
final_delete_from_kb = result.params.get("delete_from_kb", delete_from_kb)
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
if final_connector_id:
query_result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
connector = query_result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Notion account is invalid or has been disconnected.",
}
actual_connector_id = connector.id
else:
return {"status": "error", "message": "No connector found for this page."}
from app.services.notion_mcp.adapter import NotionMCPAdapter
adapter = NotionMCPAdapter(session=db_session, connector_id=actual_connector_id)
result = await adapter.delete_page(page_id=final_page_id)
logger.info("delete_page (MCP) result: %s - %s", result.get("status"), result.get("message", ""))
deleted_from_kb = False
if result.get("status") == "success" and final_delete_from_kb and document_id:
try:
from sqlalchemy.future import select
from app.db import Document
doc_result = await db_session.execute(
select(Document).filter(Document.id == document_id)
)
document = doc_result.scalars().first()
if document:
await db_session.delete(document)
await db_session.commit()
deleted_from_kb = True
logger.info("Deleted document %s from knowledge base", document_id)
except Exception as e:
logger.error("Failed to delete document from KB: %s", e)
await db_session.rollback()
result["warning"] = f"Page deleted from Notion, but failed to remove from knowledge base: {e!s}"
if result.get("status") == "success":
result["deleted_from_kb"] = deleted_from_kb
if deleted_from_kb:
result["message"] = f"{result.get('message', '')} (also removed from knowledge base)"
return result
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error("Error deleting Notion page (MCP): %s", e, exc_info=True)
if isinstance(e, ValueError):
message = str(e)
else:
message = "Something went wrong while deleting the page. Please try again."
return {"status": "error", "message": message}
return delete_notion_page

View file

@ -1,179 +0,0 @@
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.new_chat.tools.hitl import request_approval
from app.services.notion import NotionToolMetadataService
logger = logging.getLogger(__name__)
def create_update_notion_page_mcp_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
@tool
async def update_notion_page(
page_title: str,
content: str | None = None,
) -> dict[str, Any]:
"""Update an existing Notion page by appending new content.
Use this tool when the user asks you to add content to, modify, or update
a Notion page. The new content will be appended to the existing page content.
The user MUST specify what to add before you call this tool. If the
request is vague, ask what content they want added.
Args:
page_title: The title of the Notion page to update.
content: Optional markdown content to append to the page body (supports headings, lists, paragraphs).
Generate this yourself based on the user's request.
Returns:
Dictionary with:
- status: "success", "rejected", "not_found", or "error"
- page_id: Updated page ID (if success)
- url: URL to the updated page (if success)
- title: Current page title (if success)
- message: Result message
IMPORTANT:
- If status is "rejected", the user explicitly declined the action.
Respond with a brief acknowledgment (e.g., "Understood, I didn't update the page.")
and move on. Do NOT ask for alternatives or troubleshoot.
- If status is "not_found", inform the user conversationally using the exact message provided.
Examples:
- "Add today's meeting notes to the 'Meeting Notes' Notion page"
- "Update the 'Project Plan' page with a status update on phase 1"
"""
logger.info(
"update_notion_page (MCP) called: page_title='%s', content_length=%d",
page_title,
len(content) if content else 0,
)
if db_session is None or search_space_id is None or user_id is None:
logger.error("Notion MCP tool not properly configured - missing required parameters")
return {
"status": "error",
"message": "Notion tool not properly configured. Please contact support.",
}
if not content or not content.strip():
return {
"status": "error",
"message": "Content is required to update the page. Please provide the actual content you want to add.",
}
try:
metadata_service = NotionToolMetadataService(db_session)
context = await metadata_service.get_update_context(search_space_id, user_id, page_title)
if "error" in context:
error_msg = context["error"]
if "not found" in error_msg.lower():
return {"status": "not_found", "message": error_msg}
return {"status": "error", "message": error_msg}
account = context.get("account", {})
if account.get("auth_expired"):
return {
"status": "auth_error",
"message": "The Notion account for this page needs re-authentication. Please re-authenticate in your connector settings.",
}
page_id = context.get("page_id")
document_id = context.get("document_id")
connector_id_from_context = account.get("id")
result = request_approval(
action_type="notion_page_update",
tool_name="update_notion_page",
params={
"page_id": page_id,
"content": content,
"connector_id": connector_id_from_context,
},
context=context,
)
if result.rejected:
logger.info("Notion page update rejected by user")
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_page_id = result.params.get("page_id", page_id)
final_content = result.params.get("content", content)
final_connector_id = result.params.get("connector_id", connector_id_from_context)
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
if final_connector_id:
query_result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
connector = query_result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Notion account is invalid or has been disconnected.",
}
actual_connector_id = connector.id
else:
return {"status": "error", "message": "No connector found for this page."}
from app.services.notion_mcp.adapter import NotionMCPAdapter
adapter = NotionMCPAdapter(session=db_session, connector_id=actual_connector_id)
result = await adapter.update_page(page_id=final_page_id, content=final_content)
logger.info("update_page (MCP) result: %s - %s", result.get("status"), result.get("message", ""))
if result.get("status") == "success" and document_id is not None:
from app.services.notion import NotionKBSyncService
kb_service = NotionKBSyncService(db_session)
kb_result = await kb_service.sync_after_update(
document_id=document_id,
appended_content=final_content,
user_id=user_id,
search_space_id=search_space_id,
appended_block_ids=result.get("appended_block_ids"),
)
if kb_result["status"] == "success":
result["message"] = f"{result['message']}. Your knowledge base has also been updated."
elif kb_result["status"] == "not_indexed":
result["message"] = f"{result['message']}. This page will be added to your knowledge base in the next scheduled sync."
else:
result["message"] = f"{result['message']}. Your knowledge base will be updated in the next scheduled sync."
return result
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error("Error updating Notion page (MCP): %s", e, exc_info=True)
if isinstance(e, ValueError):
message = str(e)
else:
message = "Something went wrong while updating the page. Please try again."
return {"status": "error", "message": message}
return update_notion_page

View file

@ -1,486 +0,0 @@
"""Notion MCP Connector OAuth Routes.
Handles OAuth 2.0 + PKCE authentication for Notion's hosted MCP server.
Based on: https://developers.notion.com/guides/mcp/build-mcp-client
This creates connectors with the same ``NOTION_CONNECTOR`` type as the
existing direct-API connector, but with ``mcp_mode: True`` in the config
so the adapter layer knows to route through MCP.
"""
import logging
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import RedirectResponse
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.config import config
from app.db import (
SearchSourceConnector,
SearchSourceConnectorType,
User,
get_async_session,
)
from app.services.notion_mcp.oauth import (
ClientCredentials,
OAuthMetadata,
build_authorization_url,
discover_oauth_metadata,
exchange_code_for_tokens,
refresh_access_token,
register_client,
)
from app.users import current_active_user
from app.utils.connector_naming import (
check_duplicate_connector,
extract_identifier_from_credentials,
generate_unique_connector_name,
)
from app.utils.oauth_security import OAuthStateManager, TokenEncryption, generate_pkce_pair
logger = logging.getLogger(__name__)
router = APIRouter()
_state_manager: OAuthStateManager | None = None
_token_encryption: TokenEncryption | None = None
_oauth_metadata: OAuthMetadata | None = None
def _get_state_manager() -> OAuthStateManager:
global _state_manager
if _state_manager is None:
if not config.SECRET_KEY:
raise ValueError("SECRET_KEY must be set for OAuth security")
_state_manager = OAuthStateManager(config.SECRET_KEY)
return _state_manager
def _get_token_encryption() -> TokenEncryption:
global _token_encryption
if _token_encryption is None:
if not config.SECRET_KEY:
raise ValueError("SECRET_KEY must be set for token encryption")
_token_encryption = TokenEncryption(config.SECRET_KEY)
return _token_encryption
async def _get_oauth_metadata() -> OAuthMetadata:
global _oauth_metadata
if _oauth_metadata is None:
_oauth_metadata = await discover_oauth_metadata()
return _oauth_metadata
async def _fetch_workspace_info(access_token: str) -> dict:
"""Fetch workspace metadata using the Notion API with the fresh token.
The ``/v1/users/me`` endpoint returns bot info including workspace_name.
This populates connector config fields so naming and metadata services
work correctly.
"""
try:
import httpx
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.get(
"https://api.notion.com/v1/users/me",
headers={
"Authorization": f"Bearer {access_token}",
"Notion-Version": "2022-06-28",
},
)
if resp.is_success:
data = resp.json()
bot_info = data.get("bot", {})
return {
"bot_id": data.get("id"),
"workspace_name": bot_info.get("workspace_name", "Notion Workspace"),
"workspace_icon": data.get("avatar_url") or "📄",
}
except Exception as e:
logger.warning("Failed to fetch workspace info: %s", e)
return {}
NOTION_MCP_REDIRECT_URI = None
def _get_redirect_uri() -> str:
global NOTION_MCP_REDIRECT_URI
if NOTION_MCP_REDIRECT_URI is None:
backend = config.BACKEND_URL or "http://localhost:8000"
NOTION_MCP_REDIRECT_URI = f"{backend}/api/v1/auth/notion-mcp/connector/callback"
return NOTION_MCP_REDIRECT_URI
# ---------------------------------------------------------------------------
# Route: initiate OAuth
# ---------------------------------------------------------------------------
@router.get("/auth/notion-mcp/connector/add")
async def connect_notion_mcp(
space_id: int,
user: User = Depends(current_active_user),
):
"""Initiate Notion MCP OAuth + PKCE flow."""
if not config.SECRET_KEY:
raise HTTPException(status_code=500, detail="SECRET_KEY not configured.")
try:
metadata = await _get_oauth_metadata()
redirect_uri = _get_redirect_uri()
credentials = await register_client(metadata, redirect_uri)
code_verifier, code_challenge = generate_pkce_pair()
state_manager = _get_state_manager()
state_encoded = state_manager.generate_secure_state(
space_id,
user.id,
code_verifier=code_verifier,
mcp_client_id=credentials.client_id,
mcp_client_secret=credentials.client_secret or "",
)
auth_url = build_authorization_url(
metadata=metadata,
client_id=credentials.client_id,
redirect_uri=redirect_uri,
code_challenge=code_challenge,
state=state_encoded,
)
logger.info("Generated Notion MCP OAuth URL for user %s, space %s", user.id, space_id)
return {"auth_url": auth_url}
except Exception as e:
logger.error("Failed to initiate Notion MCP OAuth: %s", e, exc_info=True)
raise HTTPException(
status_code=500, detail=f"Failed to initiate Notion MCP OAuth: {e!s}"
) from e
# ---------------------------------------------------------------------------
# Route: re-authenticate existing connector
# ---------------------------------------------------------------------------
@router.get("/auth/notion-mcp/connector/reauth")
async def reauth_notion_mcp(
space_id: int,
connector_id: int,
return_url: str | None = None,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
):
"""Initiate re-authentication for an existing Notion MCP connector."""
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
raise HTTPException(status_code=404, detail="Connector not found or access denied")
if not config.SECRET_KEY:
raise HTTPException(status_code=500, detail="SECRET_KEY not configured.")
try:
metadata = await _get_oauth_metadata()
redirect_uri = _get_redirect_uri()
credentials = await register_client(metadata, redirect_uri)
code_verifier, code_challenge = generate_pkce_pair()
extra: dict = {
"connector_id": connector_id,
"code_verifier": code_verifier,
"mcp_client_id": credentials.client_id,
"mcp_client_secret": credentials.client_secret or "",
}
if return_url and return_url.startswith("/"):
extra["return_url"] = return_url
state_manager = _get_state_manager()
state_encoded = state_manager.generate_secure_state(space_id, user.id, **extra)
auth_url = build_authorization_url(
metadata=metadata,
client_id=credentials.client_id,
redirect_uri=redirect_uri,
code_challenge=code_challenge,
state=state_encoded,
)
logger.info("Initiating Notion MCP re-auth for user %s, connector %s", user.id, connector_id)
return {"auth_url": auth_url}
except HTTPException:
raise
except Exception as e:
logger.error("Failed to initiate Notion MCP re-auth: %s", e, exc_info=True)
raise HTTPException(
status_code=500, detail=f"Failed to initiate Notion MCP re-auth: {e!s}"
) from e
# ---------------------------------------------------------------------------
# Route: OAuth callback
# ---------------------------------------------------------------------------
@router.get("/auth/notion-mcp/connector/callback")
async def notion_mcp_callback(
request: Request,
code: str | None = None,
error: str | None = None,
state: str | None = None,
session: AsyncSession = Depends(get_async_session),
):
"""Handle the OAuth callback from Notion's MCP authorization server."""
if error:
logger.warning("Notion MCP OAuth error: %s", error)
space_id = None
if state:
try:
data = _get_state_manager().validate_state(state)
space_id = data.get("space_id")
except Exception:
pass
if space_id:
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=notion_mcp_oauth_denied"
)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=notion_mcp_oauth_denied"
)
if not code:
raise HTTPException(status_code=400, detail="Missing authorization code")
if not state:
raise HTTPException(status_code=400, detail="Missing state parameter")
state_manager = _get_state_manager()
try:
data = state_manager.validate_state(state)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid state: {e!s}") from e
user_id = UUID(data["user_id"])
space_id = data["space_id"]
code_verifier = data.get("code_verifier")
mcp_client_id = data.get("mcp_client_id")
mcp_client_secret = data.get("mcp_client_secret") or None
if not code_verifier or not mcp_client_id:
raise HTTPException(status_code=400, detail="Missing PKCE or client data in state")
try:
metadata = await _get_oauth_metadata()
redirect_uri = _get_redirect_uri()
token_set = await exchange_code_for_tokens(
code=code,
code_verifier=code_verifier,
metadata=metadata,
client_id=mcp_client_id,
redirect_uri=redirect_uri,
client_secret=mcp_client_secret,
)
except Exception as e:
logger.error("Notion MCP token exchange failed: %s", e, exc_info=True)
raise HTTPException(status_code=400, detail=f"Token exchange failed: {e!s}") from e
token_encryption = _get_token_encryption()
workspace_info = await _fetch_workspace_info(token_set.access_token)
connector_config = {
"access_token": token_encryption.encrypt_token(token_set.access_token),
"refresh_token": token_encryption.encrypt_token(token_set.refresh_token)
if token_set.refresh_token
else None,
"expires_in": token_set.expires_in,
"expires_at": token_set.expires_at.isoformat() if token_set.expires_at else None,
"workspace_id": workspace_info.get("workspace_id"),
"workspace_name": workspace_info.get("workspace_name", "Notion Workspace"),
"workspace_icon": workspace_info.get("workspace_icon", "📄"),
"bot_id": workspace_info.get("bot_id"),
"mcp_mode": True,
"mcp_client_id": mcp_client_id,
"mcp_client_secret": token_encryption.encrypt_token(mcp_client_secret)
if mcp_client_secret
else None,
"_token_encrypted": True,
}
reauth_connector_id = data.get("connector_id")
reauth_return_url = data.get("return_url")
# --- Re-auth path ---
if reauth_connector_id:
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == reauth_connector_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
db_connector = result.scalars().first()
if not db_connector:
raise HTTPException(status_code=404, detail="Connector not found during re-auth")
db_connector.config = connector_config
flag_modified(db_connector, "config")
await session.commit()
await session.refresh(db_connector)
logger.info("Re-authenticated Notion MCP connector %s for user %s", db_connector.id, user_id)
if reauth_return_url and reauth_return_url.startswith("/"):
return RedirectResponse(url=f"{config.NEXT_FRONTEND_URL}{reauth_return_url}")
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=notion-connector&connectorId={db_connector.id}"
)
# --- New connector path ---
connector_identifier = extract_identifier_from_credentials(
SearchSourceConnectorType.NOTION_CONNECTOR, connector_config
)
is_duplicate = await check_duplicate_connector(
session,
SearchSourceConnectorType.NOTION_CONNECTOR,
space_id,
user_id,
connector_identifier,
)
if is_duplicate:
logger.warning("Duplicate Notion MCP connector for user %s", user_id)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=duplicate_account&connector=notion-connector"
)
connector_name = await generate_unique_connector_name(
session,
SearchSourceConnectorType.NOTION_CONNECTOR,
space_id,
user_id,
connector_identifier,
)
new_connector = SearchSourceConnector(
name=connector_name,
connector_type=SearchSourceConnectorType.NOTION_CONNECTOR,
is_indexable=True,
config=connector_config,
search_space_id=space_id,
user_id=user_id,
)
session.add(new_connector)
try:
await session.commit()
logger.info("Created Notion MCP connector for user %s in space %s", user_id, space_id)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=notion-connector&connectorId={new_connector.id}"
)
except IntegrityError as e:
await session.rollback()
raise HTTPException(status_code=409, detail=f"Database integrity error: {e!s}") from e
except Exception as e:
await session.rollback()
raise HTTPException(
status_code=500, detail=f"Failed to create connector: {e!s}"
) from e
# ---------------------------------------------------------------------------
# Token refresh helper (used by the adapter)
# ---------------------------------------------------------------------------
async def refresh_notion_mcp_token(
session: AsyncSession,
connector: SearchSourceConnector,
) -> SearchSourceConnector:
"""Refresh the MCP access token for a connector.
Handles refresh-token rotation: persists both new access_token
and new refresh_token atomically.
"""
token_encryption = _get_token_encryption()
cfg = connector.config or {}
encrypted_refresh = cfg.get("refresh_token")
if not encrypted_refresh:
raise HTTPException(status_code=400, detail="No refresh token available. Please re-authenticate.")
try:
refresh_token = token_encryption.decrypt_token(encrypted_refresh)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to decrypt refresh token: {e!s}") from e
mcp_client_id = cfg.get("mcp_client_id")
mcp_client_secret_encrypted = cfg.get("mcp_client_secret")
mcp_client_secret = (
token_encryption.decrypt_token(mcp_client_secret_encrypted)
if mcp_client_secret_encrypted
else None
)
if not mcp_client_id:
raise HTTPException(status_code=400, detail="Missing MCP client_id. Please re-authenticate.")
metadata = await _get_oauth_metadata()
try:
token_set = await refresh_access_token(
refresh_token=refresh_token,
metadata=metadata,
client_id=mcp_client_id,
client_secret=mcp_client_secret,
)
except ValueError as e:
if "REAUTH_REQUIRED" in str(e):
connector.config = {**connector.config, "auth_expired": True}
flag_modified(connector, "config")
await session.commit()
await session.refresh(connector)
raise HTTPException(
status_code=401, detail="Notion MCP authentication expired. Please re-authenticate."
) from e
raise HTTPException(status_code=400, detail=f"Token refresh failed: {e!s}") from e
updated_config = {
**connector.config,
"access_token": token_encryption.encrypt_token(token_set.access_token),
"refresh_token": token_encryption.encrypt_token(token_set.refresh_token)
if token_set.refresh_token
else connector.config.get("refresh_token"),
"expires_in": token_set.expires_in,
"expires_at": token_set.expires_at.isoformat() if token_set.expires_at else None,
"_token_encrypted": True,
}
updated_config.pop("auth_expired", None)
connector.config = updated_config
flag_modified(connector, "config")
await session.commit()
await session.refresh(connector)
logger.info("Refreshed Notion MCP token for connector %s", connector.id)
return connector

View file

@ -1,27 +0,0 @@
"""Notion MCP integration.
Routes Notion operations through Notion's hosted MCP server
at https://mcp.notion.com/mcp instead of direct API calls.
"""
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSourceConnector, SearchSourceConnectorType
async def has_mcp_notion_connector(
session: AsyncSession,
search_space_id: int,
) -> bool:
"""Check whether the search space has at least one MCP-mode Notion connector."""
result = await session.execute(
select(SearchSourceConnector.id, SearchSourceConnector.config).filter(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
for _, config in result.all():
if isinstance(config, dict) and config.get("mcp_mode"):
return True
return False

View file

@ -1,253 +0,0 @@
"""Notion MCP Adapter.
Connects to Notion's hosted MCP server at ``https://mcp.notion.com/mcp``
and exposes the same method signatures as ``NotionHistoryConnector``'s
write operations so that tool factories can swap with a one-line change.
Includes an optional fallback to ``NotionHistoryConnector`` when the MCP
server returns known serialization errors (GitHub issues #215, #216).
"""
import logging
from datetime import UTC, datetime
from typing import Any
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.db import SearchSourceConnector
from app.schemas.notion_auth_credentials import NotionAuthCredentialsBase
from app.utils.oauth_security import TokenEncryption
from .response_parser import (
extract_text_from_mcp_response,
is_mcp_serialization_error,
parse_create_page_response,
parse_delete_page_response,
parse_fetch_page_response,
parse_health_check_response,
parse_update_page_response,
)
logger = logging.getLogger(__name__)
NOTION_MCP_URL = "https://mcp.notion.com/mcp"
class NotionMCPAdapter:
"""Routes Notion operations through the hosted MCP server.
Drop-in replacement for ``NotionHistoryConnector`` write methods.
Returns the same dict structure so KB sync works unchanged.
"""
def __init__(self, session: AsyncSession, connector_id: int):
self._session = session
self._connector_id = connector_id
self._access_token: str | None = None
async def _get_valid_token(self) -> str:
"""Get a valid MCP access token, refreshing if expired."""
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
connector = result.scalars().first()
if not connector:
raise ValueError(f"Connector {self._connector_id} not found")
cfg = connector.config or {}
if not cfg.get("mcp_mode"):
raise ValueError(
f"Connector {self._connector_id} is not an MCP connector"
)
access_token = cfg.get("access_token")
if not access_token:
raise ValueError("No access token in MCP connector config")
is_encrypted = cfg.get("_token_encrypted", False)
if is_encrypted and config.SECRET_KEY:
token_encryption = TokenEncryption(config.SECRET_KEY)
access_token = token_encryption.decrypt_token(access_token)
expires_at_str = cfg.get("expires_at")
if expires_at_str:
expires_at = datetime.fromisoformat(expires_at_str)
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=UTC)
if expires_at <= datetime.now(UTC):
from app.routes.notion_mcp_connector_route import refresh_notion_mcp_token
connector = await refresh_notion_mcp_token(self._session, connector)
cfg = connector.config or {}
access_token = cfg.get("access_token", "")
if is_encrypted and config.SECRET_KEY:
token_encryption = TokenEncryption(config.SECRET_KEY)
access_token = token_encryption.decrypt_token(access_token)
self._access_token = access_token
return access_token
async def _call_mcp_tool(
self, tool_name: str, arguments: dict[str, Any]
) -> str:
"""Connect to Notion MCP server and call a tool. Returns raw text."""
token = await self._get_valid_token()
headers = {"Authorization": f"Bearer {token}"}
async with (
streamablehttp_client(NOTION_MCP_URL, headers=headers) as (read, write, _),
ClientSession(read, write) as session,
):
await session.initialize()
response = await session.call_tool(tool_name, arguments=arguments)
return extract_text_from_mcp_response(response)
async def _call_with_fallback(
self,
tool_name: str,
arguments: dict[str, Any],
parser,
fallback_method: str | None = None,
fallback_kwargs: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Call MCP tool, parse response, and fall back on serialization errors."""
try:
raw_text = await self._call_mcp_tool(tool_name, arguments)
result = parser(raw_text)
if result.get("mcp_serialization_error") and fallback_method:
logger.warning(
"MCP tool '%s' hit serialization bug, falling back to direct API",
tool_name,
)
return await self._fallback(fallback_method, fallback_kwargs or {})
return result
except Exception as e:
error_str = str(e)
if is_mcp_serialization_error(error_str) and fallback_method:
logger.warning(
"MCP tool '%s' raised serialization error, falling back: %s",
tool_name,
error_str,
)
return await self._fallback(fallback_method, fallback_kwargs or {})
logger.error("MCP tool '%s' failed: %s", tool_name, e, exc_info=True)
return {"status": "error", "message": f"MCP call failed: {e!s}"}
async def _fallback(
self, method_name: str, kwargs: dict[str, Any]
) -> dict[str, Any]:
"""Fall back to NotionHistoryConnector for the given method.
Uses the already-refreshed MCP access token directly with the
Notion SDK, bypassing the connector's config-based token loading.
"""
from app.connectors.notion_history import NotionHistoryConnector
from app.schemas.notion_auth_credentials import NotionAuthCredentialsBase
token = self._access_token
if not token:
token = await self._get_valid_token()
connector = NotionHistoryConnector(
session=self._session,
connector_id=self._connector_id,
)
connector._credentials = NotionAuthCredentialsBase(access_token=token)
connector._using_legacy_token = True
method = getattr(connector, method_name)
return await method(**kwargs)
# ------------------------------------------------------------------
# Public API — same signatures as NotionHistoryConnector
# ------------------------------------------------------------------
async def create_page(
self,
title: str,
content: str,
parent_page_id: str | None = None,
) -> dict[str, Any]:
arguments: dict[str, Any] = {
"pages": [
{
"title": title,
"content": content,
}
]
}
if parent_page_id:
arguments["pages"][0]["parent_page_url"] = parent_page_id
return await self._call_with_fallback(
tool_name="notion-create-pages",
arguments=arguments,
parser=parse_create_page_response,
fallback_method="create_page",
fallback_kwargs={
"title": title,
"content": content,
"parent_page_id": parent_page_id,
},
)
async def update_page(
self,
page_id: str,
content: str | None = None,
) -> dict[str, Any]:
arguments: dict[str, Any] = {
"page_id": page_id,
"command": "replace_content",
}
if content:
arguments["new_str"] = content
return await self._call_with_fallback(
tool_name="notion-update-page",
arguments=arguments,
parser=parse_update_page_response,
fallback_method="update_page",
fallback_kwargs={"page_id": page_id, "content": content},
)
async def delete_page(self, page_id: str) -> dict[str, Any]:
arguments: dict[str, Any] = {
"page_id": page_id,
"command": "update_properties",
"archived": True,
}
return await self._call_with_fallback(
tool_name="notion-update-page",
arguments=arguments,
parser=parse_delete_page_response,
fallback_method="delete_page",
fallback_kwargs={"page_id": page_id},
)
async def fetch_page(self, page_url_or_id: str) -> dict[str, Any]:
"""Fetch page content via ``notion-fetch``."""
raw_text = await self._call_mcp_tool(
"notion-fetch", {"url": page_url_or_id}
)
return parse_fetch_page_response(raw_text)
async def health_check(self) -> dict[str, Any]:
"""Check MCP connection via ``notion-get-self``."""
try:
raw_text = await self._call_mcp_tool("notion-get-self", {})
return parse_health_check_response(raw_text)
except Exception as e:
return {"status": "error", "message": str(e)}

View file

@ -1,298 +0,0 @@
"""OAuth 2.0 + PKCE utilities for Notion's remote MCP server.
Implements the flow described in the official guide:
https://developers.notion.com/guides/mcp/build-mcp-client
Steps:
1. Discover OAuth metadata (RFC 9470 RFC 8414)
2. Dynamic client registration (RFC 7591)
3. Build authorization URL with PKCE code_challenge
4. Exchange authorization code + code_verifier for tokens
5. Refresh access tokens (with refresh-token rotation)
All functions are stateless callers (route handlers) manage storage.
"""
import logging
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any
import httpx
logger = logging.getLogger(__name__)
NOTION_MCP_SERVER_URL = "https://mcp.notion.com/mcp"
_HTTP_TIMEOUT = 30.0
@dataclass(frozen=True)
class OAuthMetadata:
issuer: str
authorization_endpoint: str
token_endpoint: str
registration_endpoint: str | None
code_challenge_methods_supported: list[str]
@dataclass(frozen=True)
class ClientCredentials:
client_id: str
client_secret: str | None = None
client_id_issued_at: int | None = None
client_secret_expires_at: int | None = None
@dataclass(frozen=True)
class TokenSet:
access_token: str
refresh_token: str | None
token_type: str
expires_in: int | None
expires_at: datetime | None
scope: str | None
# ---------------------------------------------------------------------------
# Step 1 — OAuth discovery
# ---------------------------------------------------------------------------
async def discover_oauth_metadata(
mcp_server_url: str = NOTION_MCP_SERVER_URL,
) -> OAuthMetadata:
"""Discover OAuth endpoints via RFC 9470 + RFC 8414.
1. Fetch protected-resource metadata to find the authorization server.
2. Fetch authorization-server metadata to get OAuth endpoints.
"""
from urllib.parse import urlparse
parsed = urlparse(mcp_server_url)
origin = f"{parsed.scheme}://{parsed.netloc}"
path = parsed.path.rstrip("/")
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client:
# RFC 9470 — Protected Resource Metadata
# URL format: {origin}/.well-known/oauth-protected-resource{path}
pr_url = f"{origin}/.well-known/oauth-protected-resource{path}"
pr_resp = await client.get(pr_url)
pr_resp.raise_for_status()
pr_data = pr_resp.json()
auth_servers = pr_data.get("authorization_servers", [])
if not auth_servers:
raise ValueError("No authorization_servers in protected resource metadata")
auth_server_url = auth_servers[0]
# RFC 8414 — Authorization Server Metadata
as_url = f"{auth_server_url}/.well-known/oauth-authorization-server"
as_resp = await client.get(as_url)
as_resp.raise_for_status()
as_data = as_resp.json()
if not as_data.get("authorization_endpoint") or not as_data.get("token_endpoint"):
raise ValueError("Missing required OAuth endpoints in server metadata")
return OAuthMetadata(
issuer=as_data.get("issuer", auth_server_url),
authorization_endpoint=as_data["authorization_endpoint"],
token_endpoint=as_data["token_endpoint"],
registration_endpoint=as_data.get("registration_endpoint"),
code_challenge_methods_supported=as_data.get(
"code_challenge_methods_supported", []
),
)
# ---------------------------------------------------------------------------
# Step 2 — Dynamic client registration (RFC 7591)
# ---------------------------------------------------------------------------
async def register_client(
metadata: OAuthMetadata,
redirect_uri: str,
client_name: str = "SurfSense",
) -> ClientCredentials:
"""Dynamically register an OAuth client with the Notion MCP server."""
if not metadata.registration_endpoint:
raise ValueError("Server does not support dynamic client registration")
payload = {
"client_name": client_name,
"redirect_uris": [redirect_uri],
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"token_endpoint_auth_method": "none",
}
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client:
resp = await client.post(
metadata.registration_endpoint,
json=payload,
headers={"Content-Type": "application/json", "Accept": "application/json"},
)
if not resp.is_success:
logger.error(
"Dynamic client registration failed (%s): %s",
resp.status_code,
resp.text,
)
resp.raise_for_status()
data = resp.json()
return ClientCredentials(
client_id=data["client_id"],
client_secret=data.get("client_secret"),
client_id_issued_at=data.get("client_id_issued_at"),
client_secret_expires_at=data.get("client_secret_expires_at"),
)
# ---------------------------------------------------------------------------
# Step 3 — Build authorization URL
# ---------------------------------------------------------------------------
def build_authorization_url(
metadata: OAuthMetadata,
client_id: str,
redirect_uri: str,
code_challenge: str,
state: str,
) -> str:
"""Build the OAuth authorization URL with PKCE parameters."""
from urllib.parse import urlencode
params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"state": state,
"prompt": "consent",
}
return f"{metadata.authorization_endpoint}?{urlencode(params)}"
# ---------------------------------------------------------------------------
# Step 4 — Exchange authorization code for tokens
# ---------------------------------------------------------------------------
async def exchange_code_for_tokens(
code: str,
code_verifier: str,
metadata: OAuthMetadata,
client_id: str,
redirect_uri: str,
client_secret: str | None = None,
) -> TokenSet:
"""Exchange an authorization code + PKCE verifier for tokens."""
form_data: dict[str, Any] = {
"grant_type": "authorization_code",
"code": code,
"client_id": client_id,
"redirect_uri": redirect_uri,
"code_verifier": code_verifier,
}
if client_secret:
form_data["client_secret"] = client_secret
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client:
resp = await client.post(
metadata.token_endpoint,
data=form_data,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
)
if not resp.is_success:
body = resp.text
raise ValueError(f"Token exchange failed ({resp.status_code}): {body}")
tokens = resp.json()
if not tokens.get("access_token"):
raise ValueError("No access_token in token response")
expires_at = None
if tokens.get("expires_in"):
expires_at = datetime.now(UTC) + timedelta(seconds=int(tokens["expires_in"]))
return TokenSet(
access_token=tokens["access_token"],
refresh_token=tokens.get("refresh_token"),
token_type=tokens.get("token_type", "Bearer"),
expires_in=tokens.get("expires_in"),
expires_at=expires_at,
scope=tokens.get("scope"),
)
# ---------------------------------------------------------------------------
# Step 5 — Refresh access token
# ---------------------------------------------------------------------------
async def refresh_access_token(
refresh_token: str,
metadata: OAuthMetadata,
client_id: str,
client_secret: str | None = None,
) -> TokenSet:
"""Refresh an access token.
Notion MCP uses refresh-token rotation: each refresh returns a new
refresh_token and invalidates the old one. Callers MUST persist the
new refresh_token atomically with the new access_token.
"""
form_data: dict[str, Any] = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": client_id,
}
if client_secret:
form_data["client_secret"] = client_secret
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client:
resp = await client.post(
metadata.token_endpoint,
data=form_data,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
)
if not resp.is_success:
body = resp.text
try:
error_data = resp.json()
error_code = error_data.get("error", "")
if error_code == "invalid_grant":
raise ValueError("REAUTH_REQUIRED")
except ValueError:
if "REAUTH_REQUIRED" in str(resp.text) or resp.status_code == 401:
raise
raise ValueError(f"Token refresh failed ({resp.status_code}): {body}")
tokens = resp.json()
if not tokens.get("access_token"):
raise ValueError("No access_token in refresh response")
expires_at = None
if tokens.get("expires_in"):
expires_at = datetime.now(UTC) + timedelta(seconds=int(tokens["expires_in"]))
return TokenSet(
access_token=tokens["access_token"],
refresh_token=tokens.get("refresh_token"),
token_type=tokens.get("token_type", "Bearer"),
expires_in=tokens.get("expires_in"),
expires_at=expires_at,
scope=tokens.get("scope"),
)

View file

@ -1,212 +0,0 @@
"""Parse Notion MCP tool responses into structured dicts.
The Notion MCP server returns responses as MCP TextContent where the
``text`` field contains JSON-stringified Notion API response data.
See: https://deepwiki.com/makenotion/notion-mcp-server/4.3-request-and-response-handling
This module extracts that JSON and normalises it into the same dict
format that ``NotionHistoryConnector`` methods return, so downstream
code (KB sync, tool factories) works unchanged.
"""
import json
import logging
from typing import Any
logger = logging.getLogger(__name__)
MCP_SERIALIZATION_ERROR_MARKERS = [
"Expected array, received string",
"Expected object, received string",
"should be defined, instead was `undefined`",
]
def is_mcp_serialization_error(text: str) -> bool:
"""Return True if the MCP error text matches a known serialization bug."""
return any(marker in text for marker in MCP_SERIALIZATION_ERROR_MARKERS)
def extract_text_from_mcp_response(response) -> str:
"""Pull the concatenated text out of an MCP ``CallToolResult``.
Args:
response: The ``CallToolResult`` returned by ``session.call_tool()``.
Returns:
Concatenated text content from the response.
"""
parts: list[str] = []
for content in response.content:
if hasattr(content, "text"):
parts.append(content.text)
elif hasattr(content, "data"):
parts.append(str(content.data))
else:
parts.append(str(content))
return "\n".join(parts) if parts else ""
def _try_parse_json(text: str) -> dict[str, Any] | None:
"""Attempt to parse *text* as JSON, returning None on failure."""
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
return parsed
except (json.JSONDecodeError, TypeError):
pass
return None
def _extract_page_title(page_data: dict[str, Any]) -> str:
"""Best-effort extraction of the page title from a Notion page object."""
props = page_data.get("properties", {})
for prop in props.values():
if prop.get("type") == "title":
title_parts = prop.get("title", [])
if title_parts:
return " ".join(t.get("plain_text", "") for t in title_parts)
return page_data.get("id", "Untitled")
def parse_create_page_response(raw_text: str) -> dict[str, Any]:
"""Parse a ``notion-create-pages`` MCP response.
Returns a dict compatible with ``NotionHistoryConnector.create_page()``:
``{status, page_id, url, title, message}``
"""
data = _try_parse_json(raw_text)
if data is None:
if is_mcp_serialization_error(raw_text):
return {
"status": "mcp_error",
"message": raw_text,
"mcp_serialization_error": True,
}
return {"status": "error", "message": f"Unexpected MCP response: {raw_text[:500]}"}
if data.get("status") == "error" or "error" in data:
return {
"status": "error",
"message": data.get("message", data.get("error", str(data))),
}
page_id = data.get("id", "")
url = data.get("url", "")
title = _extract_page_title(data)
return {
"status": "success",
"page_id": page_id,
"url": url,
"title": title,
"message": f"Created Notion page '{title}'",
}
def parse_update_page_response(raw_text: str) -> dict[str, Any]:
"""Parse a ``notion-update-page`` MCP response.
Returns a dict compatible with ``NotionHistoryConnector.update_page()``:
``{status, page_id, url, title, message}``
"""
data = _try_parse_json(raw_text)
if data is None:
if is_mcp_serialization_error(raw_text):
return {
"status": "mcp_error",
"message": raw_text,
"mcp_serialization_error": True,
}
return {"status": "error", "message": f"Unexpected MCP response: {raw_text[:500]}"}
if data.get("status") == "error" or "error" in data:
return {
"status": "error",
"message": data.get("message", data.get("error", str(data))),
}
page_id = data.get("id", "")
url = data.get("url", "")
title = _extract_page_title(data)
return {
"status": "success",
"page_id": page_id,
"url": url,
"title": title,
"message": f"Updated Notion page '{title}' (content appended)",
}
def parse_delete_page_response(raw_text: str) -> dict[str, Any]:
"""Parse an archive (delete) MCP response.
The Notion API responds to ``pages.update(archived=True)`` with
the archived page object.
Returns a dict compatible with ``NotionHistoryConnector.delete_page()``:
``{status, page_id, message}``
"""
data = _try_parse_json(raw_text)
if data is None:
if is_mcp_serialization_error(raw_text):
return {
"status": "mcp_error",
"message": raw_text,
"mcp_serialization_error": True,
}
return {"status": "error", "message": f"Unexpected MCP response: {raw_text[:500]}"}
if data.get("status") == "error" or "error" in data:
return {
"status": "error",
"message": data.get("message", data.get("error", str(data))),
}
page_id = data.get("id", "")
title = _extract_page_title(data)
return {
"status": "success",
"page_id": page_id,
"message": f"Deleted Notion page '{title}'",
}
def parse_fetch_page_response(raw_text: str) -> dict[str, Any]:
"""Parse a ``notion-fetch`` MCP response.
Returns the raw parsed dict (Notion page/block data) or an error dict.
"""
data = _try_parse_json(raw_text)
if data is None:
return {"status": "error", "message": f"Unexpected MCP response: {raw_text[:500]}"}
if data.get("status") == "error" or "error" in data:
return {
"status": "error",
"message": data.get("message", data.get("error", str(data))),
}
return {"status": "success", "data": data}
def parse_health_check_response(raw_text: str) -> dict[str, Any]:
"""Parse a ``notion-get-self`` MCP response for health checking."""
data = _try_parse_json(raw_text)
if data is None:
return {"status": "error", "message": raw_text[:500]}
if data.get("status") == "error" or "error" in data:
return {
"status": "error",
"message": data.get("message", data.get("error", str(data))),
}
return {"status": "success", "data": data}