From 896e410e2a36dcf3cdc68e6f219571925746eee7 Mon Sep 17 00:00:00 2001 From: samkul-swe Date: Fri, 21 Nov 2025 20:45:59 -0800 Subject: [PATCH 1/8] Webcrawler connector draft --- .vscode/settings.json | 10 +- .../37_add_webcrawler_connector_enum.py | 59 +++ .../app/agents/researcher/nodes.py | 4 +- .../researcher/qna_agent/default_prompts.py | 2 +- .../app/agents/researcher/utils.py | 4 +- .../app/connectors/webcrawler_connector.py | 191 ++++++++ surfsense_backend/app/db.py | 1 + .../routes/search_source_connectors_routes.py | 61 +++ .../app/tasks/celery_tasks/connector_tasks.py | 43 ++ .../celery_tasks/schedule_checker_task.py | 2 + .../app/tasks/connector_indexers/__init__.py | 3 + .../connector_indexers/webcrawler_indexer.py | 439 ++++++++++++++++++ .../app/utils/periodic_scheduler.py | 3 + surfsense_backend/app/utils/validators.py | 28 ++ .../connectors/[connector_id]/edit/page.tsx | 12 + .../connectors/[connector_id]/page.tsx | 2 + .../add/webcrawler-connector/page.tsx | 334 +++++++++++++ .../components/dashboard-breadcrumb.tsx | 1 + .../components/editConnector/types.ts | 1 + .../components/homepage/integrations.tsx | 1 + .../components/sources/connector-data.tsx | 7 + surfsense_web/contracts/enums/connector.ts | 1 + .../contracts/enums/connectorIcons.tsx | 4 +- .../hooks/use-connector-edit-page.ts | 17 + surfsense_web/lib/connectors/utils.ts | 1 + surfsense_web/messages/en.json | 3 +- 26 files changed, 1225 insertions(+), 9 deletions(-) create mode 100644 surfsense_backend/alembic/versions/37_add_webcrawler_connector_enum.py create mode 100644 surfsense_backend/app/connectors/webcrawler_connector.py create mode 100644 surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py create mode 100644 surfsense_web/app/dashboard/[search_space_id]/connectors/add/webcrawler-connector/page.tsx diff --git a/.vscode/settings.json b/.vscode/settings.json index f134660b6..81a4ff6a9 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,11 @@ { - "biome.configurationPath": "./surfsense_web/biome.json" + "biome.configurationPath": "./surfsense_web/biome.json", + "files.exclude": { + "**/.git": true, + "**/.svn": true, + "**/.hg": true, + "**/.DS_Store": true, + "**/Thumbs.db": true, + ".mule": true + } } \ No newline at end of file diff --git a/surfsense_backend/alembic/versions/37_add_webcrawler_connector_enum.py b/surfsense_backend/alembic/versions/37_add_webcrawler_connector_enum.py new file mode 100644 index 000000000..be50dd807 --- /dev/null +++ b/surfsense_backend/alembic/versions/37_add_webcrawler_connector_enum.py @@ -0,0 +1,59 @@ +"""Add Webcrawler connector enums + +Revision ID: 37 +Revises: 36 +Create Date: 2025-11-17 17:00:00.000000 + +""" + +from collections.abc import Sequence + +from alembic import op + +revision: str = "37" +down_revision: str | None = "36" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Safely add 'WEBCRAWLER_CONNECTOR' to enum types if missing.""" + + # Add to searchsourceconnectortype enum + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'WEBCRAWLER_CONNECTOR' + ) THEN + ALTER TYPE searchsourceconnectortype ADD VALUE 'WEBCRAWLER_CONNECTOR'; + END IF; + END + $$; + """ + ) + + # Add to documenttype enum + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid 
+ WHERE t.typname = 'documenttype' AND e.enumlabel = 'WEBCRAWLER_CONNECTOR' + ) THEN + ALTER TYPE documenttype ADD VALUE 'WEBCRAWLER_CONNECTOR'; + END IF; + END + $$; + """ + ) + + +def downgrade() -> None: + """Remove 'WEBCRAWLER_CONNECTOR' from enum types.""" + pass diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py index 7b0e18a11..8269f08de 100644 --- a/surfsense_backend/app/agents/researcher/nodes.py +++ b/surfsense_backend/app/agents/researcher/nodes.py @@ -667,7 +667,7 @@ async def fetch_relevant_documents( } ) - elif connector == "CRAWLED_URL": + elif connector == "WEBCRAWLER_CONNECTOR": ( source_object, crawled_urls_chunks, @@ -689,7 +689,7 @@ async def fetch_relevant_documents( writer( { "yield_value": streaming_service.format_terminal_info_delta( - f"🌐 Found {len(crawled_urls_chunks)} Web Pages chunks related to your query" + f"🌐 Found {len(crawled_urls_chunks)} Web Page chunks related to your query" ) } ) diff --git a/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py b/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py index 18ad16682..65c609d6f 100644 --- a/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py +++ b/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py @@ -17,7 +17,6 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel {chat_history_section} - EXTENSION: "Web content saved via SurfSense browser extension" (personal browsing history) -- CRAWLED_URL: "Webpages indexed by SurfSense web crawler" (personally selected websites) - FILE: "User-uploaded documents (PDFs, Word, etc.)" (personal files) - SLACK_CONNECTOR: "Slack conversations and shared content" (personal workspace communications) - NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management) @@ -35,6 +34,7 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel - TAVILY_API: "Tavily search API results" (personalized search results) - LINKUP_API: "Linkup search API results" (personalized search results) - LUMA_CONNECTOR: "Luma events" +- WEBCRAWLER_CONNECTOR: "Webpages indexed by SurfSense web crawler" (personally selected websites) diff --git a/surfsense_backend/app/agents/researcher/utils.py b/surfsense_backend/app/agents/researcher/utils.py index a2c211f28..41d5a1f55 100644 --- a/surfsense_backend/app/agents/researcher/utils.py +++ b/surfsense_backend/app/agents/researcher/utils.py @@ -19,7 +19,6 @@ def get_connector_emoji(connector_name: str) -> str: connector_emojis = { "YOUTUBE_VIDEO": "📹", "EXTENSION": "🧩", - "CRAWLED_URL": "🌐", "FILE": "📄", "SLACK_CONNECTOR": "💬", "NOTION_CONNECTOR": "📘", @@ -34,6 +33,7 @@ def get_connector_emoji(connector_name: str) -> str: "AIRTABLE_CONNECTOR": "🗃️", "LUMA_CONNECTOR": "✨", "ELASTICSEARCH_CONNECTOR": "⚡", + "WEBCRAWLER_CONNECTOR": "🌐", } return connector_emojis.get(connector_name, "🔎") @@ -43,7 +43,6 @@ def get_connector_friendly_name(connector_name: str) -> str: connector_friendly_names = { "YOUTUBE_VIDEO": "YouTube", "EXTENSION": "Browser Extension", - "CRAWLED_URL": "Web Pages", "FILE": "Files", "SLACK_CONNECTOR": "Slack", "NOTION_CONNECTOR": "Notion", @@ -59,6 +58,7 @@ def get_connector_friendly_name(connector_name: str) -> str: "AIRTABLE_CONNECTOR": "Airtable", "LUMA_CONNECTOR": "Luma", "ELASTICSEARCH_CONNECTOR": "Elasticsearch", + "WEBCRAWLER_CONNECTOR": "Web Pages", } return connector_friendly_names.get(connector_name, connector_name) diff --git 
a/surfsense_backend/app/connectors/webcrawler_connector.py b/surfsense_backend/app/connectors/webcrawler_connector.py new file mode 100644 index 000000000..9434e046f --- /dev/null +++ b/surfsense_backend/app/connectors/webcrawler_connector.py @@ -0,0 +1,191 @@ +""" +WebCrawler Connector Module + +A module for crawling web pages and extracting content using Firecrawl or AsyncChromiumLoader. +Provides a unified interface for web scraping. +""" + +from typing import Any + +import validators +from firecrawl import AsyncFirecrawlApp +from langchain_community.document_loaders import AsyncChromiumLoader + + +class WebCrawlerConnector: + """Class for crawling web pages and extracting content.""" + + def __init__(self, firecrawl_api_key: str | None = None): + """ + Initialize the WebCrawlerConnector class. + + Args: + firecrawl_api_key: Firecrawl API key (optional, will use AsyncChromiumLoader if not provided) + """ + self.firecrawl_api_key = firecrawl_api_key + self.use_firecrawl = bool(firecrawl_api_key) + + def set_api_key(self, api_key: str) -> None: + """ + Set the Firecrawl API key and enable Firecrawl usage. + + Args: + api_key: Firecrawl API key + """ + self.firecrawl_api_key = api_key + self.use_firecrawl = True + + async def crawl_url( + self, url: str, formats: list[str] | None = None + ) -> tuple[dict[str, Any] | None, str | None]: + """ + Crawl a single URL and extract its content. + + Args: + url: URL to crawl + formats: List of formats to extract (e.g., ["markdown", "html"]) - only for Firecrawl + + Returns: + Tuple containing (crawl result dict, error message or None) + Result dict contains: + - content: Extracted content (markdown or HTML) + - metadata: Page metadata (title, description, etc.) + - source: Original URL + - crawler_type: Type of crawler used + """ + try: + # Validate URL + if not validators.url(url): + return None, f"Invalid URL: {url}" + + if self.use_firecrawl: + result = await self._crawl_with_firecrawl(url, formats) + else: + result = await self._crawl_with_chromium(url) + + return result, None + + except Exception as e: + return None, f"Error crawling URL {url}: {e!s}" + + async def _crawl_with_firecrawl( + self, url: str, formats: list[str] | None = None + ) -> dict[str, Any]: + """ + Crawl URL using Firecrawl. + + Args: + url: URL to crawl + formats: List of formats to extract + + Returns: + Dict containing crawled content and metadata + + Raises: + ValueError: If Firecrawl scraping fails + """ + if not self.firecrawl_api_key: + raise ValueError("Firecrawl API key not set. 
Call set_api_key() first.") + + firecrawl_app = AsyncFirecrawlApp(api_key=self.firecrawl_api_key) + + # Default to markdown format + if formats is None: + formats = ["markdown"] + + scrape_result = await firecrawl_app.scrape_url(url=url, formats=formats) + + if not scrape_result or not scrape_result.success: + error_msg = ( + scrape_result.error + if scrape_result and hasattr(scrape_result, "error") + else "Unknown error" + ) + raise ValueError(f"Firecrawl failed to scrape URL: {error_msg}") + + # Extract content based on format + content = scrape_result.markdown or scrape_result.html or "" + + # Extract metadata + metadata = scrape_result.metadata if scrape_result.metadata else {} + + return { + "content": content, + "metadata": { + "source": url, + "title": metadata.get("title", url), + "description": metadata.get("description", ""), + "language": metadata.get("language", ""), + "sourceURL": metadata.get("sourceURL", url), + **metadata, + }, + "crawler_type": "firecrawl", + } + + async def _crawl_with_chromium(self, url: str) -> dict[str, Any]: + """ + Crawl URL using AsyncChromiumLoader. + + Args: + url: URL to crawl + + Returns: + Dict containing crawled content and metadata + + Raises: + Exception: If crawling fails + """ + crawl_loader = AsyncChromiumLoader(urls=[url], headless=True) + documents = await crawl_loader.aload() + + if not documents: + raise ValueError(f"Failed to load content from {url}") + + doc = documents[0] + + # Extract basic metadata from the document + metadata = doc.metadata if doc.metadata else {} + + return { + "content": doc.page_content, + "metadata": { + "source": url, + "title": metadata.get("title", url), + **metadata, + }, + "crawler_type": "chromium", + } + + def format_to_structured_document(self, crawl_result: dict[str, Any]) -> str: + """ + Format crawl result as a structured document (similar to url_crawler.py format). 
+ + Args: + crawl_result: Result from crawl_url method + + Returns: + Structured document string + """ + metadata = crawl_result["metadata"] + content = crawl_result["content"] + + document_parts = ["<DOCUMENT>", "<METADATA>"] + + # Add all metadata fields + for key, value in metadata.items(): + document_parts.append(f"{key.upper()}: {value}") + + document_parts.extend( + [ + "</METADATA>", + "<CONTENT>", + "FORMAT: markdown", + "TEXT_START", + content, + "TEXT_END", + "</CONTENT>", + "</DOCUMENT>", + ] + ) + + return "\n".join(document_parts) diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 4ad31b508..06abb7a39 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -73,6 +73,7 @@ class SearchSourceConnectorType(str, Enum): AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR" LUMA_CONNECTOR = "LUMA_CONNECTOR" ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR" + WEBCRAWLER_CONNECTOR = "WEBCRAWLER_CONNECTOR" class ChatType(str, Enum): diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 4e62035ff..74080b065 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -49,6 +49,7 @@ from app.tasks.connector_indexers import ( index_luma_events, index_notion_pages, index_slack_messages, + index_webcrawler_urls, ) from app.users import current_active_user from app.utils.check_ownership import check_ownership @@ -1523,3 +1524,63 @@ async def run_elasticsearch_indexing( f"Critical error in run_elasticsearch_indexing for connector {connector_id}: {e}", exc_info=True, ) + +# Add new helper functions for webcrawler indexing +async def run_webcrawler_indexing_with_new_session( + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """ + Create a new session and run the Webcrawler indexing task. + This prevents session leaks by creating a dedicated session for the background task. + """ + async with async_session_maker() as session: + await run_webcrawler_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) + + +async def run_webcrawler_indexing( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """ + Background task to run Webcrawler indexing.
+ Args: + session: Database session + connector_id: ID of the webcrawler connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + try: + documents_processed, error_or_warning = await index_webcrawler_urls( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + update_last_indexed=False, # Don't update timestamp in the indexing function + ) + + # Only update last_indexed_at if indexing was successful (either new docs or updated docs) + if documents_processed > 0: + await update_connector_last_indexed(session, connector_id) + logger.info( + f"Webcrawler indexing completed successfully: {documents_processed} documents processed" + ) + else: + logger.error( + f"Webcrawler indexing failed or no documents processed: {error_or_warning}" + ) + except Exception as e: + logger.error(f"Error in background Webcrawler indexing task: {e!s}") \ No newline at end of file diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index 5e6907499..29a82d77a 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -600,3 +600,46 @@ async def _index_elasticsearch_documents( await run_elasticsearch_indexing( session, connector_id, search_space_id, user_id, start_date, end_date ) + + +@celery_app.task(name="index_webcrawler_urls", bind=True) +def index_webcrawler_urls_task( + self, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Celery task to index Webcrawler URLs.""" + import asyncio + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete( + _index_webcrawler_urls( + connector_id, search_space_id, user_id, start_date, end_date + ) + ) + finally: + loop.close() + + +async def _index_webcrawler_urls( + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Index Webcrawler URLs with new session.""" + from app.routes.search_source_connectors_routes import ( + run_webcrawler_indexing, + ) + + async with get_celery_session_maker()() as session: + await run_webcrawler_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py index 39d6bf840..a0704fcfa 100644 --- a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py @@ -77,6 +77,7 @@ async def _check_and_trigger_schedules(): index_luma_events_task, index_notion_pages_task, index_slack_messages_task, + index_webcrawler_urls_task, ) # Map connector types to their tasks @@ -94,6 +95,7 @@ async def _check_and_trigger_schedules(): SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task, SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task, SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, + SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_webcrawler_urls_task, } # Trigger indexing for each due connector diff --git a/surfsense_backend/app/tasks/connector_indexers/__init__.py index
766506f70..2ec7ce497 100644 --- a/surfsense_backend/app/tasks/connector_indexers/__init__.py +++ b/surfsense_backend/app/tasks/connector_indexers/__init__.py @@ -17,6 +17,7 @@ Available indexers: - Google Gmail: Index messages from Google Gmail - Google Calendar: Index events from Google Calendar - Luma: Index events from Luma +- Webcrawler: Index crawled URLs - Elasticsearch: Index documents from Elasticsearch instances """ @@ -41,6 +42,7 @@ from .luma_indexer import index_luma_events # Documentation and knowledge management from .notion_indexer import index_notion_pages from .slack_indexer import index_slack_messages +from .webcrawler_indexer import index_webcrawler_urls __all__ = [ # noqa: RUF022 "index_airtable_records", @@ -58,6 +60,7 @@ __all__ = [ # noqa: RUF022 "index_linear_issues", # Documentation and knowledge management "index_notion_pages", + "index_webcrawler_urls", # Communication platforms "index_slack_messages", "index_google_gmail_messages", diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py new file mode 100644 index 000000000..d68c989b4 --- /dev/null +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -0,0 +1,439 @@ +""" +Webcrawler connector indexer. +""" + +from datetime import datetime + +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import config +from app.connectors.webcrawler_connector import WebCrawlerConnector +from app.db import Document, DocumentType, SearchSourceConnectorType +from app.services.llm_service import get_user_long_context_llm +from app.services.task_logging_service import TaskLoggingService +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, + generate_unique_identifier_hash, +) + +from .base import ( + check_document_by_unique_identifier, + get_connector_by_id, + logger, + update_connector_last_indexed, +) + + +async def index_webcrawler_urls( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str | None = None, + end_date: str | None = None, + update_last_indexed: bool = True, +) -> tuple[int, str | None]: + """ + Index webcrawler URLs. 
+ + Args: + session: Database session + connector_id: ID of the webcrawler connector + search_space_id: ID of the search space to store documents in + user_id: User ID + start_date: Start date for filtering (YYYY-MM-DD format) - optional + end_date: End date for filtering (YYYY-MM-DD format) - optional + update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + + Returns: + Tuple containing (number of documents indexed, error message or None) + """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="webcrawler_url_indexing", + source="connector_indexing_task", + message=f"Starting webcrawler URL indexing for connector {connector_id}", + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "start_date": start_date, + "end_date": end_date, + }, + ) + + try: + # Get the connector + await task_logger.log_task_progress( + log_entry, + f"Retrieving webcrawler connector {connector_id} from database", + {"stage": "connector_retrieval"}, + ) + + # Get the connector from the database + connector = await get_connector_by_id( + session, connector_id, SearchSourceConnectorType.WEBCRAWLER_CONNECTOR + ) + + if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found or is not a webcrawler connector", + "Connector not found", + {"error_type": "ConnectorNotFound"}, + ) + return ( + 0, + f"Connector with ID {connector_id} not found or is not a webcrawler connector", + ) + + # Get the Firecrawl API key from the connector config (optional) + api_key = connector.config.get("FIRECRAWL_API_KEY") + + # Get URLs from connector config + initial_urls = connector.config.get("INITIAL_URLS", "") + if isinstance(initial_urls, str): + urls = [url.strip() for url in initial_urls.split("\n") if url.strip()] + elif isinstance(initial_urls, list): + urls = [url.strip() for url in initial_urls if url.strip()] + else: + urls = [] + + logger.info( + f"Starting webcrawler indexing for connector {connector_id} with {len(urls)} URLs" + ) + + # Initialize webcrawler client + await task_logger.log_task_progress( + log_entry, + f"Initializing webcrawler client for connector {connector_id}", + { + "stage": "client_initialization", + "use_firecrawl": bool(api_key), + }, + ) + + crawler = WebCrawlerConnector(firecrawl_api_key=api_key) + + # Validate URLs + if not urls: + await task_logger.log_task_failure( + log_entry, + "No URLs provided for indexing", + "Empty URL list", + {"error_type": "ValidationError"}, + ) + return 0, "No URLs provided for indexing" + + await task_logger.log_task_progress( + log_entry, + f"Starting to crawl {len(urls)} URLs", + { + "stage": "crawling", + "total_urls": len(urls), + }, + ) + + documents_indexed = 0 + documents_updated = 0 + documents_skipped = 0 + failed_urls = [] + + for idx, url in enumerate(urls, 1): + try: + logger.info(f"Processing URL {idx}/{len(urls)}: {url}") + + await task_logger.log_task_progress( + log_entry, + f"Crawling URL {idx}/{len(urls)}: {url}", + { + "stage": "crawling_url", + "url_index": idx, + "url": url, + }, + ) + + # Crawl the URL + crawl_result, error = await crawler.crawl_url(url) + + if error or not crawl_result: + logger.warning(f"Failed to crawl URL {url}: {error}") + failed_urls.append((url, error or "Unknown error")) + continue + + # Extract content and metadata + content = crawl_result.get("content", "") + metadata = crawl_result.get("metadata", {}) + crawler_type = 
crawl_result.get("crawler_type", "unknown") + + if not content.strip(): + logger.warning(f"Skipping URL with no content: {url}") + failed_urls.append((url, "No content extracted")) + documents_skipped += 1 + continue + + # Format content as structured document + structured_document = crawler.format_to_structured_document(crawl_result) + + # Generate unique identifier hash for this URL + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.CRAWLED_URL, url, search_space_id + ) + + # Generate content hash + content_hash = generate_content_hash(structured_document, search_space_id) + + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + # Extract useful metadata + title = metadata.get("title", url) + description = metadata.get("description", "") + language = metadata.get("language", "") + + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info(f"Document for URL {url} unchanged. Skipping.") + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info(f"Content changed for URL {url}. Updating document.") + + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "url": url, + "title": title, + "description": description, + "language": language, + "document_type": "Crawled URL", + "crawler_type": crawler_type, + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + structured_document, user_llm, document_metadata + ) + else: + # Fallback to simple summary if no LLM configured + summary_content = f"Crawled URL: {title}\n\n" + summary_content += f"URL: {url}\n" + if description: + summary_content += f"Description: {description}\n" + if language: + summary_content += f"Language: {language}\n" + summary_content += f"Crawler: {crawler_type}\n\n" + + # Add content preview + content_preview = content[:1000] + if len(content) > 1000: + content_preview += "..." 
+ summary_content += f"Content Preview:\n{content_preview}\n" + + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = await create_document_chunks(content) + + # Update existing document + existing_document.title = title + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + **metadata, + "crawler_type": crawler_type, + "last_crawled_at": datetime.now().strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + existing_document.chunks = chunks + + documents_updated += 1 + logger.info(f"Successfully updated URL {url}") + continue + + # Document doesn't exist - create new one + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "url": url, + "title": title, + "description": description, + "language": language, + "document_type": "Crawled URL", + "crawler_type": crawler_type, + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + structured_document, user_llm, document_metadata + ) + else: + # Fallback to simple summary if no LLM configured + summary_content = f"Crawled URL: {title}\n\n" + summary_content += f"URL: {url}\n" + if description: + summary_content += f"Description: {description}\n" + if language: + summary_content += f"Language: {language}\n" + summary_content += f"Crawler: {crawler_type}\n\n" + + # Add content preview + content_preview = content[:1000] + if len(content) > 1000: + content_preview += "..." + summary_content += f"Content Preview:\n{content_preview}\n" + + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(content) + + document = Document( + search_space_id=search_space_id, + title=title, + document_type=DocumentType.CRAWLED_URL, + document_metadata={ + **metadata, + "crawler_type": crawler_type, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + }, + content=summary_content, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + embedding=summary_embedding, + chunks=chunks, + ) + + session.add(document) + documents_indexed += 1 + logger.info(f"Successfully indexed new URL {url}") + + # Batch commit every 10 documents + if (documents_indexed + documents_updated) % 10 == 0: + logger.info( + f"Committing batch: {documents_indexed + documents_updated} URLs processed so far" + ) + await session.commit() + + except Exception as e: + logger.error( + f"Error processing URL {url}: {e!s}", + exc_info=True, + ) + failed_urls.append((url, str(e))) + continue + + total_processed = documents_indexed + documents_updated + + if total_processed > 0: + await update_connector_last_indexed(session, connector, update_last_indexed) + + # Final commit for any remaining documents not yet committed in batches + logger.info( + f"Final commit: Total {documents_indexed} new, {documents_updated} updated URLs processed" + ) + await session.commit() + + # Build result message + result_message = None + if failed_urls: + failed_summary = "; ".join([f"{url}: {error}" for url, error in failed_urls[:5]]) + if len(failed_urls) > 5: + failed_summary += f" (and {len(failed_urls) - 5} more)" + result_message = f"Completed with {len(failed_urls)} failures: {failed_summary}" + + await task_logger.log_task_success( + log_entry, + f"Successfully completed webcrawler indexing for 
connector {connector_id}", + { + "urls_processed": total_processed, + "documents_indexed": documents_indexed, + "documents_updated": documents_updated, + "documents_skipped": documents_skipped, + "failed_urls_count": len(failed_urls), + }, + ) + + logger.info( + f"Webcrawler indexing completed: {documents_indexed} new, " + f"{documents_updated} updated, {documents_skipped} skipped, " + f"{len(failed_urls)} failed" + ) + return total_processed, result_message + + except SQLAlchemyError as db_error: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during webcrawler indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"}, + ) + logger.error(f"Database error: {db_error!s}", exc_info=True) + return 0, f"Database error: {db_error!s}" + except Exception as e: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index webcrawler URLs for connector {connector_id}", + str(e), + {"error_type": type(e).__name__}, + ) + logger.error(f"Failed to index webcrawler URLs: {e!s}", exc_info=True) + return 0, f"Failed to index webcrawler URLs: {e!s}" + + +async def get_crawled_url_documents( + session: AsyncSession, + search_space_id: int, + connector_id: int | None = None, +) -> list[Document]: + """ + Get all crawled URL documents for a search space. + + Args: + session: Database session + search_space_id: ID of the search space + connector_id: Optional connector ID to filter by + + Returns: + List of Document objects + """ + from sqlalchemy import select + + query = select(Document).filter( + Document.search_space_id == search_space_id, + Document.document_type == DocumentType.CRAWLED_URL, + ) + + if connector_id: + # Filter by connector if needed - you might need to add a connector_id field to Document + # or filter by some other means depending on your schema + pass + + result = await session.execute(query) + documents = result.scalars().all() + return list(documents) \ No newline at end of file diff --git a/surfsense_backend/app/utils/periodic_scheduler.py b/surfsense_backend/app/utils/periodic_scheduler.py index 225425714..e303a3be5 100644 --- a/surfsense_backend/app/utils/periodic_scheduler.py +++ b/surfsense_backend/app/utils/periodic_scheduler.py @@ -31,6 +31,7 @@ CONNECTOR_TASK_MAP = { SearchSourceConnectorType.DISCORD_CONNECTOR: "index_discord_messages", SearchSourceConnectorType.LUMA_CONNECTOR: "index_luma_events", SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: "index_elasticsearch_documents", + SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: "index_webcrawler_urls", } @@ -79,6 +80,7 @@ def create_periodic_schedule( index_luma_events_task, index_notion_pages_task, index_slack_messages_task, + index_webcrawler_urls_task, ) # Map connector type to task @@ -96,6 +98,7 @@ def create_periodic_schedule( SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task, SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task, SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, + SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_webcrawler_urls_task, } # Trigger the first run immediately diff --git a/surfsense_backend/app/utils/validators.py b/surfsense_backend/app/utils/validators.py index a8460cd14..d9dab864c 100644 --- a/surfsense_backend/app/utils/validators.py +++ b/surfsense_backend/app/utils/validators.py @@ -468,6 +468,26 @@ def validate_connector_config( value = config.get(key) if not isinstance(value, list) or not 
value: raise ValueError(f"{field_name} must be a non-empty list of strings") + + def validate_firecrawl_api_key_format() -> None: + api_key = config.get("FIRECRAWL_API_KEY", "") + if api_key and api_key.strip(): + # Firecrawl API keys typically start with "fc-" + if not api_key.strip().startswith("fc-"): + raise ValueError( + "Firecrawl API key should start with 'fc-'. Please verify your API key." + ) + + + def validate_initial_urls() -> None: + initial_urls = config.get("INITIAL_URLS", "") + if initial_urls and initial_urls.strip(): + urls = [url.strip() for url in initial_urls.split("\n") if url.strip()] + for url in urls: + if not validators.url(url): + raise ValueError( + f"Invalid URL format in INITIAL_URLS: {url}" + ) # Lookup table for connector validation rules connector_rules = { @@ -550,6 +570,14 @@ def validate_connector_config( # "validators": {} # }, "LUMA_CONNECTOR": {"required": ["LUMA_API_KEY"], "validators": {}}, + "WEBCRAWLER_CONNECTOR": { + "required": [], # No required fields - API key is optional + "optional": ["FIRECRAWL_API_KEY", "INITIAL_URLS"], + "validators": { + "FIRECRAWL_API_KEY": lambda: validate_firecrawl_api_key_format(), + "INITIAL_URLS": lambda: validate_initial_urls(), + }, + }, } rules = connector_rules.get(connector_type_str) diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx index f09069521..b558a5858 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx @@ -282,6 +282,18 @@ export default function EditConnectorPage() { placeholder="Your Elasticsearch API Key" /> )} + + {/* == Webcrawler == */} + {connector.connector_type === "WEBCRAWLER_CONNECTOR" && ( + + )} +