diff --git a/surfsense_backend/alembic/versions/37_add_webcrawler_connector_enum.py b/surfsense_backend/alembic/versions/37_add_webcrawler_connector_enum.py
index be50dd807..3a76b762c 100644
--- a/surfsense_backend/alembic/versions/37_add_webcrawler_connector_enum.py
+++ b/surfsense_backend/alembic/versions/37_add_webcrawler_connector_enum.py
@@ -1,7 +1,7 @@
 """Add Webcrawler connector enums
 
-Revision ID: 37
-Revises: 36
+Revision ID: 38
+Revises: 37
 Create Date: 2025-11-17 17:00:00.000000
 
 """
@@ -10,8 +10,8 @@ from collections.abc import Sequence
 
 from alembic import op
 
-revision: str = "37"
-down_revision: str | None = "36"
+revision: str = "38"
+down_revision: str | None = "37"
 branch_labels: str | Sequence[str] | None = None
 depends_on: str | Sequence[str] | None = None
 
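
Reviewer note: the file keeps its 37_add_webcrawler_connector_enum.py name, but it now registers itself as Alembic revision "38" on top of "37", presumably because another migration has taken the 37 slot, so the chain should resolve as 36 -> 37 -> 38. A minimal, hedged way to sanity-check the chain locally; the alembic.ini path below is an assumption, not part of the patch:

    from alembic.config import Config
    from alembic.script import ScriptDirectory

    # Path to alembic.ini is an assumption; adjust for your checkout.
    cfg = Config("surfsense_backend/alembic.ini")
    scripts = ScriptDirectory.from_config(cfg)

    # Walk from head down to base; after this patch the listing should
    # include "38 <- 37" and "37 <- 36" with no gaps or duplicate IDs.
    for rev in scripts.walk_revisions():
        print(rev.revision, "<-", rev.down_revision)
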
diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
index 73af21550..5cf5a662a 100644
--- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
+++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
@@ -9,7 +9,6 @@ from app.celery_app import celery_app
 from app.config import config
 from app.services.task_logging_service import TaskLoggingService
 from app.tasks.document_processors import (
-    add_crawled_url_document,
     add_extension_received_document,
     add_youtube_video_document,
 )
@@ -120,71 +119,6 @@ async def _process_extension_document(
         raise
 
 
-@celery_app.task(name="process_crawled_url", bind=True)
-def process_crawled_url_task(self, url: str, search_space_id: int, user_id: str):
-    """
-    Celery task to process crawled URL.
-
-    Args:
-        url: URL to crawl and process
-        search_space_id: ID of the search space
-        user_id: ID of the user
-    """
-    import asyncio
-
-    loop = asyncio.new_event_loop()
-    asyncio.set_event_loop(loop)
-
-    try:
-        loop.run_until_complete(_process_crawled_url(url, search_space_id, user_id))
-    finally:
-        loop.close()
-
-
-async def _process_crawled_url(url: str, search_space_id: int, user_id: str):
-    """Process crawled URL with new session."""
-    async with get_celery_session_maker()() as session:
-        task_logger = TaskLoggingService(session, search_space_id)
-
-        log_entry = await task_logger.log_task_start(
-            task_name="process_crawled_url",
-            source="document_processor",
-            message=f"Starting URL crawling and processing for: {url}",
-            metadata={"document_type": "CRAWLED_URL", "url": url, "user_id": user_id},
-        )
-
-        try:
-            result = await add_crawled_url_document(
-                session, url, search_space_id, user_id
-            )
-
-            if result:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"Successfully crawled and processed URL: {url}",
-                    {
-                        "document_id": result.id,
-                        "title": result.title,
-                        "content_hash": result.content_hash,
-                    },
-                )
-            else:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"URL document already exists (duplicate): {url}",
-                    {"duplicate_detected": True},
-                )
-        except Exception as e:
-            await task_logger.log_task_failure(
-                log_entry,
-                f"Failed to crawl URL: {url}",
-                str(e),
-                {"error_type": type(e).__name__},
-            )
-            logger.error(f"Error processing crawled URL: {e!s}")
-            raise
-
-
 @celery_app.task(name="process_youtube_video", bind=True)
 def process_youtube_video_task(self, url: str, search_space_id: int, user_id: str):
     """
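
Reviewer note: the removed task bridged Celery's synchronous workers to the async helper by creating a dedicated event loop, running the coroutine to completion, and closing the loop; the surviving tasks in this module appear to use the same bridge. A generic sketch of that pattern, with hypothetical names (run_in_fresh_loop, _do_work) that are not part of the codebase:

    import asyncio

    async def _do_work(url: str) -> str:
        # Stand-in for an async helper such as _process_extension_document.
        return f"processed {url}"

    def run_in_fresh_loop(coro):
        # Celery task bodies are synchronous, so create a private event loop,
        # drive the coroutine to completion, and always close the loop so
        # worker processes do not leak loops between task invocations.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    if __name__ == "__main__":
        print(run_in_fresh_loop(_do_work("https://example.com")))
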
diff --git a/surfsense_backend/app/tasks/document_processors/__init__.py b/surfsense_backend/app/tasks/document_processors/__init__.py
index a238ac877..e70c41cb4 100644
--- a/surfsense_backend/app/tasks/document_processors/__init__.py
+++ b/surfsense_backend/app/tasks/document_processors/__init__.py
@@ -6,7 +6,6 @@ and sources. Each processor is responsible for handling a specific type of docum
 processing task in the background.
 
 Available processors:
-- URL crawler: Process web pages from URLs
 - Extension processor: Handle documents from browser extension
 - Markdown processor: Process markdown files
 - File processors: Handle files using different ETL services (Unstructured, LlamaCloud, Docling)
@@ -26,14 +25,11 @@ from .file_processors import (
 
 # Markdown processor
 from .markdown_processor import add_received_markdown_file_document
-from .url_crawler import add_crawled_url_document
 
 # YouTube processor
 from .youtube_processor import add_youtube_video_document
 
 __all__ = [
-    # URL processing
-    "add_crawled_url_document",
     # Extension processing
     "add_extension_received_document",
     "add_received_file_document_using_docling",
diff --git a/surfsense_backend/app/tasks/document_processors/url_crawler.py b/surfsense_backend/app/tasks/document_processors/url_crawler.py
deleted file mode 100644
index ce2764897..000000000
--- a/surfsense_backend/app/tasks/document_processors/url_crawler.py
+++ /dev/null
@@ -1,330 +0,0 @@
-"""
-URL crawler document processor.
-"""
-
-import logging
-
-import validators
-from firecrawl import AsyncFirecrawlApp
-from langchain_community.document_loaders import AsyncChromiumLoader
-from langchain_core.documents import Document as LangchainDocument
-from sqlalchemy.exc import SQLAlchemyError
-from sqlalchemy.ext.asyncio import AsyncSession
-
-from app.config import config
-from app.db import Document, DocumentType
-from app.services.llm_service import get_user_long_context_llm
-from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import (
-    create_document_chunks,
-    generate_content_hash,
-    generate_document_summary,
-    generate_unique_identifier_hash,
-)
-
-from .base import (
-    check_document_by_unique_identifier,
-    md,
-)
-
-
-async def add_crawled_url_document(
-    session: AsyncSession, url: str, search_space_id: int, user_id: str
-) -> Document | None:
-    """
-    Process and store a document from a crawled URL.
-
-    Args:
-        session: Database session
-        url: URL to crawl
-        search_space_id: ID of the search space
-        user_id: ID of the user
-
-    Returns:
-        Document object if successful, None if failed
-    """
-    task_logger = TaskLoggingService(session, search_space_id)
-
-    # Log task start
-    log_entry = await task_logger.log_task_start(
-        task_name="crawl_url_document",
-        source="background_task",
-        message=f"Starting URL crawling process for: {url}",
-        metadata={"url": url, "user_id": str(user_id)},
-    )
-
-    try:
-        # URL validation step
-        await task_logger.log_task_progress(
-            log_entry, f"Validating URL: {url}", {"stage": "validation"}
-        )
-
-        if not validators.url(url):
-            raise ValueError(f"Url {url} is not a valid URL address")
-
-        # Set up crawler
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Setting up crawler for URL: {url}",
-            {
-                "stage": "crawler_setup",
-                "firecrawl_available": bool(config.FIRECRAWL_API_KEY),
-            },
-        )
-
-        use_firecrawl = bool(config.FIRECRAWL_API_KEY)
-
-        if use_firecrawl:
-            # Use Firecrawl SDK directly
-            firecrawl_app = AsyncFirecrawlApp(api_key=config.FIRECRAWL_API_KEY)
-        else:
-            crawl_loader = AsyncChromiumLoader(urls=[url], headless=True)
-
-        # Perform crawling
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Crawling URL content: {url}",
-            {
-                "stage": "crawling",
-                "crawler_type": "AsyncFirecrawlApp"
-                if use_firecrawl
-                else "AsyncChromiumLoader",
-            },
-        )
-
-        if use_firecrawl:
-            # Use async Firecrawl SDK with v1 API - properly awaited
-            scrape_result = await firecrawl_app.scrape_url(
-                url=url, formats=["markdown"]
-            )
-
-            # scrape_result is a Pydantic ScrapeResponse object
-            # Access attributes directly
-            if scrape_result and scrape_result.success:
-                # Extract markdown content
-                markdown_content = scrape_result.markdown or ""
-
-                # Extract metadata - this is a DICT
-                metadata = scrape_result.metadata if scrape_result.metadata else {}
-
-                # Convert to LangChain Document format
-                url_crawled = [
-                    LangchainDocument(
-                        page_content=markdown_content,
-                        metadata={
-                            "source": url,
-                            "title": metadata.get("title", url),
-                            "description": metadata.get("description", ""),
-                            "language": metadata.get("language", ""),
-                            "sourceURL": metadata.get("sourceURL", url),
-                            **metadata,  # Include all other metadata fields
-                        },
-                    )
-                ]
-                content_in_markdown = url_crawled[0].page_content
-            else:
-                error_msg = (
-                    scrape_result.error
-                    if scrape_result and hasattr(scrape_result, "error")
-                    else "Unknown error"
-                )
-                raise ValueError(f"Firecrawl failed to scrape URL: {error_msg}")
-        else:
-            # Use AsyncChromiumLoader as fallback
-            url_crawled = await crawl_loader.aload()
-            content_in_markdown = md.transform_documents(url_crawled)[0].page_content
-
-        # Format document
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Processing crawled content from: {url}",
-            {"stage": "content_processing", "content_length": len(content_in_markdown)},
-        )
-
-        # Format document metadata in a more maintainable way
-        metadata_sections = [
-            (
-                "METADATA",
-                [
-                    f"{key.upper()}: {value}"
-                    for key, value in url_crawled[0].metadata.items()
-                ],
-            ),
-            (
-                "CONTENT",
-                ["FORMAT: markdown", "TEXT_START", content_in_markdown, "TEXT_END"],
-            ),
-        ]
-
-        # Build the document string more efficiently
-        document_parts = []
-        document_parts.append("<DOCUMENT>")
-
-        for section_title, section_content in metadata_sections:
-            document_parts.append(f"<{section_title}>")
-            document_parts.extend(section_content)
-            document_parts.append(f"</{section_title}>")
-
-        document_parts.append("</DOCUMENT>")
-        combined_document_string = "\n".join(document_parts)
-
-        # Generate unique identifier hash for this URL
-        unique_identifier_hash = generate_unique_identifier_hash(
-            DocumentType.CRAWLED_URL, url, search_space_id
-        )
-
-        # Generate content hash
-        content_hash = generate_content_hash(combined_document_string, search_space_id)
-
-        # Check if document with this unique identifier already exists
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Checking for existing URL: {url}",
-            {"stage": "duplicate_check", "url": url},
-        )
-
-        existing_document = await check_document_by_unique_identifier(
-            session, unique_identifier_hash
-        )
-
-        if existing_document:
-            # Document exists - check if content has changed
-            if existing_document.content_hash == content_hash:
-                await task_logger.log_task_success(
-                    log_entry,
-                    f"URL document unchanged: {url}",
-                    {
-                        "duplicate_detected": True,
-                        "existing_document_id": existing_document.id,
-                    },
-                )
-                logging.info(f"Document for URL {url} unchanged. Skipping.")
-                return existing_document
-            else:
-                # Content has changed - update the existing document
-                logging.info(f"Content changed for URL {url}. Updating document.")
-                await task_logger.log_task_progress(
-                    log_entry,
-                    f"Updating URL document: {url}",
-                    {"stage": "document_update", "url": url},
-                )
-
-        # Get LLM for summary generation (needed for both create and update)
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Preparing for summary generation: {url}",
-            {"stage": "llm_setup"},
-        )
-
-        # Get user's long context LLM
-        user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
-        if not user_llm:
-            raise RuntimeError(
-                f"No long context LLM configured for user {user_id} in search space {search_space_id}"
-            )
-
-        # Generate summary
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Generating summary for URL content: {url}",
-            {"stage": "summary_generation"},
-        )
-
-        # Generate summary with metadata
-        document_metadata = {
-            "url": url,
-            "title": url_crawled[0].metadata.get("title", url),
-            "document_type": "Crawled URL Document",
-            "crawler_type": "FirecrawlApp" if use_firecrawl else "AsyncChromiumLoader",
-        }
-        summary_content, summary_embedding = await generate_document_summary(
-            combined_document_string, user_llm, document_metadata
-        )
-
-        # Process chunks
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Processing content chunks for URL: {url}",
-            {"stage": "chunk_processing"},
-        )
-
-        chunks = await create_document_chunks(content_in_markdown)
-
-        # Update or create document
-        if existing_document:
-            # Update existing document
-            await task_logger.log_task_progress(
-                log_entry,
-                f"Updating document in database for URL: {url}",
-                {"stage": "document_update", "chunks_count": len(chunks)},
-            )
-
-            existing_document.title = url_crawled[0].metadata.get(
-                "title", url_crawled[0].metadata.get("source", url)
-            )
-            existing_document.content = summary_content
-            existing_document.content_hash = content_hash
-            existing_document.embedding = summary_embedding
-            existing_document.document_metadata = url_crawled[0].metadata
-            existing_document.chunks = chunks
-
-            document = existing_document
-        else:
-            # Create new document
-            await task_logger.log_task_progress(
-                log_entry,
-                f"Creating document in database for URL: {url}",
-                {"stage": "document_creation", "chunks_count": len(chunks)},
-            )
-
-            document = Document(
-                search_space_id=search_space_id,
-                title=url_crawled[0].metadata.get(
-                    "title", url_crawled[0].metadata.get("source", url)
-                ),
-                document_type=DocumentType.CRAWLED_URL,
-                document_metadata=url_crawled[0].metadata,
-                content=summary_content,
-                embedding=summary_embedding,
-                chunks=chunks,
-                content_hash=content_hash,
-                unique_identifier_hash=unique_identifier_hash,
-            )
-
-        session.add(document)
-        await session.commit()
-        await session.refresh(document)
-
-        # Log success
-        await task_logger.log_task_success(
-            log_entry,
-            f"Successfully crawled and processed URL: {url}",
-            {
-                "document_id": document.id,
-                "title": document.title,
-                "content_hash": content_hash,
-                "chunks_count": len(chunks),
-                "summary_length": len(summary_content),
-            },
-        )
-
-        return document
-
-    except SQLAlchemyError as db_error:
-        await session.rollback()
-        await task_logger.log_task_failure(
-            log_entry,
-            f"Database error while processing URL: {url}",
-            str(db_error),
-            {"error_type": "SQLAlchemyError"},
-        )
-        raise db_error
-    except Exception as e:
-        await session.rollback()
-        await task_logger.log_task_failure(
-            log_entry,
-            f"Failed to crawl URL: {url}",
-            str(e),
-            {"error_type": type(e).__name__},
-        )
-        raise RuntimeError(f"Failed to crawl URL: {e!s}") from e
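
Reviewer note: the deleted processor decided between skip, update, and create using two hashes: a unique-identifier hash over (document type, URL, search space) to locate an existing row, and a content hash to detect whether the crawled page changed. A standalone sketch of that decision follows; the hash recipes and helper names below are assumptions, while the real helpers live in app.utils.document_converters:

    import hashlib

    def identifier_hash(doc_type: str, url: str, search_space_id: int) -> str:
        # Stable key for "this URL in this search space", independent of content.
        return hashlib.sha256(f"{doc_type}:{url}:{search_space_id}".encode()).hexdigest()

    def content_hash(content: str, search_space_id: int) -> str:
        # Changes whenever the crawled page content changes.
        return hashlib.sha256(f"{search_space_id}:{content}".encode()).hexdigest()

    def decide(existing_content_hash: str | None, new_hash: str) -> str:
        if existing_content_hash is None:
            return "create"  # no document with this identifier yet
        if existing_content_hash == new_hash:
            return "skip"  # unchanged content: reuse the existing document
        return "update"  # same URL, new content: refresh the row in place

    if __name__ == "__main__":
        h = content_hash("<html>hello</html>", search_space_id=1)
        print(decide(None, h), decide(h, h), decide("stale", h))
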