refactor: Add indexing functions with notification support

- Refactored multiple indexing functions in search_source_connectors_routes to utilize a new helper function for handling notifications.
- Removed redundant error handling and logging from individual indexing tasks, streamlining the code.
This commit is contained in:
Anish Sarkar 2026-01-13 11:30:45 +05:30
parent 38f907e65b
commit 271ddffbf5

View file

@ -30,7 +30,6 @@ from app.db import (
async_session_maker,
get_async_session,
)
from app.services.notification_service import NotificationService
from app.schemas import (
GoogleDriveIndexRequest,
SearchSourceConnectorBase,
@ -38,6 +37,7 @@ from app.schemas import (
SearchSourceConnectorRead,
SearchSourceConnectorUpdate,
)
from app.services.notification_service import NotificationService
from app.tasks.connector_indexers import (
index_airtable_records,
index_clickup_tasks,
@ -948,30 +948,16 @@ async def run_slack_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
try:
# Index Slack messages without updating last_indexed_at (we'll do it separately)
documents_processed, error_or_warning = await index_slack_messages(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
update_last_indexed=False, # Don't update timestamp in the indexing function
)
# Only update last_indexed_at if indexing was successful (either new docs or updated docs)
if documents_processed > 0:
await _update_connector_timestamp_by_id(session, connector_id)
logger.info(
f"Slack indexing completed successfully: {documents_processed} documents processed"
)
else:
logger.error(
f"Slack indexing failed or no documents processed: {error_or_warning}"
)
except Exception as e:
logger.error(f"Error in background Slack indexing task: {e!s}")
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_slack_messages,
update_timestamp_func=_update_connector_timestamp_by_id,
)
async def _run_indexing_with_notifications(
@ -1168,36 +1154,27 @@ async def run_github_indexing(
start_date: str,
end_date: str,
):
"""Runs the GitHub indexing task and updates the timestamp."""
try:
indexed_count, error_message = await index_github_repos(
session,
connector_id,
search_space_id,
user_id,
start_date,
end_date,
update_last_indexed=False,
)
if error_message:
logger.error(
f"GitHub indexing failed for connector {connector_id}: {error_message}"
)
# Optionally update status in DB to indicate failure
else:
logger.info(
f"GitHub indexing successful for connector {connector_id}. Indexed {indexed_count} documents."
)
# Update the last indexed timestamp only on success
await _update_connector_timestamp_by_id(session, connector_id)
await session.commit() # Commit timestamp update
except Exception as e:
await session.rollback()
logger.error(
f"Critical error in run_github_indexing for connector {connector_id}: {e}",
exc_info=True,
)
# Optionally update status in DB to indicate failure
"""
Background task to run GitHub indexing.
Args:
session: Database session
connector_id: ID of the GitHub connector
search_space_id: ID of the search space
user_id: ID of the user
start_date: Start date for indexing
end_date: End date for indexing
"""
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_github_repos,
update_timestamp_func=_update_connector_timestamp_by_id,
)
# Add new helper functions for Linear indexing
@ -1278,6 +1255,7 @@ async def run_discord_indexing(
):
"""
Background task to run Discord indexing.
Args:
session: Database session
connector_id: ID of the Discord connector
@ -1286,30 +1264,16 @@ async def run_discord_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
try:
# Index Discord messages without updating last_indexed_at (we'll do it separately)
documents_processed, error_or_warning = await index_discord_messages(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
update_last_indexed=False, # Don't update timestamp in the indexing function
)
# Only update last_indexed_at if indexing was successful (either new docs or updated docs)
if documents_processed > 0:
await _update_connector_timestamp_by_id(session, connector_id)
logger.info(
f"Discord indexing completed successfully: {documents_processed} documents processed"
)
else:
logger.error(
f"Discord indexing failed or no documents processed: {error_or_warning}"
)
except Exception as e:
logger.error(f"Error in background Discord indexing task: {e!s}")
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_discord_messages,
update_timestamp_func=_update_connector_timestamp_by_id,
)
async def run_teams_indexing_with_new_session(
@ -1339,6 +1303,7 @@ async def run_teams_indexing(
):
"""
Background task to run Microsoft Teams indexing.
Args:
session: Database session
connector_id: ID of the Teams connector
@ -1347,27 +1312,18 @@ async def run_teams_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
try:
from app.tasks.connector_indexers.teams_indexer import index_teams_messages
from app.tasks.connector_indexers.teams_indexer import index_teams_messages
# Index Teams messages without updating last_indexed_at (we'll do it separately)
documents_processed, error_or_warning = await index_teams_messages(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
update_last_indexed=False, # Don't update timestamp in the indexing function
)
# Update last_indexed_at after successful indexing (even if 0 new docs - they were checked)
await _update_connector_timestamp_by_id(session, connector_id)
logger.info(
f"Teams indexing completed successfully: {documents_processed} documents processed. {error_or_warning or ''}"
)
except Exception as e:
logger.error(f"Error in background Teams indexing task: {e!s}")
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_teams_messages,
update_timestamp_func=_update_connector_timestamp_by_id,
)
# Add new helper functions for Jira indexing
@ -1397,35 +1353,27 @@ async def run_jira_indexing(
start_date: str,
end_date: str,
):
"""Runs the Jira indexing task and updates the timestamp."""
try:
indexed_count, error_message = await index_jira_issues(
session,
connector_id,
search_space_id,
user_id,
start_date,
end_date,
update_last_indexed=False,
)
if error_message:
logger.error(
f"Jira indexing failed for connector {connector_id}: {error_message}"
)
# Optionally update status in DB to indicate failure
else:
logger.info(
f"Jira indexing successful for connector {connector_id}. Indexed {indexed_count} documents."
)
# Update the last indexed timestamp only on success
await _update_connector_timestamp_by_id(session, connector_id)
await session.commit() # Commit timestamp update
except Exception as e:
logger.error(
f"Critical error in run_jira_indexing for connector {connector_id}: {e}",
exc_info=True,
)
# Optionally update status in DB to indicate failure
"""
Background task to run Jira indexing.
Args:
session: Database session
connector_id: ID of the Jira connector
search_space_id: ID of the search space
user_id: ID of the user
start_date: Start date for indexing
end_date: End date for indexing
"""
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_jira_issues,
update_timestamp_func=_update_connector_timestamp_by_id,
)
# Add new helper functions for Confluence indexing
@ -1457,35 +1405,27 @@ async def run_confluence_indexing(
start_date: str,
end_date: str,
):
"""Runs the Confluence indexing task and updates the timestamp."""
try:
indexed_count, error_message = await index_confluence_pages(
session,
connector_id,
search_space_id,
user_id,
start_date,
end_date,
update_last_indexed=False,
)
if error_message:
logger.error(
f"Confluence indexing failed for connector {connector_id}: {error_message}"
)
# Optionally update status in DB to indicate failure
else:
logger.info(
f"Confluence indexing successful for connector {connector_id}. Indexed {indexed_count} documents."
)
# Update the last indexed timestamp only on success
await _update_connector_timestamp_by_id(session, connector_id)
await session.commit() # Commit timestamp update
except Exception as e:
logger.error(
f"Critical error in run_confluence_indexing for connector {connector_id}: {e}",
exc_info=True,
)
# Optionally update status in DB to indicate failure
"""
Background task to run Confluence indexing.
Args:
session: Database session
connector_id: ID of the Confluence connector
search_space_id: ID of the search space
user_id: ID of the user
start_date: Start date for indexing
end_date: End date for indexing
"""
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_confluence_pages,
update_timestamp_func=_update_connector_timestamp_by_id,
)
# Add new helper functions for ClickUp indexing
@ -1515,35 +1455,27 @@ async def run_clickup_indexing(
start_date: str,
end_date: str,
):
"""Runs the ClickUp indexing task and updates the timestamp."""
try:
indexed_count, error_message = await index_clickup_tasks(
session,
connector_id,
search_space_id,
user_id,
start_date,
end_date,
update_last_indexed=False,
)
if error_message:
logger.error(
f"ClickUp indexing failed for connector {connector_id}: {error_message}"
)
# Optionally update status in DB to indicate failure
else:
logger.info(
f"ClickUp indexing successful for connector {connector_id}. Indexed {indexed_count} tasks."
)
# Update the last indexed timestamp only on success
await _update_connector_timestamp_by_id(session, connector_id)
await session.commit() # Commit timestamp update
except Exception as e:
logger.error(
f"Critical error in run_clickup_indexing for connector {connector_id}: {e}",
exc_info=True,
)
# Optionally update status in DB to indicate failure
"""
Background task to run ClickUp indexing.
Args:
session: Database session
connector_id: ID of the ClickUp connector
search_space_id: ID of the search space
user_id: ID of the user
start_date: Start date for indexing
end_date: End date for indexing
"""
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_clickup_tasks,
update_timestamp_func=_update_connector_timestamp_by_id,
)
# Add new helper functions for Airtable indexing
@ -1833,6 +1765,7 @@ async def run_luma_indexing(
):
"""
Background task to run Luma indexing.
Args:
session: Database session
connector_id: ID of the Luma connector
@ -1841,30 +1774,16 @@ async def run_luma_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
try:
# Index Luma events without updating last_indexed_at (we'll do it separately)
documents_processed, error_or_warning = await index_luma_events(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
update_last_indexed=False, # Don't update timestamp in the indexing function
)
# Only update last_indexed_at if indexing was successful (either new docs or updated docs)
if documents_processed > 0:
await _update_connector_timestamp_by_id(session, connector_id)
logger.info(
f"Luma indexing completed successfully: {documents_processed} documents processed"
)
else:
logger.error(
f"Luma indexing failed or no documents processed: {error_or_warning}"
)
except Exception as e:
logger.error(f"Error in background Luma indexing task: {e!s}")
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_luma_events,
update_timestamp_func=_update_connector_timestamp_by_id,
)
async def run_elasticsearch_indexing_with_new_session(
@ -1895,34 +1814,27 @@ async def run_elasticsearch_indexing(
start_date: str,
end_date: str,
):
"""Runs the Elasticsearch indexing task and updates the timestamp."""
try:
indexed_count, error_message = await index_elasticsearch_documents(
session,
connector_id,
search_space_id,
user_id,
start_date,
end_date,
update_last_indexed=False,
)
if error_message:
logger.error(
f"Elasticsearch indexing failed for connector {connector_id}: {error_message}"
)
else:
logger.info(
f"Elasticsearch indexing successful for connector {connector_id}. Indexed {indexed_count} documents."
)
# Update the last indexed timestamp only on success
await _update_connector_timestamp_by_id(session, connector_id)
await session.commit()
except Exception as e:
await session.rollback()
logger.error(
f"Critical error in run_elasticsearch_indexing for connector {connector_id}: {e}",
exc_info=True,
)
"""
Background task to run Elasticsearch indexing.
Args:
session: Database session
connector_id: ID of the Elasticsearch connector
search_space_id: ID of the search space
user_id: ID of the user
start_date: Start date for indexing
end_date: End date for indexing
"""
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_elasticsearch_documents,
update_timestamp_func=_update_connector_timestamp_by_id,
)
# Add new helper functions for crawled web page indexing
@ -1953,6 +1865,7 @@ async def run_web_page_indexing(
):
"""
Background task to run Web page indexing.
Args:
session: Database session
connector_id: ID of the webcrawler connector
@ -1961,29 +1874,16 @@ async def run_web_page_indexing(
start_date: Start date for indexing
end_date: End date for indexing
"""
try:
documents_processed, error_or_warning = await index_crawled_urls(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
update_last_indexed=False, # Don't update timestamp in the indexing function
)
# Only update last_indexed_at if indexing was successful (either new docs or updated docs)
if documents_processed > 0:
await _update_connector_timestamp_by_id(session, connector_id)
logger.info(
f"Web page indexing completed successfully: {documents_processed} documents processed"
)
else:
logger.error(
f"Web page indexing failed or no documents processed: {error_or_warning}"
)
except Exception as e:
logger.error(f"Error in background Web page indexing task: {e!s}")
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_crawled_urls,
update_timestamp_func=_update_connector_timestamp_by_id,
)
# Add new helper functions for BookStack indexing
@ -2028,29 +1928,13 @@ async def run_bookstack_indexing(
"""
from app.tasks.connector_indexers import index_bookstack_pages
try:
indexed_count, error_message = await index_bookstack_pages(
session,
connector_id,
search_space_id,
user_id,
start_date,
end_date,
update_last_indexed=False,
)
if error_message:
logger.error(
f"BookStack indexing failed for connector {connector_id}: {error_message}"
)
else:
logger.info(
f"BookStack indexing successful for connector {connector_id}. Indexed {indexed_count} documents."
)
# Update the last indexed timestamp only on success
await _update_connector_timestamp_by_id(session, connector_id)
await session.commit() # Commit timestamp update
except Exception as e:
logger.error(
f"Critical error in run_bookstack_indexing for connector {connector_id}: {e}",
exc_info=True,
)
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_bookstack_pages,
update_timestamp_func=_update_connector_timestamp_by_id,
)