feat(notion-mcp): add MCP agent tool factories and registry wiring

This commit is contained in:
CREDO23 2026-04-20 21:02:10 +02:00
parent 41d547934d
commit 8d438f52f5
5 changed files with 601 additions and 0 deletions

View file

@ -0,0 +1,5 @@
"""MCP-backed Notion tool factories.
Drop-in replacements for ``tools/notion/`` that route through
Notion's hosted MCP server instead of direct API calls.
"""

View file

@ -0,0 +1,205 @@
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.new_chat.tools.hitl import request_approval
from app.services.notion import NotionToolMetadataService
logger = logging.getLogger(__name__)
def _find_mcp_connector(connectors):
"""Return the first connector with mcp_mode enabled, or None."""
for c in connectors:
if (c.config or {}).get("mcp_mode"):
return c
return None
def create_create_notion_page_mcp_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
@tool
async def create_notion_page(
title: str,
content: str | None = None,
) -> dict[str, Any]:
"""Create a new page in Notion with the given title and content.
Use this tool when the user asks you to create, save, or publish
something to Notion. The page will be created in the user's
configured Notion workspace. The user MUST specify a topic before you
call this tool. If the request does not contain a topic (e.g. "create a
notion page"), ask what the page should be about. Never call this tool
without a clear topic from the user.
Args:
title: The title of the Notion page.
content: Optional markdown content for the page body (supports headings, lists, paragraphs).
Generate this yourself based on the user's topic.
Returns:
Dictionary with:
- status: "success", "rejected", or "error"
- page_id: Created page ID (if success)
- url: URL to the created page (if success)
- title: Page title (if success)
- message: Result message
IMPORTANT: If status is "rejected", the user explicitly declined the action.
Respond with a brief acknowledgment (e.g., "Understood, I didn't create the page.")
and move on. Do NOT troubleshoot or suggest alternatives.
Examples:
- "Create a Notion page about our Q2 roadmap"
- "Save a summary of today's discussion to Notion"
"""
logger.info("create_notion_page (MCP) called: title='%s'", title)
if db_session is None or search_space_id is None or user_id is None:
logger.error("Notion MCP tool not properly configured - missing required parameters")
return {
"status": "error",
"message": "Notion tool not properly configured. Please contact support.",
}
try:
metadata_service = NotionToolMetadataService(db_session)
context = await metadata_service.get_creation_context(search_space_id, user_id)
if "error" in context:
logger.error("Failed to fetch creation context: %s", context["error"])
return {"status": "error", "message": context["error"]}
accounts = context.get("accounts", [])
if accounts and all(a.get("auth_expired") for a in accounts):
return {
"status": "auth_error",
"message": "All connected Notion accounts need re-authentication. Please re-authenticate in your connector settings.",
"connector_type": "notion",
}
result = request_approval(
action_type="notion_page_creation",
tool_name="create_notion_page",
params={
"title": title,
"content": content,
"parent_page_id": None,
"connector_id": connector_id,
},
context=context,
)
if result.rejected:
logger.info("Notion page creation rejected by user")
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_title = result.params.get("title", title)
final_content = result.params.get("content", content)
final_parent_page_id = result.params.get("parent_page_id")
final_connector_id = result.params.get("connector_id", connector_id)
if not final_title or not final_title.strip():
return {
"status": "error",
"message": "Page title cannot be empty. Please provide a valid title.",
}
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
actual_connector_id = final_connector_id
if actual_connector_id is None:
query_result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
connectors = query_result.scalars().all()
connector = _find_mcp_connector(connectors)
if not connector:
return {
"status": "error",
"message": "No Notion MCP connector found. Please connect Notion (MCP) in your workspace settings.",
}
actual_connector_id = connector.id
else:
query_result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == actual_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
connector = query_result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Notion account is invalid or has been disconnected.",
}
from app.services.notion_mcp.adapter import NotionMCPAdapter
adapter = NotionMCPAdapter(session=db_session, connector_id=actual_connector_id)
result = await adapter.create_page(
title=final_title,
content=final_content,
parent_page_id=final_parent_page_id,
)
logger.info("create_page (MCP) result: %s - %s", result.get("status"), result.get("message", ""))
if result.get("status") == "success":
kb_message_suffix = ""
try:
from app.services.notion import NotionKBSyncService
kb_service = NotionKBSyncService(db_session)
kb_result = await kb_service.sync_after_create(
page_id=result.get("page_id"),
page_title=result.get("title", final_title),
page_url=result.get("url"),
content=final_content,
connector_id=actual_connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
if kb_result["status"] == "success":
kb_message_suffix = " Your knowledge base has also been updated."
else:
kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync."
except Exception as kb_err:
logger.warning("KB sync after create failed: %s", kb_err)
kb_message_suffix = " This page will be added to your knowledge base in the next scheduled sync."
result["message"] = result.get("message", "") + kb_message_suffix
return result
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error("Error creating Notion page (MCP): %s", e, exc_info=True)
if isinstance(e, ValueError):
message = str(e)
else:
message = "Something went wrong while creating the page. Please try again."
return {"status": "error", "message": message}
return create_notion_page

View file

@ -0,0 +1,173 @@
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.new_chat.tools.hitl import request_approval
from app.services.notion.tool_metadata_service import NotionToolMetadataService
logger = logging.getLogger(__name__)
def create_delete_notion_page_mcp_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
@tool
async def delete_notion_page(
page_title: str,
delete_from_kb: bool = False,
) -> dict[str, Any]:
"""Delete (archive) a Notion page.
Use this tool when the user asks you to delete, remove, or archive
a Notion page. Note that Notion doesn't permanently delete pages,
it archives them (they can be restored from trash).
Args:
page_title: The title of the Notion page to delete.
delete_from_kb: Whether to also remove the page from the knowledge base.
Default is False.
Returns:
Dictionary with:
- status: "success", "rejected", "not_found", or "error"
- page_id: Deleted page ID (if success)
- message: Success or error message
- deleted_from_kb: Whether the page was also removed from knowledge base (if success)
Examples:
- "Delete the 'Meeting Notes' Notion page"
- "Remove the 'Old Project Plan' Notion page"
"""
logger.info(
"delete_notion_page (MCP) called: page_title='%s', delete_from_kb=%s",
page_title,
delete_from_kb,
)
if db_session is None or search_space_id is None or user_id is None:
logger.error("Notion MCP tool not properly configured - missing required parameters")
return {
"status": "error",
"message": "Notion tool not properly configured. Please contact support.",
}
try:
metadata_service = NotionToolMetadataService(db_session)
context = await metadata_service.get_delete_context(search_space_id, user_id, page_title)
if "error" in context:
error_msg = context["error"]
if "not found" in error_msg.lower():
return {"status": "not_found", "message": error_msg}
return {"status": "error", "message": error_msg}
account = context.get("account", {})
if account.get("auth_expired"):
return {
"status": "auth_error",
"message": "The Notion account for this page needs re-authentication. Please re-authenticate in your connector settings.",
}
page_id = context.get("page_id")
connector_id_from_context = account.get("id")
document_id = context.get("document_id")
result = request_approval(
action_type="notion_page_deletion",
tool_name="delete_notion_page",
params={
"page_id": page_id,
"connector_id": connector_id_from_context,
"delete_from_kb": delete_from_kb,
},
context=context,
)
if result.rejected:
logger.info("Notion page deletion rejected by user")
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_page_id = result.params.get("page_id", page_id)
final_connector_id = result.params.get("connector_id", connector_id_from_context)
final_delete_from_kb = result.params.get("delete_from_kb", delete_from_kb)
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
if final_connector_id:
query_result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
connector = query_result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Notion account is invalid or has been disconnected.",
}
actual_connector_id = connector.id
else:
return {"status": "error", "message": "No connector found for this page."}
from app.services.notion_mcp.adapter import NotionMCPAdapter
adapter = NotionMCPAdapter(session=db_session, connector_id=actual_connector_id)
result = await adapter.delete_page(page_id=final_page_id)
logger.info("delete_page (MCP) result: %s - %s", result.get("status"), result.get("message", ""))
deleted_from_kb = False
if result.get("status") == "success" and final_delete_from_kb and document_id:
try:
from sqlalchemy.future import select
from app.db import Document
doc_result = await db_session.execute(
select(Document).filter(Document.id == document_id)
)
document = doc_result.scalars().first()
if document:
await db_session.delete(document)
await db_session.commit()
deleted_from_kb = True
logger.info("Deleted document %s from knowledge base", document_id)
except Exception as e:
logger.error("Failed to delete document from KB: %s", e)
await db_session.rollback()
result["warning"] = f"Page deleted from Notion, but failed to remove from knowledge base: {e!s}"
if result.get("status") == "success":
result["deleted_from_kb"] = deleted_from_kb
if deleted_from_kb:
result["message"] = f"{result.get('message', '')} (also removed from knowledge base)"
return result
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error("Error deleting Notion page (MCP): %s", e, exc_info=True)
if isinstance(e, ValueError):
message = str(e)
else:
message = "Something went wrong while deleting the page. Please try again."
return {"status": "error", "message": message}
return delete_notion_page

View file

@ -0,0 +1,179 @@
import logging
from typing import Any
from langchain_core.tools import tool
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.new_chat.tools.hitl import request_approval
from app.services.notion import NotionToolMetadataService
logger = logging.getLogger(__name__)
def create_update_notion_page_mcp_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
user_id: str | None = None,
connector_id: int | None = None,
):
@tool
async def update_notion_page(
page_title: str,
content: str | None = None,
) -> dict[str, Any]:
"""Update an existing Notion page by appending new content.
Use this tool when the user asks you to add content to, modify, or update
a Notion page. The new content will be appended to the existing page content.
The user MUST specify what to add before you call this tool. If the
request is vague, ask what content they want added.
Args:
page_title: The title of the Notion page to update.
content: Optional markdown content to append to the page body (supports headings, lists, paragraphs).
Generate this yourself based on the user's request.
Returns:
Dictionary with:
- status: "success", "rejected", "not_found", or "error"
- page_id: Updated page ID (if success)
- url: URL to the updated page (if success)
- title: Current page title (if success)
- message: Result message
IMPORTANT:
- If status is "rejected", the user explicitly declined the action.
Respond with a brief acknowledgment (e.g., "Understood, I didn't update the page.")
and move on. Do NOT ask for alternatives or troubleshoot.
- If status is "not_found", inform the user conversationally using the exact message provided.
Examples:
- "Add today's meeting notes to the 'Meeting Notes' Notion page"
- "Update the 'Project Plan' page with a status update on phase 1"
"""
logger.info(
"update_notion_page (MCP) called: page_title='%s', content_length=%d",
page_title,
len(content) if content else 0,
)
if db_session is None or search_space_id is None or user_id is None:
logger.error("Notion MCP tool not properly configured - missing required parameters")
return {
"status": "error",
"message": "Notion tool not properly configured. Please contact support.",
}
if not content or not content.strip():
return {
"status": "error",
"message": "Content is required to update the page. Please provide the actual content you want to add.",
}
try:
metadata_service = NotionToolMetadataService(db_session)
context = await metadata_service.get_update_context(search_space_id, user_id, page_title)
if "error" in context:
error_msg = context["error"]
if "not found" in error_msg.lower():
return {"status": "not_found", "message": error_msg}
return {"status": "error", "message": error_msg}
account = context.get("account", {})
if account.get("auth_expired"):
return {
"status": "auth_error",
"message": "The Notion account for this page needs re-authentication. Please re-authenticate in your connector settings.",
}
page_id = context.get("page_id")
document_id = context.get("document_id")
connector_id_from_context = account.get("id")
result = request_approval(
action_type="notion_page_update",
tool_name="update_notion_page",
params={
"page_id": page_id,
"content": content,
"connector_id": connector_id_from_context,
},
context=context,
)
if result.rejected:
logger.info("Notion page update rejected by user")
return {
"status": "rejected",
"message": "User declined. Do not retry or suggest alternatives.",
}
final_page_id = result.params.get("page_id", page_id)
final_content = result.params.get("content", content)
final_connector_id = result.params.get("connector_id", connector_id_from_context)
from sqlalchemy.future import select
from app.db import SearchSourceConnector, SearchSourceConnectorType
if final_connector_id:
query_result = await db_session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == final_connector_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type == SearchSourceConnectorType.NOTION_CONNECTOR,
)
)
connector = query_result.scalars().first()
if not connector:
return {
"status": "error",
"message": "Selected Notion account is invalid or has been disconnected.",
}
actual_connector_id = connector.id
else:
return {"status": "error", "message": "No connector found for this page."}
from app.services.notion_mcp.adapter import NotionMCPAdapter
adapter = NotionMCPAdapter(session=db_session, connector_id=actual_connector_id)
result = await adapter.update_page(page_id=final_page_id, content=final_content)
logger.info("update_page (MCP) result: %s - %s", result.get("status"), result.get("message", ""))
if result.get("status") == "success" and document_id is not None:
from app.services.notion import NotionKBSyncService
kb_service = NotionKBSyncService(db_session)
kb_result = await kb_service.sync_after_update(
document_id=document_id,
appended_content=final_content,
user_id=user_id,
search_space_id=search_space_id,
appended_block_ids=result.get("appended_block_ids"),
)
if kb_result["status"] == "success":
result["message"] = f"{result['message']}. Your knowledge base has also been updated."
elif kb_result["status"] == "not_indexed":
result["message"] = f"{result['message']}. This page will be added to your knowledge base in the next scheduled sync."
else:
result["message"] = f"{result['message']}. Your knowledge base will be updated in the next scheduled sync."
return result
except Exception as e:
from langgraph.errors import GraphInterrupt
if isinstance(e, GraphInterrupt):
raise
logger.error("Error updating Notion page (MCP): %s", e, exc_info=True)
if isinstance(e, ValueError):
message = str(e)
else:
message = "Something went wrong while updating the page. Please try again."
return {"status": "error", "message": message}
return update_notion_page

View file

@ -86,6 +86,11 @@ from .notion import (
create_delete_notion_page_tool,
create_update_notion_page_tool,
)
from .notion_mcp import (
create_page as notion_mcp_create_page_mod,
delete_page as notion_mcp_delete_page_mod,
update_page as notion_mcp_update_page_mod,
)
from .onedrive import (
create_create_onedrive_file_tool,
create_delete_onedrive_file_tool,
@ -316,6 +321,40 @@ BUILTIN_TOOLS: list[ToolDefinition] = [
requires=["db_session", "search_space_id", "user_id"],
),
# =========================================================================
# NOTION MCP TOOLS - MCP-backed variants (disabled until swap)
# These route through Notion's hosted MCP server instead of direct API.
# =========================================================================
ToolDefinition(
name="create_notion_page_mcp",
description="Create a new page in Notion via MCP server",
factory=lambda deps: notion_mcp_create_page_mod.create_create_notion_page_mcp_tool(
db_session=deps["db_session"],
search_space_id=deps["search_space_id"],
user_id=deps["user_id"],
),
requires=["db_session", "search_space_id", "user_id"],
),
ToolDefinition(
name="update_notion_page_mcp",
description="Append new content to an existing Notion page via MCP server",
factory=lambda deps: notion_mcp_update_page_mod.create_update_notion_page_mcp_tool(
db_session=deps["db_session"],
search_space_id=deps["search_space_id"],
user_id=deps["user_id"],
),
requires=["db_session", "search_space_id", "user_id"],
),
ToolDefinition(
name="delete_notion_page_mcp",
description="Delete an existing Notion page via MCP server",
factory=lambda deps: notion_mcp_delete_page_mod.create_delete_notion_page_mcp_tool(
db_session=deps["db_session"],
search_space_id=deps["search_space_id"],
user_id=deps["user_id"],
),
requires=["db_session", "search_space_id", "user_id"],
),
# =========================================================================
# GOOGLE DRIVE TOOLS - create files, delete files
# Auto-disabled when no Google Drive connector is configured (see chat_deepagent.py)
# =========================================================================