Merge remote-tracking branch 'upstream/main' into feature/blocknote-editor

This commit is contained in:
Anish Sarkar 2025-11-30 04:10:49 +05:30
commit b98c312fb1
81 changed files with 8976 additions and 2387 deletions

View file

@ -600,3 +600,46 @@ async def _index_elasticsearch_documents(
await run_elasticsearch_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_crawled_urls", bind=True)
def index_crawled_urls_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Web page Urls."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_crawled_urls(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_crawled_urls(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Web page Urls with new session."""
from app.routes.search_source_connectors_routes import (
run_web_page_indexing,
)
async with get_celery_session_maker()() as session:
await run_web_page_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)

View file

@ -9,7 +9,6 @@ from app.celery_app import celery_app
from app.config import config
from app.services.task_logging_service import TaskLoggingService
from app.tasks.document_processors import (
add_crawled_url_document,
add_extension_received_document,
add_youtube_video_document,
)
@ -120,71 +119,6 @@ async def _process_extension_document(
raise
@celery_app.task(name="process_crawled_url", bind=True)
def process_crawled_url_task(self, url: str, search_space_id: int, user_id: str):
"""
Celery task to process crawled URL.
Args:
url: URL to crawl and process
search_space_id: ID of the search space
user_id: ID of the user
"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(_process_crawled_url(url, search_space_id, user_id))
finally:
loop.close()
async def _process_crawled_url(url: str, search_space_id: int, user_id: str):
"""Process crawled URL with new session."""
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="process_crawled_url",
source="document_processor",
message=f"Starting URL crawling and processing for: {url}",
metadata={"document_type": "CRAWLED_URL", "url": url, "user_id": user_id},
)
try:
result = await add_crawled_url_document(
session, url, search_space_id, user_id
)
if result:
await task_logger.log_task_success(
log_entry,
f"Successfully crawled and processed URL: {url}",
{
"document_id": result.id,
"title": result.title,
"content_hash": result.content_hash,
},
)
else:
await task_logger.log_task_success(
log_entry,
f"URL document already exists (duplicate): {url}",
{"duplicate_detected": True},
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to crawl URL: {url}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Error processing crawled URL: {e!s}")
raise
@celery_app.task(name="process_youtube_video", bind=True)
def process_youtube_video_task(self, url: str, search_space_id: int, user_id: str):
"""

View file

@ -67,6 +67,7 @@ async def _check_and_trigger_schedules():
index_airtable_records_task,
index_clickup_tasks_task,
index_confluence_pages_task,
index_crawled_urls_task,
index_discord_messages_task,
index_elasticsearch_documents_task,
index_github_repos_task,
@ -94,6 +95,7 @@ async def _check_and_trigger_schedules():
SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task,
SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task,
}
# Trigger indexing for each due connector

View file

@ -17,6 +17,7 @@ Available indexers:
- Google Gmail: Index messages from Google Gmail
- Google Calendar: Index events from Google Calendar
- Luma: Index events from Luma
- Webcrawler: Index crawled URLs
- Elasticsearch: Index documents from Elasticsearch instances
"""
@ -41,6 +42,7 @@ from .luma_indexer import index_luma_events
# Documentation and knowledge management
from .notion_indexer import index_notion_pages
from .slack_indexer import index_slack_messages
from .webcrawler_indexer import index_crawled_urls
__all__ = [ # noqa: RUF022
"index_airtable_records",
@ -58,6 +60,7 @@ __all__ = [ # noqa: RUF022
"index_linear_issues",
# Documentation and knowledge management
"index_notion_pages",
"index_crawled_urls",
# Communication platforms
"index_slack_messages",
"index_google_gmail_messages",

View file

@ -0,0 +1,450 @@
"""
Webcrawler connector indexer.
"""
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.webcrawler_connector import WebCrawlerConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_document_by_unique_identifier,
get_connector_by_id,
logger,
update_connector_last_indexed,
)
async def index_crawled_urls(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None = None,
end_date: str | None = None,
update_last_indexed: bool = True,
) -> tuple[int, str | None]:
"""
Index web page URLs.
Args:
session: Database session
connector_id: ID of the webcrawler connector
search_space_id: ID of the search space to store documents in
user_id: User ID
start_date: Start date for filtering (YYYY-MM-DD format) - optional
end_date: End date for filtering (YYYY-MM-DD format) - optional
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="crawled_url_indexing",
source="connector_indexing_task",
message=f"Starting web page URL indexing for connector {connector_id}",
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
"start_date": start_date,
"end_date": end_date,
},
)
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving webcrawler connector {connector_id} from database",
{"stage": "connector_retrieval"},
)
# Get the connector from the database
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.WEBCRAWLER_CONNECTOR
)
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a webcrawler connector",
"Connector not found",
{"error_type": "ConnectorNotFound"},
)
return (
0,
f"Connector with ID {connector_id} not found or is not a webcrawler connector",
)
# Get the Firecrawl API key from the connector config (optional)
api_key = connector.config.get("FIRECRAWL_API_KEY")
# Get URLs from connector config
initial_urls = connector.config.get("INITIAL_URLS", "")
if isinstance(initial_urls, str):
urls = [url.strip() for url in initial_urls.split("\n") if url.strip()]
elif isinstance(initial_urls, list):
urls = [url.strip() for url in initial_urls if url.strip()]
else:
urls = []
logger.info(
f"Starting crawled web page indexing for connector {connector_id} with {len(urls)} URLs"
)
# Initialize webcrawler client
await task_logger.log_task_progress(
log_entry,
f"Initializing webcrawler client for connector {connector_id}",
{
"stage": "client_initialization",
"use_firecrawl": bool(api_key),
},
)
crawler = WebCrawlerConnector(firecrawl_api_key=api_key)
# Validate URLs
if not urls:
await task_logger.log_task_failure(
log_entry,
"No URLs provided for indexing",
"Empty URL list",
{"error_type": "ValidationError"},
)
return 0, "No URLs provided for indexing"
await task_logger.log_task_progress(
log_entry,
f"Starting to crawl {len(urls)} URLs",
{
"stage": "crawling",
"total_urls": len(urls),
},
)
documents_indexed = 0
documents_updated = 0
documents_skipped = 0
failed_urls = []
for idx, url in enumerate(urls, 1):
try:
logger.info(f"Processing URL {idx}/{len(urls)}: {url}")
await task_logger.log_task_progress(
log_entry,
f"Crawling URL {idx}/{len(urls)}: {url}",
{
"stage": "crawling_url",
"url_index": idx,
"url": url,
},
)
# Crawl the URL
crawl_result, error = await crawler.crawl_url(url)
if error or not crawl_result:
logger.warning(f"Failed to crawl URL {url}: {error}")
failed_urls.append((url, error or "Unknown error"))
continue
# Extract content and metadata
content = crawl_result.get("content", "")
metadata = crawl_result.get("metadata", {})
crawler_type = crawl_result.get("crawler_type", "unknown")
if not content.strip():
logger.warning(f"Skipping URL with no content: {url}")
failed_urls.append((url, "No content extracted"))
documents_skipped += 1
continue
# Format content as structured document
structured_document = crawler.format_to_structured_document(
crawl_result
)
# Generate unique identifier hash for this URL
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.CRAWLED_URL, url, search_space_id
)
# Generate content hash
# TODO: To fix this by not including dynamic content like date, time, etc.
content_hash = generate_content_hash(
structured_document, search_space_id
)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Extract useful metadata
title = metadata.get("title", url)
description = metadata.get("description", "")
language = metadata.get("language", "")
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(f"Document for URL {url} unchanged. Skipping.")
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for URL {url}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"url": url,
"title": title,
"description": description,
"language": language,
"document_type": "Crawled URL",
"crawler_type": crawler_type,
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
structured_document, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Crawled URL: {title}\n\n"
summary_content += f"URL: {url}\n"
if description:
summary_content += f"Description: {description}\n"
if language:
summary_content += f"Language: {language}\n"
summary_content += f"Crawler: {crawler_type}\n\n"
# Add content preview
content_preview = content[:1000]
if len(content) > 1000:
content_preview += "..."
summary_content += f"Content Preview:\n{content_preview}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(content)
# Update existing document
existing_document.title = title
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
**metadata,
"crawler_type": crawler_type,
"last_crawled_at": datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
),
}
existing_document.chunks = chunks
documents_updated += 1
logger.info(f"Successfully updated URL {url}")
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"url": url,
"title": title,
"description": description,
"language": language,
"document_type": "Crawled URL",
"crawler_type": crawler_type,
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
structured_document, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Crawled URL: {title}\n\n"
summary_content += f"URL: {url}\n"
if description:
summary_content += f"Description: {description}\n"
if language:
summary_content += f"Language: {language}\n"
summary_content += f"Crawler: {crawler_type}\n\n"
# Add content preview
content_preview = content[:1000]
if len(content) > 1000:
content_preview += "..."
summary_content += f"Content Preview:\n{content_preview}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(content)
document = Document(
search_space_id=search_space_id,
title=title,
document_type=DocumentType.CRAWLED_URL,
document_metadata={
**metadata,
"crawler_type": crawler_type,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
)
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new URL {url}")
# Batch commit every 10 documents
if (documents_indexed + documents_updated) % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed + documents_updated} URLs processed so far"
)
await session.commit()
except Exception as e:
logger.error(
f"Error processing URL {url}: {e!s}",
exc_info=True,
)
failed_urls.append((url, str(e)))
continue
total_processed = documents_indexed + documents_updated
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(
f"Final commit: Total {documents_indexed} new, {documents_updated} updated URLs processed"
)
await session.commit()
# Build result message
result_message = None
if failed_urls:
failed_summary = "; ".join(
[f"{url}: {error}" for url, error in failed_urls[:5]]
)
if len(failed_urls) > 5:
failed_summary += f" (and {len(failed_urls) - 5} more)"
result_message = (
f"Completed with {len(failed_urls)} failures: {failed_summary}"
)
await task_logger.log_task_success(
log_entry,
f"Successfully completed crawled web page indexing for connector {connector_id}",
{
"urls_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_updated": documents_updated,
"documents_skipped": documents_skipped,
"failed_urls_count": len(failed_urls),
},
)
logger.info(
f"Web page indexing completed: {documents_indexed} new, "
f"{documents_updated} updated, {documents_skipped} skipped, "
f"{len(failed_urls)} failed"
)
return total_processed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during web page indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index web page URLs for connector {connector_id}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index web page URLs: {e!s}", exc_info=True)
return 0, f"Failed to index web page URLs: {e!s}"
async def get_crawled_url_documents(
session: AsyncSession,
search_space_id: int,
connector_id: int | None = None,
) -> list[Document]:
"""
Get all crawled URL documents for a search space.
Args:
session: Database session
search_space_id: ID of the search space
connector_id: Optional connector ID to filter by
Returns:
List of Document objects
"""
from sqlalchemy import select
query = select(Document).filter(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.CRAWLED_URL,
)
if connector_id:
# Filter by connector if needed - you might need to add a connector_id field to Document
# or filter by some other means depending on your schema
pass
result = await session.execute(query)
documents = result.scalars().all()
return list(documents)

View file

@ -6,7 +6,6 @@ and sources. Each processor is responsible for handling a specific type of docum
processing task in the background.
Available processors:
- URL crawler: Process web pages from URLs
- Extension processor: Handle documents from browser extension
- Markdown processor: Process markdown files
- File processors: Handle files using different ETL services (Unstructured, LlamaCloud, Docling)
@ -26,14 +25,11 @@ from .file_processors import (
# Markdown processor
from .markdown_processor import add_received_markdown_file_document
from .url_crawler import add_crawled_url_document
# YouTube processor
from .youtube_processor import add_youtube_video_document
__all__ = [
# URL processing
"add_crawled_url_document",
# Extension processing
"add_extension_received_document",
"add_received_file_document_using_docling",

View file

@ -1,342 +0,0 @@
"""
URL crawler document processor.
"""
import logging
import validators
from firecrawl import AsyncFirecrawlApp
from langchain_community.document_loaders import AsyncChromiumLoader
from langchain_core.documents import Document as LangchainDocument
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.db import Document, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_document_by_unique_identifier,
md,
)
async def add_crawled_url_document(
session: AsyncSession, url: str, search_space_id: int, user_id: str
) -> Document | None:
"""
Process and store a document from a crawled URL.
Args:
session: Database session
url: URL to crawl
search_space_id: ID of the search space
user_id: ID of the user
Returns:
Document object if successful, None if failed
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="crawl_url_document",
source="background_task",
message=f"Starting URL crawling process for: {url}",
metadata={"url": url, "user_id": str(user_id)},
)
try:
# URL validation step
await task_logger.log_task_progress(
log_entry, f"Validating URL: {url}", {"stage": "validation"}
)
if not validators.url(url):
raise ValueError(f"Url {url} is not a valid URL address")
# Set up crawler
await task_logger.log_task_progress(
log_entry,
f"Setting up crawler for URL: {url}",
{
"stage": "crawler_setup",
"firecrawl_available": bool(config.FIRECRAWL_API_KEY),
},
)
use_firecrawl = bool(config.FIRECRAWL_API_KEY)
if use_firecrawl:
# Use Firecrawl SDK directly
firecrawl_app = AsyncFirecrawlApp(api_key=config.FIRECRAWL_API_KEY)
else:
crawl_loader = AsyncChromiumLoader(urls=[url], headless=True)
# Perform crawling
await task_logger.log_task_progress(
log_entry,
f"Crawling URL content: {url}",
{
"stage": "crawling",
"crawler_type": "AsyncFirecrawlApp"
if use_firecrawl
else "AsyncChromiumLoader",
},
)
if use_firecrawl:
# Use async Firecrawl SDK with v1 API - properly awaited
scrape_result = await firecrawl_app.scrape_url(
url=url, formats=["markdown"]
)
# scrape_result is a Pydantic ScrapeResponse object
# Access attributes directly
if scrape_result and scrape_result.success:
# Extract markdown content
markdown_content = scrape_result.markdown or ""
# Extract metadata - this is a DICT
metadata = scrape_result.metadata if scrape_result.metadata else {}
# Convert to LangChain Document format
url_crawled = [
LangchainDocument(
page_content=markdown_content,
metadata={
"source": url,
"title": metadata.get("title", url),
"description": metadata.get("description", ""),
"language": metadata.get("language", ""),
"sourceURL": metadata.get("sourceURL", url),
**metadata, # Include all other metadata fields
},
)
]
content_in_markdown = url_crawled[0].page_content
else:
error_msg = (
scrape_result.error
if scrape_result and hasattr(scrape_result, "error")
else "Unknown error"
)
raise ValueError(f"Firecrawl failed to scrape URL: {error_msg}")
else:
# Use AsyncChromiumLoader as fallback
url_crawled = await crawl_loader.aload()
content_in_markdown = md.transform_documents(url_crawled)[0].page_content
# Format document
await task_logger.log_task_progress(
log_entry,
f"Processing crawled content from: {url}",
{"stage": "content_processing", "content_length": len(content_in_markdown)},
)
# Format document metadata in a more maintainable way
metadata_sections = [
(
"METADATA",
[
f"{key.upper()}: {value}"
for key, value in url_crawled[0].metadata.items()
],
),
(
"CONTENT",
["FORMAT: markdown", "TEXT_START", content_in_markdown, "TEXT_END"],
),
]
# Build the document string more efficiently
document_parts = []
document_parts.append("<DOCUMENT>")
for section_title, section_content in metadata_sections:
document_parts.append(f"<{section_title}>")
document_parts.extend(section_content)
document_parts.append(f"</{section_title}>")
document_parts.append("</DOCUMENT>")
combined_document_string = "\n".join(document_parts)
# Generate unique identifier hash for this URL
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.CRAWLED_URL, url, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(combined_document_string, search_space_id)
# Check if document with this unique identifier already exists
await task_logger.log_task_progress(
log_entry,
f"Checking for existing URL: {url}",
{"stage": "duplicate_check", "url": url},
)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"URL document unchanged: {url}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(f"Document for URL {url} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(f"Content changed for URL {url}. Updating document.")
await task_logger.log_task_progress(
log_entry,
f"Updating URL document: {url}",
{"stage": "document_update", "url": url},
)
# Get LLM for summary generation (needed for both create and update)
await task_logger.log_task_progress(
log_entry,
f"Preparing for summary generation: {url}",
{"stage": "llm_setup"},
)
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} in search space {search_space_id}"
)
# Generate summary
await task_logger.log_task_progress(
log_entry,
f"Generating summary for URL content: {url}",
{"stage": "summary_generation"},
)
# Generate summary with metadata
document_metadata = {
"url": url,
"title": url_crawled[0].metadata.get("title", url),
"document_type": "Crawled URL Document",
"crawler_type": "FirecrawlApp" if use_firecrawl else "AsyncChromiumLoader",
}
summary_content, summary_embedding = await generate_document_summary(
combined_document_string, user_llm, document_metadata
)
# Process chunks
await task_logger.log_task_progress(
log_entry,
f"Processing content chunks for URL: {url}",
{"stage": "chunk_processing"},
)
from app.utils.blocknote_converter import convert_markdown_to_blocknote
# Convert markdown to BlockNote JSON
blocknote_json = await convert_markdown_to_blocknote(combined_document_string)
if not blocknote_json:
logging.warning(
f"Failed to convert crawled URL '{url}' to BlockNote JSON, "
"document will not be editable"
)
chunks = await create_document_chunks(content_in_markdown)
# Update or create document
if existing_document:
# Update existing document
await task_logger.log_task_progress(
log_entry,
f"Updating document in database for URL: {url}",
{"stage": "document_update", "chunks_count": len(chunks)},
)
existing_document.title = url_crawled[0].metadata.get(
"title", url_crawled[0].metadata.get("source", url)
)
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = url_crawled[0].metadata
existing_document.chunks = chunks
existing_document.blocknote_document = blocknote_json
document = existing_document
else:
# Create new document
await task_logger.log_task_progress(
log_entry,
f"Creating document in database for URL: {url}",
{"stage": "document_creation", "chunks_count": len(chunks)},
)
document = Document(
search_space_id=search_space_id,
title=url_crawled[0].metadata.get(
"title", url_crawled[0].metadata.get("source", url)
),
document_type=DocumentType.CRAWLED_URL,
document_metadata=url_crawled[0].metadata,
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
blocknote_document=blocknote_json,
)
session.add(document)
await session.commit()
await session.refresh(document)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully crawled and processed URL: {url}",
{
"document_id": document.id,
"title": document.title,
"content_hash": content_hash,
"chunks_count": len(chunks),
"summary_length": len(summary_content),
},
)
return document
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error while processing URL: {url}",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
raise db_error
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to crawl URL: {url}",
str(e),
{"error_type": type(e).__name__},
)
raise RuntimeError(f"Failed to crawl URL: {e!s}") from e