SurfSense/surfsense_backend/app/connectors/notion_history.py

236 lines
7.5 KiB
Python
Raw Normal View History

2025-08-21 14:43:04 -07:00
from datetime import datetime

from notion_client import AsyncClient
2025-03-14 18:53:14 -07:00
2025-03-14 18:53:14 -07:00
class NotionHistoryConnector:
def __init__(self, token):
"""
Initialize the NotionPageFetcher with a token.
2025-03-14 18:53:14 -07:00
Args:
token (str): Notion integration token
"""
2025-08-21 14:43:04 -07:00
self.notion = AsyncClient(auth=token)
2025-08-21 14:43:04 -07:00
async def close(self):
"""Close the async client connection."""
await self.notion.aclose()
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()
async def get_all_pages(self, start_date=None, end_date=None):
2025-03-14 18:53:14 -07:00
"""
Fetches all pages shared with your integration and their content.
2025-03-14 18:53:14 -07:00
Args:
start_date (str, optional): ISO 8601 date string (e.g., "2023-01-01T00:00:00Z")
end_date (str, optional): ISO 8601 date string (e.g., "2023-12-31T23:59:59Z")
2025-03-14 18:53:14 -07:00
Returns:
list: List of dictionaries containing page data
"""
# Build the filter for the search
# Note: Notion API requires specific filter structure
search_params = {}
2025-03-14 18:53:14 -07:00
# Filter for pages only (not databases)
search_params["filter"] = {"value": "page", "property": "object"}
2025-03-14 18:53:14 -07:00
# Add date filters if provided
if start_date or end_date:
date_filter = {}
2025-03-14 18:53:14 -07:00
if start_date:
date_filter["on_or_after"] = start_date
2025-03-14 18:53:14 -07:00
if end_date:
date_filter["on_or_before"] = end_date
2025-03-14 18:53:14 -07:00
# Add the date filter to the search params
if date_filter:
search_params["sort"] = {
"direction": "descending",
"timestamp": "last_edited_time",
2025-03-14 18:53:14 -07:00
}
# Paginate through all pages the integration has access to
pages = []
has_more = True
cursor = None
while has_more:
if cursor:
search_params["start_cursor"] = cursor
2025-11-03 16:00:58 -08:00
search_results = await self.notion.search(**search_params)
2025-11-03 16:00:58 -08:00
pages.extend(search_results["results"])
has_more = search_results.get("has_more", False)
2025-11-03 16:00:58 -08:00
if has_more:
cursor = search_results.get("next_cursor")
2025-03-14 18:53:14 -07:00
all_page_data = []
2025-03-14 18:53:14 -07:00
for page in pages:
page_id = page["id"]
2025-03-14 18:53:14 -07:00
# Get detailed page information
2025-08-21 14:43:04 -07:00
page_content = await self.get_page_content(page_id)
all_page_data.append(
{
"page_id": page_id,
"title": self.get_page_title(page),
"content": page_content,
}
)
2025-03-14 18:53:14 -07:00
return all_page_data
2025-03-14 18:53:14 -07:00
def get_page_title(self, page):
"""
Extracts the title from a page object.
2025-03-14 18:53:14 -07:00
Args:
page (dict): Notion page object
2025-03-14 18:53:14 -07:00
Returns:
str: Page title or a fallback string
"""
# Title can be in different properties depending on the page type
if "properties" in page:
# Try to find a title property
for _prop_name, prop_data in page["properties"].items():
2025-03-14 18:53:14 -07:00
if prop_data["type"] == "title" and len(prop_data["title"]) > 0:
return " ".join(
[text_obj["plain_text"] for text_obj in prop_data["title"]]
)
2025-03-14 18:53:14 -07:00
# If no title found, return the page ID as fallback
return f"Untitled page ({page['id']})"
2025-08-21 14:43:04 -07:00
async def get_page_content(self, page_id):
2025-03-14 18:53:14 -07:00
"""
Fetches the content (blocks) of a specific page.
2025-03-14 18:53:14 -07:00
Args:
page_id (str): The ID of the page to fetch
2025-03-14 18:53:14 -07:00
Returns:
list: List of processed blocks from the page
"""
blocks = []
has_more = True
cursor = None
2025-03-14 18:53:14 -07:00
# Paginate through all blocks
while has_more:
if cursor:
2025-08-21 14:43:04 -07:00
response = await self.notion.blocks.children.list(
block_id=page_id, start_cursor=cursor
)
2025-03-14 18:53:14 -07:00
else:
2025-08-21 14:43:04 -07:00
response = await self.notion.blocks.children.list(block_id=page_id)
2025-03-14 18:53:14 -07:00
blocks.extend(response["results"])
has_more = response["has_more"]
2025-03-14 18:53:14 -07:00
if has_more:
cursor = response["next_cursor"]
2025-03-14 18:53:14 -07:00
# Process nested blocks recursively
processed_blocks = []
for block in blocks:
2025-08-21 14:43:04 -07:00
processed_block = await self.process_block(block)
2025-03-14 18:53:14 -07:00
processed_blocks.append(processed_block)
2025-03-14 18:53:14 -07:00
return processed_blocks
2025-08-21 14:43:04 -07:00
async def process_block(self, block):
2025-03-14 18:53:14 -07:00
"""
Processes a block and recursively fetches any child blocks.
2025-03-14 18:53:14 -07:00
Args:
block (dict): The block to process
2025-03-14 18:53:14 -07:00
Returns:
dict: Processed block with content and children
"""
block_id = block["id"]
block_type = block["type"]
2025-03-14 18:53:14 -07:00
# Extract block content based on its type
content = self.extract_block_content(block)
2025-03-14 18:53:14 -07:00
# Check if block has children
has_children = block.get("has_children", False)
child_blocks = []
2025-03-14 18:53:14 -07:00
if has_children:
# Fetch and process child blocks
2025-08-21 14:43:04 -07:00
children_response = await self.notion.blocks.children.list(
block_id=block_id
)
2025-03-14 18:53:14 -07:00
for child_block in children_response["results"]:
2025-08-21 14:43:04 -07:00
child_blocks.append(await self.process_block(child_block))
2025-03-14 18:53:14 -07:00
return {
"id": block_id,
"type": block_type,
"content": content,
"children": child_blocks,
2025-03-14 18:53:14 -07:00
}
2025-03-14 18:53:14 -07:00
def extract_block_content(self, block):
"""
Extracts the content from a block based on its type.
2025-03-14 18:53:14 -07:00
Args:
block (dict): The block to extract content from
2025-03-14 18:53:14 -07:00
Returns:
str: Extracted content as a string
"""
block_type = block["type"]
2025-03-14 18:53:14 -07:00
# Different block types have different structures
if block_type in block and "rich_text" in block[block_type]:
return "".join(
[text_obj["plain_text"] for text_obj in block[block_type]["rich_text"]]
)
2025-03-14 18:53:14 -07:00
elif block_type == "image":
# Instead of returning the raw URL which may contain sensitive AWS credentials,
# return a placeholder or reference to the image
if "file" in block["image"]:
# For Notion-hosted images (which use AWS S3 pre-signed URLs)
return "[Notion Image]"
elif "external" in block["image"]:
# For external images, we can return a sanitized reference
url = block["image"]["external"]["url"]
# Only return the domain part of external URLs to avoid potential sensitive parameters
try:
from urllib.parse import urlparse
2025-03-14 18:53:14 -07:00
parsed_url = urlparse(url)
return f"[External Image from {parsed_url.netloc}]"
except Exception:
2025-03-14 18:53:14 -07:00
return "[External Image]"
elif block_type == "code":
language = block["code"]["language"]
code_text = "".join(
[text_obj["plain_text"] for text_obj in block["code"]["rich_text"]]
)
2025-03-14 18:53:14 -07:00
return f"```{language}\n{code_text}\n```"
elif block_type == "equation":
return block["equation"]["expression"]
# Add more block types as needed
2025-03-14 18:53:14 -07:00
# Return empty string for unsupported block types
return ""