mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-30 03:16:25 +02:00
Webcrawler connector draft
This commit is contained in:
parent
419f94e8ee
commit
896e410e2a
26 changed files with 1225 additions and 9 deletions
|
|
@ -600,3 +600,46 @@ async def _index_elasticsearch_documents(
|
|||
await run_elasticsearch_indexing(
|
||||
session, connector_id, search_space_id, user_id, start_date, end_date
|
||||
)
|
||||
|
||||
|
||||
@celery_app.task(name="index_webcrawler_urls", bind=True)
|
||||
def index_webcrawler_urls_task(
|
||||
self,
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
start_date: str,
|
||||
end_date: str,
|
||||
):
|
||||
"""Celery task to index Webcrawler Urls."""
|
||||
import asyncio
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
_index_webcrawler_urls(
|
||||
connector_id, search_space_id, user_id, start_date, end_date
|
||||
)
|
||||
)
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
async def _index_webcrawler_urls(
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
start_date: str,
|
||||
end_date: str,
|
||||
):
|
||||
"""Index Webcrawler Urls with new session."""
|
||||
from app.routes.search_source_connectors_routes import (
|
||||
run_webcrawler_indexing,
|
||||
)
|
||||
|
||||
async with get_celery_session_maker()() as session:
|
||||
await run_webcrawler_indexing(
|
||||
session, connector_id, search_space_id, user_id, start_date, end_date
|
||||
)
|
||||
|
|
|
|||
|
|
@ -77,6 +77,7 @@ async def _check_and_trigger_schedules():
|
|||
index_luma_events_task,
|
||||
index_notion_pages_task,
|
||||
index_slack_messages_task,
|
||||
index_webcrawler_urls_task
|
||||
)
|
||||
|
||||
# Map connector types to their tasks
|
||||
|
|
@ -94,6 +95,7 @@ async def _check_and_trigger_schedules():
|
|||
SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task,
|
||||
SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
|
||||
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
|
||||
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_webcrawler_urls_task,
|
||||
}
|
||||
|
||||
# Trigger indexing for each due connector
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ Available indexers:
|
|||
- Google Gmail: Index messages from Google Gmail
|
||||
- Google Calendar: Index events from Google Calendar
|
||||
- Luma: Index events from Luma
|
||||
- Webcrawler: Index crawled URLs
|
||||
- Elasticsearch: Index documents from Elasticsearch instances
|
||||
"""
|
||||
|
||||
|
|
@ -41,6 +42,7 @@ from .luma_indexer import index_luma_events
|
|||
# Documentation and knowledge management
|
||||
from .notion_indexer import index_notion_pages
|
||||
from .slack_indexer import index_slack_messages
|
||||
from .webcrawler_indexer import index_webcrawler_urls
|
||||
|
||||
__all__ = [ # noqa: RUF022
|
||||
"index_airtable_records",
|
||||
|
|
@ -58,6 +60,7 @@ __all__ = [ # noqa: RUF022
|
|||
"index_linear_issues",
|
||||
# Documentation and knowledge management
|
||||
"index_notion_pages",
|
||||
"index_webcrawler_urls",
|
||||
# Communication platforms
|
||||
"index_slack_messages",
|
||||
"index_google_gmail_messages",
|
||||
|
|
|
|||
|
|
@ -0,0 +1,439 @@
|
|||
"""
|
||||
Webcrawler connector indexer.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import config
|
||||
from app.connectors.webcrawler_connector import WebCrawlerConnector
|
||||
from app.db import Document, DocumentType, SearchSourceConnectorType
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
generate_content_hash,
|
||||
generate_document_summary,
|
||||
generate_unique_identifier_hash,
|
||||
)
|
||||
|
||||
from .base import (
|
||||
check_document_by_unique_identifier,
|
||||
get_connector_by_id,
|
||||
logger,
|
||||
update_connector_last_indexed,
|
||||
)
|
||||
|
||||
|
||||
async def index_webcrawler_urls(
|
||||
session: AsyncSession,
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
start_date: str | None = None,
|
||||
end_date: str | None = None,
|
||||
update_last_indexed: bool = True,
|
||||
) -> tuple[int, str | None]:
|
||||
"""
|
||||
Index webcrawler URLs.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
connector_id: ID of the webcrawler connector
|
||||
search_space_id: ID of the search space to store documents in
|
||||
user_id: User ID
|
||||
start_date: Start date for filtering (YYYY-MM-DD format) - optional
|
||||
end_date: End date for filtering (YYYY-MM-DD format) - optional
|
||||
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
|
||||
|
||||
Returns:
|
||||
Tuple containing (number of documents indexed, error message or None)
|
||||
"""
|
||||
task_logger = TaskLoggingService(session, search_space_id)
|
||||
|
||||
# Log task start
|
||||
log_entry = await task_logger.log_task_start(
|
||||
task_name="webcrawler_url_indexing",
|
||||
source="connector_indexing_task",
|
||||
message=f"Starting webcrawler URL indexing for connector {connector_id}",
|
||||
metadata={
|
||||
"connector_id": connector_id,
|
||||
"user_id": str(user_id),
|
||||
"start_date": start_date,
|
||||
"end_date": end_date,
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
# Get the connector
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Retrieving webcrawler connector {connector_id} from database",
|
||||
{"stage": "connector_retrieval"},
|
||||
)
|
||||
|
||||
# Get the connector from the database
|
||||
connector = await get_connector_by_id(
|
||||
session, connector_id, SearchSourceConnectorType.WEBCRAWLER_CONNECTOR
|
||||
)
|
||||
|
||||
if not connector:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Connector with ID {connector_id} not found or is not a webcrawler connector",
|
||||
"Connector not found",
|
||||
{"error_type": "ConnectorNotFound"},
|
||||
)
|
||||
return (
|
||||
0,
|
||||
f"Connector with ID {connector_id} not found or is not a webcrawler connector",
|
||||
)
|
||||
|
||||
# Get the Firecrawl API key from the connector config (optional)
|
||||
api_key = connector.config.get("FIRECRAWL_API_KEY")
|
||||
|
||||
# Get URLs from connector config
|
||||
initial_urls = connector.config.get("INITIAL_URLS", "")
|
||||
if isinstance(initial_urls, str):
|
||||
urls = [url.strip() for url in initial_urls.split("\n") if url.strip()]
|
||||
elif isinstance(initial_urls, list):
|
||||
urls = [url.strip() for url in initial_urls if url.strip()]
|
||||
else:
|
||||
urls = []
|
||||
|
||||
logger.info(
|
||||
f"Starting webcrawler indexing for connector {connector_id} with {len(urls)} URLs"
|
||||
)
|
||||
|
||||
# Initialize webcrawler client
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Initializing webcrawler client for connector {connector_id}",
|
||||
{
|
||||
"stage": "client_initialization",
|
||||
"use_firecrawl": bool(api_key),
|
||||
},
|
||||
)
|
||||
|
||||
crawler = WebCrawlerConnector(firecrawl_api_key=api_key)
|
||||
|
||||
# Validate URLs
|
||||
if not urls:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
"No URLs provided for indexing",
|
||||
"Empty URL list",
|
||||
{"error_type": "ValidationError"},
|
||||
)
|
||||
return 0, "No URLs provided for indexing"
|
||||
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Starting to crawl {len(urls)} URLs",
|
||||
{
|
||||
"stage": "crawling",
|
||||
"total_urls": len(urls),
|
||||
},
|
||||
)
|
||||
|
||||
documents_indexed = 0
|
||||
documents_updated = 0
|
||||
documents_skipped = 0
|
||||
failed_urls = []
|
||||
|
||||
for idx, url in enumerate(urls, 1):
|
||||
try:
|
||||
logger.info(f"Processing URL {idx}/{len(urls)}: {url}")
|
||||
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Crawling URL {idx}/{len(urls)}: {url}",
|
||||
{
|
||||
"stage": "crawling_url",
|
||||
"url_index": idx,
|
||||
"url": url,
|
||||
},
|
||||
)
|
||||
|
||||
# Crawl the URL
|
||||
crawl_result, error = await crawler.crawl_url(url)
|
||||
|
||||
if error or not crawl_result:
|
||||
logger.warning(f"Failed to crawl URL {url}: {error}")
|
||||
failed_urls.append((url, error or "Unknown error"))
|
||||
continue
|
||||
|
||||
# Extract content and metadata
|
||||
content = crawl_result.get("content", "")
|
||||
metadata = crawl_result.get("metadata", {})
|
||||
crawler_type = crawl_result.get("crawler_type", "unknown")
|
||||
|
||||
if not content.strip():
|
||||
logger.warning(f"Skipping URL with no content: {url}")
|
||||
failed_urls.append((url, "No content extracted"))
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Format content as structured document
|
||||
structured_document = crawler.format_to_structured_document(crawl_result)
|
||||
|
||||
# Generate unique identifier hash for this URL
|
||||
unique_identifier_hash = generate_unique_identifier_hash(
|
||||
DocumentType.CRAWLED_URL, url, search_space_id
|
||||
)
|
||||
|
||||
# Generate content hash
|
||||
content_hash = generate_content_hash(structured_document, search_space_id)
|
||||
|
||||
# Check if document with this unique identifier already exists
|
||||
existing_document = await check_document_by_unique_identifier(
|
||||
session, unique_identifier_hash
|
||||
)
|
||||
|
||||
# Extract useful metadata
|
||||
title = metadata.get("title", url)
|
||||
description = metadata.get("description", "")
|
||||
language = metadata.get("language", "")
|
||||
|
||||
if existing_document:
|
||||
# Document exists - check if content has changed
|
||||
if existing_document.content_hash == content_hash:
|
||||
logger.info(f"Document for URL {url} unchanged. Skipping.")
|
||||
documents_skipped += 1
|
||||
continue
|
||||
else:
|
||||
# Content has changed - update the existing document
|
||||
logger.info(f"Content changed for URL {url}. Updating document.")
|
||||
|
||||
# Generate summary with metadata
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata = {
|
||||
"url": url,
|
||||
"title": title,
|
||||
"description": description,
|
||||
"language": language,
|
||||
"document_type": "Crawled URL",
|
||||
"crawler_type": crawler_type,
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
structured_document, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
# Fallback to simple summary if no LLM configured
|
||||
summary_content = f"Crawled URL: {title}\n\n"
|
||||
summary_content += f"URL: {url}\n"
|
||||
if description:
|
||||
summary_content += f"Description: {description}\n"
|
||||
if language:
|
||||
summary_content += f"Language: {language}\n"
|
||||
summary_content += f"Crawler: {crawler_type}\n\n"
|
||||
|
||||
# Add content preview
|
||||
content_preview = content[:1000]
|
||||
if len(content) > 1000:
|
||||
content_preview += "..."
|
||||
summary_content += f"Content Preview:\n{content_preview}\n"
|
||||
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
# Process chunks
|
||||
chunks = await create_document_chunks(content)
|
||||
|
||||
# Update existing document
|
||||
existing_document.title = title
|
||||
existing_document.content = summary_content
|
||||
existing_document.content_hash = content_hash
|
||||
existing_document.embedding = summary_embedding
|
||||
existing_document.document_metadata = {
|
||||
**metadata,
|
||||
"crawler_type": crawler_type,
|
||||
"last_crawled_at": datetime.now().strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
),
|
||||
}
|
||||
existing_document.chunks = chunks
|
||||
|
||||
documents_updated += 1
|
||||
logger.info(f"Successfully updated URL {url}")
|
||||
continue
|
||||
|
||||
# Document doesn't exist - create new one
|
||||
# Generate summary with metadata
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
document_metadata = {
|
||||
"url": url,
|
||||
"title": title,
|
||||
"description": description,
|
||||
"language": language,
|
||||
"document_type": "Crawled URL",
|
||||
"crawler_type": crawler_type,
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
structured_document, user_llm, document_metadata
|
||||
)
|
||||
else:
|
||||
# Fallback to simple summary if no LLM configured
|
||||
summary_content = f"Crawled URL: {title}\n\n"
|
||||
summary_content += f"URL: {url}\n"
|
||||
if description:
|
||||
summary_content += f"Description: {description}\n"
|
||||
if language:
|
||||
summary_content += f"Language: {language}\n"
|
||||
summary_content += f"Crawler: {crawler_type}\n\n"
|
||||
|
||||
# Add content preview
|
||||
content_preview = content[:1000]
|
||||
if len(content) > 1000:
|
||||
content_preview += "..."
|
||||
summary_content += f"Content Preview:\n{content_preview}\n"
|
||||
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
chunks = await create_document_chunks(content)
|
||||
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=title,
|
||||
document_type=DocumentType.CRAWLED_URL,
|
||||
document_metadata={
|
||||
**metadata,
|
||||
"crawler_type": crawler_type,
|
||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
},
|
||||
content=summary_content,
|
||||
content_hash=content_hash,
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=summary_embedding,
|
||||
chunks=chunks,
|
||||
)
|
||||
|
||||
session.add(document)
|
||||
documents_indexed += 1
|
||||
logger.info(f"Successfully indexed new URL {url}")
|
||||
|
||||
# Batch commit every 10 documents
|
||||
if (documents_indexed + documents_updated) % 10 == 0:
|
||||
logger.info(
|
||||
f"Committing batch: {documents_indexed + documents_updated} URLs processed so far"
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error processing URL {url}: {e!s}",
|
||||
exc_info=True,
|
||||
)
|
||||
failed_urls.append((url, str(e)))
|
||||
continue
|
||||
|
||||
total_processed = documents_indexed + documents_updated
|
||||
|
||||
if total_processed > 0:
|
||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||
|
||||
# Final commit for any remaining documents not yet committed in batches
|
||||
logger.info(
|
||||
f"Final commit: Total {documents_indexed} new, {documents_updated} updated URLs processed"
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
# Build result message
|
||||
result_message = None
|
||||
if failed_urls:
|
||||
failed_summary = "; ".join([f"{url}: {error}" for url, error in failed_urls[:5]])
|
||||
if len(failed_urls) > 5:
|
||||
failed_summary += f" (and {len(failed_urls) - 5} more)"
|
||||
result_message = f"Completed with {len(failed_urls)} failures: {failed_summary}"
|
||||
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Successfully completed webcrawler indexing for connector {connector_id}",
|
||||
{
|
||||
"urls_processed": total_processed,
|
||||
"documents_indexed": documents_indexed,
|
||||
"documents_updated": documents_updated,
|
||||
"documents_skipped": documents_skipped,
|
||||
"failed_urls_count": len(failed_urls),
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Webcrawler indexing completed: {documents_indexed} new, "
|
||||
f"{documents_updated} updated, {documents_skipped} skipped, "
|
||||
f"{len(failed_urls)} failed"
|
||||
)
|
||||
return total_processed, result_message
|
||||
|
||||
except SQLAlchemyError as db_error:
|
||||
await session.rollback()
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Database error during webcrawler indexing for connector {connector_id}",
|
||||
str(db_error),
|
||||
{"error_type": "SQLAlchemyError"},
|
||||
)
|
||||
logger.error(f"Database error: {db_error!s}", exc_info=True)
|
||||
return 0, f"Database error: {db_error!s}"
|
||||
except Exception as e:
|
||||
await session.rollback()
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Failed to index webcrawler URLs for connector {connector_id}",
|
||||
str(e),
|
||||
{"error_type": type(e).__name__},
|
||||
)
|
||||
logger.error(f"Failed to index webcrawler URLs: {e!s}", exc_info=True)
|
||||
return 0, f"Failed to index webcrawler URLs: {e!s}"
|
||||
|
||||
|
||||
async def get_crawled_url_documents(
|
||||
session: AsyncSession,
|
||||
search_space_id: int,
|
||||
connector_id: int | None = None,
|
||||
) -> list[Document]:
|
||||
"""
|
||||
Get all crawled URL documents for a search space.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
search_space_id: ID of the search space
|
||||
connector_id: Optional connector ID to filter by
|
||||
|
||||
Returns:
|
||||
List of Document objects
|
||||
"""
|
||||
from sqlalchemy import select
|
||||
|
||||
query = select(Document).filter(
|
||||
Document.search_space_id == search_space_id,
|
||||
Document.document_type == DocumentType.CRAWLED_URL,
|
||||
)
|
||||
|
||||
if connector_id:
|
||||
# Filter by connector if needed - you might need to add a connector_id field to Document
|
||||
# or filter by some other means depending on your schema
|
||||
pass
|
||||
|
||||
result = await session.execute(query)
|
||||
documents = result.scalars().all()
|
||||
return list(documents)
|
||||
Loading…
Add table
Add a link
Reference in a new issue