mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-07-04 22:02:16 +02:00
feat: add BookStack connector for wiki documentation indexing
This commit is contained in:
parent
e0725741c9
commit
6b1b8d0f2e
18 changed files with 1362 additions and 1 deletions
343
surfsense_backend/app/connectors/bookstack_connector.py
Normal file
343
surfsense_backend/app/connectors/bookstack_connector.py
Normal file
|
|
@ -0,0 +1,343 @@
|
|||
"""
|
||||
BookStack Connector Module
|
||||
|
||||
A module for retrieving data from BookStack wiki systems.
|
||||
Allows fetching pages, books, and chapters from BookStack instances.
|
||||
|
||||
BookStack API Documentation: https://demo.bookstackapp.com/api/docs
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BookStackConnector:
|
||||
"""Class for retrieving data from BookStack."""
|
||||
|
||||
# Rate limiting: 180 requests per minute = 0.33 seconds per request
|
||||
# Using 0.35 seconds to be safe
|
||||
REQUEST_INTERVAL = 0.35
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str | None = None,
|
||||
token_id: str | None = None,
|
||||
token_secret: str | None = None,
|
||||
):
|
||||
"""
|
||||
Initialize the BookStackConnector class.
|
||||
|
||||
Args:
|
||||
base_url: BookStack instance base URL (e.g., 'https://docs.example.com')
|
||||
token_id: BookStack API Token ID
|
||||
token_secret: BookStack API Token Secret
|
||||
"""
|
||||
self.base_url = base_url.rstrip("/") if base_url else None
|
||||
self.token_id = token_id
|
||||
self.token_secret = token_secret
|
||||
self._last_request_time = 0.0
|
||||
|
||||
def set_credentials(
|
||||
self, base_url: str, token_id: str, token_secret: str
|
||||
) -> None:
|
||||
"""
|
||||
Set the BookStack credentials.
|
||||
|
||||
Args:
|
||||
base_url: BookStack instance base URL
|
||||
token_id: BookStack API Token ID
|
||||
token_secret: BookStack API Token Secret
|
||||
"""
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.token_id = token_id
|
||||
self.token_secret = token_secret
|
||||
|
||||
def get_headers(self) -> dict[str, str]:
|
||||
"""
|
||||
Get headers for BookStack API requests using Token Authentication.
|
||||
|
||||
Returns:
|
||||
Dictionary of headers
|
||||
|
||||
Raises:
|
||||
ValueError: If token_id, token_secret, or base_url have not been set
|
||||
"""
|
||||
if not all([self.base_url, self.token_id, self.token_secret]):
|
||||
raise ValueError(
|
||||
"BookStack credentials not initialized. Call set_credentials() first."
|
||||
)
|
||||
|
||||
return {
|
||||
"Authorization": f"Token {self.token_id}:{self.token_secret}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
|
||||
def _rate_limit(self) -> None:
|
||||
"""Apply rate limiting between API requests."""
|
||||
current_time = time.time()
|
||||
elapsed = current_time - self._last_request_time
|
||||
if elapsed < self.REQUEST_INTERVAL:
|
||||
time.sleep(self.REQUEST_INTERVAL - elapsed)
|
||||
self._last_request_time = time.time()
|
||||
|
||||
def make_api_request(
|
||||
self,
|
||||
endpoint: str,
|
||||
params: dict[str, Any] | None = None,
|
||||
raw_response: bool = False,
|
||||
) -> dict[str, Any] | str:
|
||||
"""
|
||||
Make a request to the BookStack API.
|
||||
|
||||
Args:
|
||||
endpoint: API endpoint (without base URL, e.g., 'pages' or 'pages/1')
|
||||
params: Query parameters for the request (optional)
|
||||
raw_response: If True, return raw text response instead of JSON
|
||||
|
||||
Returns:
|
||||
Response data from the API (dict for JSON, str for raw)
|
||||
|
||||
Raises:
|
||||
ValueError: If credentials have not been set
|
||||
Exception: If the API request fails
|
||||
"""
|
||||
if not all([self.base_url, self.token_id, self.token_secret]):
|
||||
raise ValueError(
|
||||
"BookStack credentials not initialized. Call set_credentials() first."
|
||||
)
|
||||
|
||||
# Apply rate limiting
|
||||
self._rate_limit()
|
||||
|
||||
url = f"{self.base_url}/api/{endpoint}"
|
||||
headers = self.get_headers()
|
||||
|
||||
try:
|
||||
response = requests.get(url, headers=headers, params=params, timeout=30)
|
||||
response.raise_for_status()
|
||||
|
||||
if raw_response:
|
||||
return response.text
|
||||
return response.json()
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
if e.response.status_code == 429:
|
||||
logger.warning("Rate limit exceeded, waiting 60 seconds...")
|
||||
time.sleep(60)
|
||||
return self.make_api_request(endpoint, params, raw_response)
|
||||
raise Exception(f"BookStack API request failed: {e!s}") from e
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise Exception(f"BookStack API request failed: {e!s}") from e
|
||||
|
||||
def get_all_pages(self, count: int = 500) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Fetch all pages from BookStack with pagination.
|
||||
|
||||
Args:
|
||||
count: Number of records per request (max 500)
|
||||
|
||||
Returns:
|
||||
List of page objects
|
||||
|
||||
Raises:
|
||||
ValueError: If credentials have not been set
|
||||
Exception: If the API request fails
|
||||
"""
|
||||
all_pages = []
|
||||
offset = 0
|
||||
|
||||
while True:
|
||||
params = {
|
||||
"count": min(count, 500),
|
||||
"offset": offset,
|
||||
}
|
||||
|
||||
result = self.make_api_request("pages", params)
|
||||
|
||||
if not isinstance(result, dict) or "data" not in result:
|
||||
raise Exception("Invalid response from BookStack API")
|
||||
|
||||
pages = result["data"]
|
||||
all_pages.extend(pages)
|
||||
|
||||
logger.info(f"Fetched {len(pages)} pages (offset: {offset})")
|
||||
|
||||
# Check if we've fetched all pages
|
||||
total = result.get("total", 0)
|
||||
if offset + len(pages) >= total:
|
||||
break
|
||||
|
||||
offset += len(pages)
|
||||
|
||||
logger.info(f"Total pages fetched: {len(all_pages)}")
|
||||
return all_pages
|
||||
|
||||
def get_page_detail(self, page_id: int) -> dict[str, Any]:
|
||||
"""
|
||||
Get detailed information for a single page.
|
||||
|
||||
The response includes 'html' (rendered) and optionally 'markdown' content.
|
||||
|
||||
Args:
|
||||
page_id: The ID of the page
|
||||
|
||||
Returns:
|
||||
Page detail object with content
|
||||
|
||||
Raises:
|
||||
ValueError: If credentials have not been set
|
||||
Exception: If the API request fails
|
||||
"""
|
||||
result = self.make_api_request(f"pages/{page_id}")
|
||||
|
||||
if not isinstance(result, dict):
|
||||
raise Exception(f"Invalid response for page {page_id}")
|
||||
|
||||
return result
|
||||
|
||||
def export_page_markdown(self, page_id: int) -> str:
|
||||
"""
|
||||
Export a page as Markdown content.
|
||||
|
||||
Args:
|
||||
page_id: The ID of the page
|
||||
|
||||
Returns:
|
||||
Markdown content as string
|
||||
|
||||
Raises:
|
||||
ValueError: If credentials have not been set
|
||||
Exception: If the API request fails
|
||||
"""
|
||||
result = self.make_api_request(
|
||||
f"pages/{page_id}/export/markdown", raw_response=True
|
||||
)
|
||||
return result if isinstance(result, str) else ""
|
||||
|
||||
def get_book_detail(self, book_id: int) -> dict[str, Any]:
|
||||
"""
|
||||
Get detailed information for a single book.
|
||||
|
||||
The response includes a 'content' property with the book's structure.
|
||||
|
||||
Args:
|
||||
book_id: The ID of the book
|
||||
|
||||
Returns:
|
||||
Book detail object
|
||||
|
||||
Raises:
|
||||
ValueError: If credentials have not been set
|
||||
Exception: If the API request fails
|
||||
"""
|
||||
result = self.make_api_request(f"books/{book_id}")
|
||||
|
||||
if not isinstance(result, dict):
|
||||
raise Exception(f"Invalid response for book {book_id}")
|
||||
|
||||
return result
|
||||
|
||||
def get_pages_by_date_range(
|
||||
self,
|
||||
start_date: str,
|
||||
end_date: str,
|
||||
count: int = 500,
|
||||
) -> tuple[list[dict[str, Any]], str | None]:
|
||||
"""
|
||||
Fetch pages updated within a specific date range.
|
||||
|
||||
Uses the filter[updated_at:gt] parameter for incremental indexing.
|
||||
|
||||
Args:
|
||||
start_date: Start date in YYYY-MM-DD format
|
||||
end_date: End date in YYYY-MM-DD format (currently unused, for future use)
|
||||
count: Number of records per request (max 500)
|
||||
|
||||
Returns:
|
||||
Tuple of (list of page objects, error message or None)
|
||||
|
||||
Raises:
|
||||
ValueError: If credentials have not been set
|
||||
"""
|
||||
all_pages = []
|
||||
offset = 0
|
||||
|
||||
try:
|
||||
while True:
|
||||
params = {
|
||||
"count": min(count, 500),
|
||||
"offset": offset,
|
||||
"filter[updated_at:gt]": start_date,
|
||||
"sort": "-updated_at", # Most recently updated first
|
||||
}
|
||||
|
||||
result = self.make_api_request("pages", params)
|
||||
|
||||
if not isinstance(result, dict) or "data" not in result:
|
||||
return [], "Invalid response from BookStack API"
|
||||
|
||||
pages = result["data"]
|
||||
all_pages.extend(pages)
|
||||
|
||||
logger.info(
|
||||
f"Fetched {len(pages)} pages updated after {start_date} (offset: {offset})"
|
||||
)
|
||||
|
||||
# Check if we've fetched all pages
|
||||
total = result.get("total", 0)
|
||||
if offset + len(pages) >= total:
|
||||
break
|
||||
|
||||
offset += len(pages)
|
||||
|
||||
if not all_pages:
|
||||
return [], f"No pages found updated after {start_date}"
|
||||
|
||||
logger.info(
|
||||
f"Total pages fetched for date range {start_date} to {end_date}: {len(all_pages)}"
|
||||
)
|
||||
return all_pages, None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching pages by date range: {e!s}", exc_info=True)
|
||||
return [], str(e)
|
||||
|
||||
def get_page_with_content(
|
||||
self, page_id: int, use_markdown: bool = True
|
||||
) -> tuple[dict[str, Any], str]:
|
||||
"""
|
||||
Get page details along with its full content.
|
||||
|
||||
Args:
|
||||
page_id: The ID of the page
|
||||
use_markdown: If True, export as Markdown; otherwise use HTML
|
||||
|
||||
Returns:
|
||||
Tuple of (page detail dict, content string)
|
||||
|
||||
Raises:
|
||||
ValueError: If credentials have not been set
|
||||
Exception: If the API request fails
|
||||
"""
|
||||
# Get page details first
|
||||
page_detail = self.get_page_detail(page_id)
|
||||
|
||||
# Get content
|
||||
if use_markdown:
|
||||
try:
|
||||
content = self.export_page_markdown(page_id)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to export markdown for page {page_id}, falling back to HTML: {e}"
|
||||
)
|
||||
content = page_detail.get("html", "")
|
||||
else:
|
||||
content = page_detail.get("html", "")
|
||||
|
||||
return page_detail, content
|
||||
|
|
@ -52,6 +52,7 @@ class DocumentType(str, Enum):
|
|||
AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR"
|
||||
LUMA_CONNECTOR = "LUMA_CONNECTOR"
|
||||
ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR"
|
||||
BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR"
|
||||
|
||||
|
||||
class SearchSourceConnectorType(str, Enum):
|
||||
|
|
@ -74,6 +75,7 @@ class SearchSourceConnectorType(str, Enum):
|
|||
LUMA_CONNECTOR = "LUMA_CONNECTOR"
|
||||
ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR"
|
||||
WEBCRAWLER_CONNECTOR = "WEBCRAWLER_CONNECTOR"
|
||||
BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR"
|
||||
|
||||
|
||||
class ChatType(str, Enum):
|
||||
|
|
|
|||
|
|
@ -597,6 +597,19 @@ async def index_connector_content(
|
|||
)
|
||||
response_message = "Confluence indexing started in the background."
|
||||
|
||||
elif connector.connector_type == SearchSourceConnectorType.BOOKSTACK_CONNECTOR:
|
||||
from app.tasks.celery_tasks.connector_tasks import (
|
||||
index_bookstack_pages_task,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Triggering BookStack indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
|
||||
)
|
||||
index_bookstack_pages_task.delay(
|
||||
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
|
||||
)
|
||||
response_message = "BookStack indexing started in the background."
|
||||
|
||||
elif connector.connector_type == SearchSourceConnectorType.CLICKUP_CONNECTOR:
|
||||
from app.tasks.celery_tasks.connector_tasks import index_clickup_tasks_task
|
||||
|
||||
|
|
@ -1597,3 +1610,73 @@ async def run_web_page_indexing(
|
|||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in background Web page indexing task: {e!s}")
|
||||
|
||||
|
||||
# Add new helper functions for BookStack indexing
|
||||
async def run_bookstack_indexing_with_new_session(
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
start_date: str,
|
||||
end_date: str,
|
||||
):
|
||||
"""Wrapper to run BookStack indexing with its own database session."""
|
||||
logger.info(
|
||||
f"Background task started: Indexing BookStack connector {connector_id} into space {search_space_id} from {start_date} to {end_date}"
|
||||
)
|
||||
async with async_session_maker() as session:
|
||||
await run_bookstack_indexing(
|
||||
session, connector_id, search_space_id, user_id, start_date, end_date
|
||||
)
|
||||
logger.info(
|
||||
f"Background task finished: Indexing BookStack connector {connector_id}"
|
||||
)
|
||||
|
||||
|
||||
async def run_bookstack_indexing(
|
||||
session: AsyncSession,
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
start_date: str,
|
||||
end_date: str,
|
||||
):
|
||||
"""
|
||||
Background task to run BookStack indexing.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
connector_id: ID of the BookStack connector
|
||||
search_space_id: ID of the search space
|
||||
user_id: ID of the user
|
||||
start_date: Start date for indexing
|
||||
end_date: End date for indexing
|
||||
"""
|
||||
from app.tasks.connector_indexers import index_bookstack_pages
|
||||
|
||||
try:
|
||||
indexed_count, error_message = await index_bookstack_pages(
|
||||
session,
|
||||
connector_id,
|
||||
search_space_id,
|
||||
user_id,
|
||||
start_date,
|
||||
end_date,
|
||||
update_last_indexed=False,
|
||||
)
|
||||
if error_message:
|
||||
logger.error(
|
||||
f"BookStack indexing failed for connector {connector_id}: {error_message}"
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"BookStack indexing successful for connector {connector_id}. Indexed {indexed_count} documents."
|
||||
)
|
||||
# Update the last indexed timestamp only on success
|
||||
await update_connector_last_indexed(session, connector_id)
|
||||
await session.commit() # Commit timestamp update
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Critical error in run_bookstack_indexing for connector {connector_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -2576,3 +2576,98 @@ class ConnectorService:
|
|||
}
|
||||
|
||||
return result_object, elasticsearch_chunks
|
||||
|
||||
async def search_bookstack(
|
||||
self,
|
||||
user_query: str,
|
||||
user_id: str,
|
||||
search_space_id: int,
|
||||
top_k: int = 20,
|
||||
search_mode: SearchMode = SearchMode.CHUNKS,
|
||||
) -> tuple:
|
||||
"""
|
||||
Search for BookStack pages and return both the source information and langchain documents
|
||||
|
||||
Args:
|
||||
user_query: The user's query
|
||||
user_id: The user's ID
|
||||
search_space_id: The search space ID to search in
|
||||
top_k: Maximum number of results to return
|
||||
search_mode: Search mode (CHUNKS or DOCUMENTS)
|
||||
|
||||
Returns:
|
||||
tuple: (sources_info, langchain_documents)
|
||||
"""
|
||||
if search_mode == SearchMode.CHUNKS:
|
||||
bookstack_chunks = await self.chunk_retriever.hybrid_search(
|
||||
query_text=user_query,
|
||||
top_k=top_k,
|
||||
user_id=user_id,
|
||||
search_space_id=search_space_id,
|
||||
document_type="BOOKSTACK_CONNECTOR",
|
||||
)
|
||||
elif search_mode == SearchMode.DOCUMENTS:
|
||||
bookstack_chunks = await self.document_retriever.hybrid_search(
|
||||
query_text=user_query,
|
||||
top_k=top_k,
|
||||
user_id=user_id,
|
||||
search_space_id=search_space_id,
|
||||
document_type="BOOKSTACK_CONNECTOR",
|
||||
)
|
||||
# Transform document retriever results to match expected format
|
||||
bookstack_chunks = self._transform_document_results(bookstack_chunks)
|
||||
|
||||
# Early return if no results
|
||||
if not bookstack_chunks:
|
||||
return {
|
||||
"id": 50,
|
||||
"name": "BookStack",
|
||||
"type": "BOOKSTACK_CONNECTOR",
|
||||
"sources": [],
|
||||
}, []
|
||||
|
||||
# Process each chunk and create sources directly without deduplication
|
||||
sources_list = []
|
||||
async with self.counter_lock:
|
||||
for _i, chunk in enumerate(bookstack_chunks):
|
||||
# Extract document metadata
|
||||
document = chunk.get("document", {})
|
||||
metadata = document.get("metadata", {})
|
||||
|
||||
# Extract BookStack-specific metadata
|
||||
page_name = metadata.get("page_name", "Untitled Page")
|
||||
page_slug = metadata.get("page_slug", "")
|
||||
book_slug = metadata.get("book_slug", "")
|
||||
base_url = metadata.get("base_url", "")
|
||||
page_url = metadata.get("page_url", "")
|
||||
|
||||
# Create a more descriptive title for BookStack pages
|
||||
title = f"BookStack: {page_name}"
|
||||
|
||||
# Create description from content
|
||||
description = chunk.get("content", "")
|
||||
|
||||
# Build URL to the BookStack page
|
||||
url = page_url
|
||||
if not url and base_url and book_slug and page_slug:
|
||||
url = f"{base_url}/books/{book_slug}/page/{page_slug}"
|
||||
|
||||
source = {
|
||||
"id": chunk.get("chunk_id", self.source_id_counter),
|
||||
"title": title,
|
||||
"description": description,
|
||||
"url": url,
|
||||
}
|
||||
|
||||
self.source_id_counter += 1
|
||||
sources_list.append(source)
|
||||
|
||||
# Create result object
|
||||
result_object = {
|
||||
"id": 50, # Assign a unique ID for the BookStack connector
|
||||
"name": "BookStack",
|
||||
"type": "BOOKSTACK_CONNECTOR",
|
||||
"sources": sources_list,
|
||||
}
|
||||
|
||||
return result_object, bookstack_chunks
|
||||
|
|
|
|||
|
|
@ -643,3 +643,46 @@ async def _index_crawled_urls(
|
|||
await run_web_page_indexing(
|
||||
session, connector_id, search_space_id, user_id, start_date, end_date
|
||||
)
|
||||
|
||||
|
||||
@celery_app.task(name="index_bookstack_pages", bind=True)
|
||||
def index_bookstack_pages_task(
|
||||
self,
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
start_date: str,
|
||||
end_date: str,
|
||||
):
|
||||
"""Celery task to index BookStack pages."""
|
||||
import asyncio
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
_index_bookstack_pages(
|
||||
connector_id, search_space_id, user_id, start_date, end_date
|
||||
)
|
||||
)
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
async def _index_bookstack_pages(
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
start_date: str,
|
||||
end_date: str,
|
||||
):
|
||||
"""Index BookStack pages with new session."""
|
||||
from app.routes.search_source_connectors_routes import (
|
||||
run_bookstack_indexing,
|
||||
)
|
||||
|
||||
async with get_celery_session_maker()() as session:
|
||||
await run_bookstack_indexing(
|
||||
session, connector_id, search_space_id, user_id, start_date, end_date
|
||||
)
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ Available indexers:
|
|||
- Linear: Index issues from Linear workspaces
|
||||
- Jira: Index issues from Jira projects
|
||||
- Confluence: Index pages from Confluence spaces
|
||||
- BookStack: Index pages from BookStack wiki instances
|
||||
- Discord: Index messages from Discord servers
|
||||
- ClickUp: Index tasks from ClickUp workspaces
|
||||
- Google Gmail: Index messages from Google Gmail
|
||||
|
|
@ -24,6 +25,7 @@ Available indexers:
|
|||
# Communication platforms
|
||||
# Calendar and scheduling
|
||||
from .airtable_indexer import index_airtable_records
|
||||
from .bookstack_indexer import index_bookstack_pages
|
||||
from .clickup_indexer import index_clickup_tasks
|
||||
from .confluence_indexer import index_confluence_pages
|
||||
from .discord_indexer import index_discord_messages
|
||||
|
|
@ -46,6 +48,7 @@ from .webcrawler_indexer import index_crawled_urls
|
|||
|
||||
__all__ = [ # noqa: RUF022
|
||||
"index_airtable_records",
|
||||
"index_bookstack_pages",
|
||||
"index_clickup_tasks",
|
||||
"index_confluence_pages",
|
||||
"index_discord_messages",
|
||||
|
|
|
|||
|
|
@ -0,0 +1,434 @@
|
|||
"""
|
||||
BookStack connector indexer.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import config
|
||||
from app.connectors.bookstack_connector import BookStackConnector
|
||||
from app.db import Document, DocumentType, SearchSourceConnectorType
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
generate_content_hash,
|
||||
generate_document_summary,
|
||||
generate_unique_identifier_hash,
|
||||
)
|
||||
|
||||
from .base import (
|
||||
calculate_date_range,
|
||||
check_document_by_unique_identifier,
|
||||
get_connector_by_id,
|
||||
logger,
|
||||
update_connector_last_indexed,
|
||||
)
|
||||
|
||||
|
||||
async def index_bookstack_pages(
|
||||
session: AsyncSession,
|
||||
connector_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
start_date: str | None = None,
|
||||
end_date: str | None = None,
|
||||
update_last_indexed: bool = True,
|
||||
) -> tuple[int, str | None]:
|
||||
"""
|
||||
Index BookStack pages.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
connector_id: ID of the BookStack connector
|
||||
search_space_id: ID of the search space to store documents in
|
||||
user_id: User ID
|
||||
start_date: Start date for indexing (YYYY-MM-DD format)
|
||||
end_date: End date for indexing (YYYY-MM-DD format)
|
||||
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
|
||||
|
||||
Returns:
|
||||
Tuple containing (number of documents indexed, error message or None)
|
||||
"""
|
||||
task_logger = TaskLoggingService(session, search_space_id)
|
||||
|
||||
# Log task start
|
||||
log_entry = await task_logger.log_task_start(
|
||||
task_name="bookstack_pages_indexing",
|
||||
source="connector_indexing_task",
|
||||
message=f"Starting BookStack pages indexing for connector {connector_id}",
|
||||
metadata={
|
||||
"connector_id": connector_id,
|
||||
"user_id": str(user_id),
|
||||
"start_date": start_date,
|
||||
"end_date": end_date,
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
# Get the connector from the database
|
||||
connector = await get_connector_by_id(
|
||||
session, connector_id, SearchSourceConnectorType.BOOKSTACK_CONNECTOR
|
||||
)
|
||||
|
||||
if not connector:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Connector with ID {connector_id} not found",
|
||||
"Connector not found",
|
||||
{"error_type": "ConnectorNotFound"},
|
||||
)
|
||||
return 0, f"Connector with ID {connector_id} not found"
|
||||
|
||||
# Get the BookStack credentials from the connector config
|
||||
bookstack_base_url = connector.config.get("BOOKSTACK_BASE_URL")
|
||||
bookstack_token_id = connector.config.get("BOOKSTACK_TOKEN_ID")
|
||||
bookstack_token_secret = connector.config.get("BOOKSTACK_TOKEN_SECRET")
|
||||
|
||||
if not bookstack_base_url or not bookstack_token_id or not bookstack_token_secret:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"BookStack credentials not found in connector config for connector {connector_id}",
|
||||
"Missing BookStack credentials",
|
||||
{"error_type": "MissingCredentials"},
|
||||
)
|
||||
return 0, "BookStack credentials not found in connector config"
|
||||
|
||||
# Initialize BookStack client
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Initializing BookStack client for connector {connector_id}",
|
||||
{"stage": "client_initialization"},
|
||||
)
|
||||
|
||||
bookstack_client = BookStackConnector(
|
||||
base_url=bookstack_base_url,
|
||||
token_id=bookstack_token_id,
|
||||
token_secret=bookstack_token_secret,
|
||||
)
|
||||
|
||||
# Calculate date range
|
||||
start_date_str, end_date_str = calculate_date_range(
|
||||
connector, start_date, end_date, default_days_back=365
|
||||
)
|
||||
|
||||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Fetching BookStack pages from {start_date_str} to {end_date_str}",
|
||||
{
|
||||
"stage": "fetching_pages",
|
||||
"start_date": start_date_str,
|
||||
"end_date": end_date_str,
|
||||
},
|
||||
)
|
||||
|
||||
# Get pages within date range
|
||||
try:
|
||||
pages, error = bookstack_client.get_pages_by_date_range(
|
||||
start_date=start_date_str, end_date=end_date_str
|
||||
)
|
||||
|
||||
if error:
|
||||
logger.error(f"Failed to get BookStack pages: {error}")
|
||||
|
||||
# Don't treat "No pages found" as an error that should stop indexing
|
||||
if "No pages found" in error:
|
||||
logger.info(
|
||||
"No pages found is not a critical error, continuing with update"
|
||||
)
|
||||
if update_last_indexed:
|
||||
await update_connector_last_indexed(
|
||||
session, connector, update_last_indexed
|
||||
)
|
||||
await session.commit()
|
||||
logger.info(
|
||||
f"Updated last_indexed_at to {connector.last_indexed_at} despite no pages found"
|
||||
)
|
||||
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"No BookStack pages found in date range {start_date_str} to {end_date_str}",
|
||||
{"pages_found": 0},
|
||||
)
|
||||
return 0, None
|
||||
else:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Failed to get BookStack pages: {error}",
|
||||
"API Error",
|
||||
{"error_type": "APIError"},
|
||||
)
|
||||
return 0, f"Failed to get BookStack pages: {error}"
|
||||
|
||||
logger.info(f"Retrieved {len(pages)} pages from BookStack API")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching BookStack pages: {e!s}", exc_info=True)
|
||||
return 0, f"Error fetching BookStack pages: {e!s}"
|
||||
|
||||
# Process and index each page
|
||||
documents_indexed = 0
|
||||
skipped_pages = []
|
||||
documents_skipped = 0
|
||||
|
||||
for page in pages:
|
||||
try:
|
||||
page_id = page.get("id")
|
||||
page_name = page.get("name", "")
|
||||
page_slug = page.get("slug", "")
|
||||
book_id = page.get("book_id")
|
||||
book_slug = page.get("book_slug", "")
|
||||
chapter_id = page.get("chapter_id")
|
||||
|
||||
if not page_id or not page_name:
|
||||
logger.warning(
|
||||
f"Skipping page with missing ID or name: {page_id or 'Unknown'}"
|
||||
)
|
||||
skipped_pages.append(f"{page_name or 'Unknown'} (missing data)")
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Fetch full page content (Markdown preferred)
|
||||
try:
|
||||
page_detail, page_content = bookstack_client.get_page_with_content(
|
||||
page_id, use_markdown=True
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to fetch content for page {page_name}: {e}"
|
||||
)
|
||||
skipped_pages.append(f"{page_name} (content fetch error)")
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Build full content with title
|
||||
full_content = f"# {page_name}\n\n{page_content}"
|
||||
|
||||
if not full_content.strip():
|
||||
logger.warning(f"Skipping page with no content: {page_name}")
|
||||
skipped_pages.append(f"{page_name} (no content)")
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
# Generate unique identifier hash for this BookStack page
|
||||
unique_identifier_hash = generate_unique_identifier_hash(
|
||||
DocumentType.BOOKSTACK_CONNECTOR, page_id, search_space_id
|
||||
)
|
||||
|
||||
# Generate content hash
|
||||
content_hash = generate_content_hash(full_content, search_space_id)
|
||||
|
||||
# Check if document with this unique identifier already exists
|
||||
existing_document = await check_document_by_unique_identifier(
|
||||
session, unique_identifier_hash
|
||||
)
|
||||
|
||||
# Build page URL
|
||||
page_url = f"{bookstack_base_url}/books/{book_slug}/page/{page_slug}"
|
||||
|
||||
# Build document metadata
|
||||
doc_metadata = {
|
||||
"page_id": page_id,
|
||||
"page_name": page_name,
|
||||
"page_slug": page_slug,
|
||||
"book_id": book_id,
|
||||
"book_slug": book_slug,
|
||||
"chapter_id": chapter_id,
|
||||
"base_url": bookstack_base_url,
|
||||
"page_url": page_url,
|
||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
}
|
||||
|
||||
if existing_document:
|
||||
# Document exists - check if content has changed
|
||||
if existing_document.content_hash == content_hash:
|
||||
logger.info(
|
||||
f"Document for BookStack page {page_name} unchanged. Skipping."
|
||||
)
|
||||
documents_skipped += 1
|
||||
continue
|
||||
else:
|
||||
# Content has changed - update the existing document
|
||||
logger.info(
|
||||
f"Content changed for BookStack page {page_name}. Updating document."
|
||||
)
|
||||
|
||||
# Generate summary with metadata
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
summary_metadata = {
|
||||
"page_name": page_name,
|
||||
"page_id": page_id,
|
||||
"book_id": book_id,
|
||||
"document_type": "BookStack Page",
|
||||
"connector_type": "BookStack",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
full_content, user_llm, summary_metadata
|
||||
)
|
||||
else:
|
||||
summary_content = f"BookStack Page: {page_name}\n\nBook ID: {book_id}\n\n"
|
||||
if page_content:
|
||||
content_preview = page_content[:1000]
|
||||
if len(page_content) > 1000:
|
||||
content_preview += "..."
|
||||
summary_content += (
|
||||
f"Content Preview: {content_preview}\n\n"
|
||||
)
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
# Process chunks
|
||||
chunks = await create_document_chunks(full_content)
|
||||
|
||||
# Update existing document
|
||||
existing_document.title = f"BookStack - {page_name}"
|
||||
existing_document.content = summary_content
|
||||
existing_document.content_hash = content_hash
|
||||
existing_document.embedding = summary_embedding
|
||||
existing_document.document_metadata = doc_metadata
|
||||
existing_document.chunks = chunks
|
||||
|
||||
documents_indexed += 1
|
||||
logger.info(
|
||||
f"Successfully updated BookStack page {page_name}"
|
||||
)
|
||||
continue
|
||||
|
||||
# Document doesn't exist - create new one
|
||||
# Generate summary with metadata
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, search_space_id
|
||||
)
|
||||
|
||||
if user_llm:
|
||||
summary_metadata = {
|
||||
"page_name": page_name,
|
||||
"page_id": page_id,
|
||||
"book_id": book_id,
|
||||
"document_type": "BookStack Page",
|
||||
"connector_type": "BookStack",
|
||||
}
|
||||
(
|
||||
summary_content,
|
||||
summary_embedding,
|
||||
) = await generate_document_summary(
|
||||
full_content, user_llm, summary_metadata
|
||||
)
|
||||
else:
|
||||
# Fallback to simple summary if no LLM configured
|
||||
summary_content = (
|
||||
f"BookStack Page: {page_name}\n\nBook ID: {book_id}\n\n"
|
||||
)
|
||||
if page_content:
|
||||
# Take first 1000 characters of content for summary
|
||||
content_preview = page_content[:1000]
|
||||
if len(page_content) > 1000:
|
||||
content_preview += "..."
|
||||
summary_content += f"Content Preview: {content_preview}\n\n"
|
||||
summary_embedding = config.embedding_model_instance.embed(
|
||||
summary_content
|
||||
)
|
||||
|
||||
# Process chunks - using the full page content
|
||||
chunks = await create_document_chunks(full_content)
|
||||
|
||||
# Create and store new document
|
||||
logger.info(f"Creating new document for page {page_name}")
|
||||
document = Document(
|
||||
search_space_id=search_space_id,
|
||||
title=f"BookStack - {page_name}",
|
||||
document_type=DocumentType.BOOKSTACK_CONNECTOR,
|
||||
document_metadata=doc_metadata,
|
||||
content=summary_content,
|
||||
content_hash=content_hash,
|
||||
unique_identifier_hash=unique_identifier_hash,
|
||||
embedding=summary_embedding,
|
||||
chunks=chunks,
|
||||
)
|
||||
|
||||
session.add(document)
|
||||
documents_indexed += 1
|
||||
logger.info(f"Successfully indexed new page {page_name}")
|
||||
|
||||
# Batch commit every 10 documents
|
||||
if documents_indexed % 10 == 0:
|
||||
logger.info(
|
||||
f"Committing batch: {documents_indexed} BookStack pages processed so far"
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error processing page {page.get('name', 'Unknown')}: {e!s}",
|
||||
exc_info=True,
|
||||
)
|
||||
skipped_pages.append(
|
||||
f"{page.get('name', 'Unknown')} (processing error)"
|
||||
)
|
||||
documents_skipped += 1
|
||||
continue # Skip this page and continue with others
|
||||
|
||||
# Update the last_indexed_at timestamp for the connector only if requested
|
||||
total_processed = documents_indexed
|
||||
if update_last_indexed:
|
||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||
|
||||
# Final commit for any remaining documents not yet committed in batches
|
||||
logger.info(
|
||||
f"Final commit: Total {documents_indexed} BookStack pages processed"
|
||||
)
|
||||
await session.commit()
|
||||
logger.info(
|
||||
"Successfully committed all BookStack document changes to database"
|
||||
)
|
||||
|
||||
# Log success
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Successfully completed BookStack indexing for connector {connector_id}",
|
||||
{
|
||||
"pages_processed": total_processed,
|
||||
"documents_indexed": documents_indexed,
|
||||
"documents_skipped": documents_skipped,
|
||||
"skipped_pages_count": len(skipped_pages),
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"BookStack indexing completed: {documents_indexed} new pages, {documents_skipped} skipped"
|
||||
)
|
||||
return (
|
||||
total_processed,
|
||||
None,
|
||||
) # Return None as the error message to indicate success
|
||||
|
||||
except SQLAlchemyError as db_error:
|
||||
await session.rollback()
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Database error during BookStack indexing for connector {connector_id}",
|
||||
str(db_error),
|
||||
{"error_type": "SQLAlchemyError"},
|
||||
)
|
||||
logger.error(f"Database error: {db_error!s}", exc_info=True)
|
||||
return 0, f"Database error: {db_error!s}"
|
||||
except Exception as e:
|
||||
await session.rollback()
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Failed to index BookStack pages for connector {connector_id}",
|
||||
str(e),
|
||||
{"error_type": type(e).__name__},
|
||||
)
|
||||
logger.error(f"Failed to index BookStack pages: {e!s}", exc_info=True)
|
||||
return 0, f"Failed to index BookStack pages: {e!s}"
|
||||
|
|
@ -32,6 +32,7 @@ CONNECTOR_TASK_MAP = {
|
|||
SearchSourceConnectorType.LUMA_CONNECTOR: "index_luma_events",
|
||||
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: "index_elasticsearch_documents",
|
||||
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: "index_crawled_urls",
|
||||
SearchSourceConnectorType.BOOKSTACK_CONNECTOR: "index_bookstack_pages",
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -68,6 +69,7 @@ def create_periodic_schedule(
|
|||
# Import all indexing tasks
|
||||
from app.tasks.celery_tasks.connector_tasks import (
|
||||
index_airtable_records_task,
|
||||
index_bookstack_pages_task,
|
||||
index_clickup_tasks_task,
|
||||
index_confluence_pages_task,
|
||||
index_crawled_urls_task,
|
||||
|
|
@ -99,6 +101,7 @@ def create_periodic_schedule(
|
|||
SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
|
||||
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
|
||||
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task,
|
||||
SearchSourceConnectorType.BOOKSTACK_CONNECTOR: index_bookstack_pages_task,
|
||||
}
|
||||
|
||||
# Trigger the first run immediately
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue