add write operations to notion connector

This commit is contained in:
CREDO23 2026-02-11 16:11:46 +02:00
parent 2ef2474058
commit c492505876

View file

@ -777,3 +777,303 @@ class NotionHistoryConnector:
# Return empty string for unsupported block types
return ""
# =========================================================================
# WRITE OPERATIONS (create, update, delete pages)
# =========================================================================
def _markdown_to_blocks(self, markdown: str) -> list[dict[str, Any]]:
"""
Convert markdown content to Notion blocks.
This is a simple converter that handles basic markdown.
For more complex markdown, consider using a proper markdown parser.
Args:
markdown: Markdown content
Returns:
List of Notion block objects
"""
blocks = []
lines = markdown.split("\n")
for line in lines:
line = line.strip()
if not line:
continue
# Heading 1
if line.startswith("# "):
blocks.append({
"object": "block",
"type": "heading_1",
"heading_1": {
"rich_text": [{"type": "text", "text": {"content": line[2:]}}]
},
})
# Heading 2
elif line.startswith("## "):
blocks.append({
"object": "block",
"type": "heading_2",
"heading_2": {
"rich_text": [{"type": "text", "text": {"content": line[3:]}}]
},
})
# Heading 3
elif line.startswith("### "):
blocks.append({
"object": "block",
"type": "heading_3",
"heading_3": {
"rich_text": [{"type": "text", "text": {"content": line[4:]}}]
},
})
# Bullet list
elif line.startswith("- ") or line.startswith("* "):
blocks.append({
"object": "block",
"type": "bulleted_list_item",
"bulleted_list_item": {
"rich_text": [{"type": "text", "text": {"content": line[2:]}}]
},
})
# Numbered list
elif len(line) > 2 and line[0].isdigit() and line[1:3] == ". ":
blocks.append({
"object": "block",
"type": "numbered_list_item",
"numbered_list_item": {
"rich_text": [{"type": "text", "text": {"content": line[3:]}}]
},
})
# Regular paragraph
else:
blocks.append({
"object": "block",
"type": "paragraph",
"paragraph": {
"rich_text": [{"type": "text", "text": {"content": line}}]
},
})
return blocks
async def create_page(
self, title: str, content: str, parent_page_id: str | None = None
) -> dict[str, Any]:
"""
Create a new Notion page.
Args:
title: Page title
content: Page content (markdown format)
parent_page_id: Optional parent page ID (creates as subpage if provided)
Returns:
Dictionary with page details:
- page_id: Created page ID
- url: Page URL
- title: Page title
- status: "success" or "error"
- message: Success/error message
Raises:
APIResponseError: If Notion API returns an error
"""
try:
# Get Notion client
notion = await self._get_notion_client()
# Convert markdown content to Notion blocks
children = self._markdown_to_blocks(content)
# Prepare parent
if parent_page_id:
parent = {"type": "page_id", "page_id": parent_page_id}
else:
# Try to use workspace root
# Note: This requires proper permissions
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
connector = result.scalars().first()
if connector and connector.config.get("workspace_id"):
parent = {"type": "workspace", "workspace": True}
else:
raise ValueError(
"parent_page_id is required. "
"Please specify a parent page where the new page should be created."
)
# Create the page
response = await notion.pages.create(
parent=parent,
properties={
"title": {
"title": [{"type": "text", "text": {"content": title}}]
}
},
children=children[:100], # Notion API limit: 100 blocks per request
)
page_id = response["id"]
page_url = response["url"]
# If content has more than 100 blocks, append them
if len(children) > 100:
for i in range(100, len(children), 100):
batch = children[i : i + 100]
await notion.blocks.children.append(
block_id=page_id, children=batch
)
return {
"status": "success",
"page_id": page_id,
"url": page_url,
"title": title,
"message": f"✅ Created Notion page '{title}'",
}
except APIResponseError as e:
logger.error(f"Notion API error creating page: {e}")
error_msg = e.body.get("message", str(e)) if hasattr(e, "body") else str(e)
return {
"status": "error",
"message": f"Failed to create Notion page: {error_msg}",
}
except Exception as e:
logger.error(f"Unexpected error creating Notion page: {e}")
return {
"status": "error",
"message": f"Failed to create Notion page: {str(e)}",
}
async def update_page(
self, page_id: str, title: str | None = None, content: str | None = None
) -> dict[str, Any]:
"""
Update an existing Notion page.
Args:
page_id: Page ID to update
title: New page title (optional)
content: New page content in markdown (optional)
Returns:
Dictionary with update result
Raises:
APIResponseError: If Notion API returns an error
"""
try:
notion = await self._get_notion_client()
# Update title if provided
if title:
await notion.pages.update(
page_id=page_id,
properties={
"title": {
"title": [{"type": "text", "text": {"content": title}}]
}
},
)
# Update content if provided
if content:
# First, get existing blocks
existing_blocks = await notion.blocks.children.list(block_id=page_id)
# Delete existing blocks
for block in existing_blocks.get("results", []):
await notion.blocks.delete(block_id=block["id"])
# Add new content
children = self._markdown_to_blocks(content)
for i in range(0, len(children), 100):
batch = children[i : i + 100]
await notion.blocks.children.append(
block_id=page_id, children=batch
)
# Get updated page
response = await notion.pages.retrieve(page_id=page_id)
page_url = response["url"]
page_title = response["properties"]["title"]["title"][0]["text"]["content"]
return {
"status": "success",
"page_id": page_id,
"url": page_url,
"title": page_title,
"message": f"✅ Updated Notion page '{page_title}'",
}
except APIResponseError as e:
logger.error(f"Notion API error updating page: {e}")
error_msg = e.body.get("message", str(e)) if hasattr(e, "body") else str(e)
return {
"status": "error",
"message": f"Failed to update Notion page: {error_msg}",
}
except Exception as e:
logger.error(f"Unexpected error updating Notion page: {e}")
return {
"status": "error",
"message": f"Failed to update Notion page: {str(e)}",
}
async def delete_page(self, page_id: str) -> dict[str, Any]:
"""
Delete (archive) a Notion page.
Note: Notion doesn't truly delete pages, it archives them.
Args:
page_id: Page ID to delete
Returns:
Dictionary with deletion result
Raises:
APIResponseError: If Notion API returns an error
"""
try:
notion = await self._get_notion_client()
# Archive the page (Notion's way of "deleting")
response = await notion.pages.update(page_id=page_id, archived=True)
page_title = "Unknown"
try:
page_title = response["properties"]["title"]["title"][0]["text"][
"content"
]
except (KeyError, IndexError):
pass
return {
"status": "success",
"page_id": page_id,
"message": f"✅ Deleted Notion page '{page_title}'",
}
except APIResponseError as e:
logger.error(f"Notion API error deleting page: {e}")
error_msg = e.body.get("message", str(e)) if hasattr(e, "body") else str(e)
return {
"status": "error",
"message": f"Failed to delete Notion page: {error_msg}",
}
except Exception as e:
logger.error(f"Unexpected error deleting Notion page: {e}")
return {
"status": "error",
"message": f"Failed to delete Notion page: {str(e)}",
}