Merge remote-tracking branch 'upstream/dev' into fix/index-future-date

Anish Sarkar 2026-01-09 13:24:38 +05:30
commit e21bc8086a
125 changed files with 5644 additions and 2592 deletions


@@ -564,6 +564,49 @@ async def _index_discord_messages(
)
@celery_app.task(name="index_teams_messages", bind=True)
def index_teams_messages_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index Microsoft Teams messages."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_teams_messages(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_teams_messages(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index Microsoft Teams messages with new session."""
from app.routes.search_source_connectors_routes import (
run_teams_indexing,
)
async with get_celery_session_maker()() as session:
await run_teams_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_luma_events", bind=True)
def index_luma_events_task(
self,


@@ -6,10 +6,8 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.airtable_connector import AirtableConnector
from app.connectors.airtable_history import AirtableHistoryConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.routes.airtable_add_connector_route import refresh_airtable_token
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@@ -18,7 +16,6 @@ from app.utils.document_converters import (
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.oauth_security import TokenEncryption
from .base import (
calculate_date_range,
@@ -85,76 +82,11 @@ async def index_airtable_records(
)
return 0, f"Connector with ID {connector_id} not found"
# Create credentials from connector config
config_data = (
connector.config.copy()
) # Work with a copy to avoid modifying original
# Decrypt tokens if they are encrypted (only when explicitly marked)
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted:
# Tokens are explicitly marked as encrypted, attempt decryption
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but tokens are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return 0, "SECRET_KEY not configured but tokens are marked as encrypted"
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt access_token
if config_data.get("access_token"):
config_data["access_token"] = token_encryption.decrypt_token(
config_data["access_token"]
)
logger.info(
f"Decrypted Airtable access token for connector {connector_id}"
)
# Decrypt refresh_token if present
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
logger.info(
f"Decrypted Airtable refresh token for connector {connector_id}"
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to decrypt Airtable tokens for connector {connector_id}: {e!s}",
"Token decryption failed",
{"error_type": "TokenDecryptionError"},
)
return 0, f"Failed to decrypt Airtable tokens: {e!s}"
# If _token_encrypted is False or not set, treat tokens as plaintext
try:
credentials = AirtableAuthCredentialsBase.from_dict(config_data)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Invalid Airtable credentials in connector {connector_id}",
str(e),
{"error_type": "InvalidCredentials"},
)
return 0, f"Invalid Airtable credentials: {e!s}"
# Check if credentials are expired
if credentials.is_expired:
await task_logger.log_task_failure(
log_entry,
f"Airtable credentials expired for connector {connector_id}",
"Credentials expired",
{"error_type": "ExpiredCredentials"},
)
connector = await refresh_airtable_token(session, connector)
# return 0, "Airtable credentials have expired. Please re-authenticate."
# Normalize "undefined" strings to None (from frontend)
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None
# Calculate date range for indexing
start_date_str, end_date_str = calculate_date_range(
@@ -166,8 +98,9 @@ async def index_airtable_records(
f"from {start_date_str} to {end_date_str}"
)
# Initialize Airtable connector
airtable_connector = AirtableConnector(credentials)
# Initialize Airtable history connector with auto-refresh capability
airtable_history = AirtableHistoryConnector(session, connector_id)
airtable_connector = await airtable_history._get_connector()
total_processed = 0
try:
@@ -459,47 +392,56 @@ async def index_airtable_records(
documents_skipped += 1
continue # Skip this message and continue with others
# Update the last_indexed_at timestamp for the connector only if requested
total_processed = documents_indexed
if total_processed > 0:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
# Accumulate total processed across all tables
total_processed += documents_indexed
# Final commit for any remaining documents not yet committed in batches
logger.info(
f"Final commit: Total {documents_indexed} Airtable records processed"
)
await session.commit()
logger.info(
"Successfully committed all Airtable document changes to database"
)
if documents_indexed > 0:
logger.info(
f"Final commit for table {table_name}: {documents_indexed} Airtable records processed"
)
await session.commit()
logger.info(
f"Successfully committed all Airtable document changes for table {table_name}"
)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Airtable indexing for connector {connector_id}",
{
"events_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_messages_count": len(skipped_messages),
},
)
# Update the last_indexed_at timestamp for the connector only if requested
# (after all tables in all bases are processed)
if total_processed > 0:
await update_connector_last_indexed(
session, connector, update_last_indexed
)
logger.info(
f"Airtable indexing completed: {documents_indexed} new records, {documents_skipped} skipped"
)
return (
total_processed,
None,
) # Return None as the error message to indicate success
# Log success after processing all bases and tables
await task_logger.log_task_success(
log_entry,
f"Successfully completed Airtable indexing for connector {connector_id}",
{
"events_processed": total_processed,
"documents_indexed": total_processed,
},
)
logger.info(
f"Airtable indexing completed: {total_processed} total records processed"
)
return (
total_processed,
None,
) # Return None as the error message to indicate success
except Exception as e:
logger.error(
f"Fetching Airtable bases for connector {connector_id} failed: {e!s}",
exc_info=True,
)
await task_logger.log_task_failure(
log_entry,
f"Failed to fetch Airtable bases for connector {connector_id}",
str(e),
{"error_type": type(e).__name__},
)
return 0, f"Failed to fetch Airtable bases: {e!s}"
except SQLAlchemyError as db_error:
await session.rollback()

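Aside (not part of the commit): the reworked Airtable loop above now commits once per table that produced documents, accumulates each table's documents_indexed into total_processed across all bases and tables, and updates last_indexed_at only once at the very end. A toy sketch of that control flow, with stand-in helpers in place of the real session and connector calls:

def index_one_table(table_name: str) -> int:
    """Stand-in for the per-table indexing work; returns documents indexed."""
    return {"Tasks": 3, "Contacts": 0}.get(table_name, 0)

def run() -> None:
    total_processed = 0
    for table_name in ("Tasks", "Contacts"):  # placeholder table list
        documents_indexed = index_one_table(table_name)
        total_processed += documents_indexed
        if documents_indexed > 0:
            # stands in for `await session.commit()` after each table
            print(f"commit for {table_name}: {documents_indexed} records")
    if total_processed > 0:
        # stands in for update_connector_last_indexed(...), done once after all tables
        print(f"update last_indexed_at ({total_processed} records in total)")

run()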

@@ -2,13 +2,14 @@
ClickUp connector indexer.
"""
import contextlib
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.clickup_connector import ClickUpConnector
from app.connectors.clickup_history import ClickUpHistoryConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
@@ -82,26 +83,30 @@ async def index_clickup_tasks(
)
return 0, error_msg
# Extract ClickUp configuration
clickup_api_token = connector.config.get("CLICKUP_API_TOKEN")
# Check if using OAuth (has access_token in config) or legacy (has CLICKUP_API_TOKEN)
has_oauth = connector.config.get("access_token") is not None
has_legacy = connector.config.get("CLICKUP_API_TOKEN") is not None
if not clickup_api_token:
error_msg = "ClickUp API token not found in connector configuration"
if not has_oauth and not has_legacy:
error_msg = "ClickUp credentials not found in connector configuration (neither OAuth nor API token)"
await task_logger.log_task_failure(
log_entry,
f"ClickUp API token not found in connector config for connector {connector_id}",
"Missing ClickUp token",
{"error_type": "MissingToken"},
f"ClickUp credentials not found in connector config for connector {connector_id}",
"Missing ClickUp credentials",
{"error_type": "MissingCredentials"},
)
return 0, error_msg
await task_logger.log_task_progress(
log_entry,
f"Initializing ClickUp client for connector {connector_id}",
f"Initializing ClickUp client for connector {connector_id} ({'OAuth' if has_oauth else 'API Token'})",
{"stage": "client_initialization"},
)
clickup_client = ClickUpConnector(api_token=clickup_api_token)
# Use history connector which supports both OAuth and legacy API tokens
clickup_client = ClickUpHistoryConnector(
session=session, connector_id=connector_id
)
# Get authorized workspaces
await task_logger.log_task_progress(
@@ -110,7 +115,7 @@ async def index_clickup_tasks(
{"stage": "workspace_fetching"},
)
workspaces_response = clickup_client.get_authorized_workspaces()
workspaces_response = await clickup_client.get_authorized_workspaces()
workspaces = workspaces_response.get("teams", [])
if not workspaces:
@@ -141,7 +146,7 @@ async def index_clickup_tasks(
# Fetch tasks for date range if provided
if start_date and end_date:
tasks, error = clickup_client.get_tasks_in_date_range(
tasks, error = await clickup_client.get_tasks_in_date_range(
workspace_id=workspace_id,
start_date=start_date,
end_date=end_date,
@@ -153,7 +158,7 @@ async def index_clickup_tasks(
)
continue
else:
tasks = clickup_client.get_workspace_tasks(
tasks = await clickup_client.get_workspace_tasks(
workspace_id=workspace_id, include_closed=True
)
@@ -393,10 +398,21 @@ async def index_clickup_tasks(
logger.info(
f"clickup indexing completed: {documents_indexed} new tasks, {documents_skipped} skipped"
)
# Close client connection
try:
await clickup_client.close()
except Exception as e:
logger.warning(f"Error closing ClickUp client: {e!s}")
return total_processed, None
except SQLAlchemyError as db_error:
await session.rollback()
# Clean up the connector in case of error
if "clickup_client" in locals():
with contextlib.suppress(Exception):
await clickup_client.close()
await task_logger.log_task_failure(
log_entry,
f"Database error during ClickUp indexing for connector {connector_id}",
@@ -407,6 +423,10 @@ async def index_clickup_tasks(
return 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
# Clean up the connector in case of error
if "clickup_client" in locals():
with contextlib.suppress(Exception):
await clickup_client.close()
await task_logger.log_task_failure(
log_entry,
f"Failed to index ClickUp tasks for connector {connector_id}",

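Aside (not from the diff): the ClickUp, Confluence, and Jira indexers now share the same cleanup idiom, a best-effort close of the async client via contextlib.suppress, guarded by an "in locals()" check in the error paths where the client may never have been constructed. A self-contained illustration of that idiom (the client class here is a fake):

import asyncio
import contextlib

class FakeAsyncClient:
    async def close(self) -> None:
        raise RuntimeError("simulated close failure")

async def main() -> None:
    client = FakeAsyncClient()
    # contextlib.suppress replaces the older try/except Exception: pass blocks
    with contextlib.suppress(Exception):
        await client.close()
    # in except handlers the variable may not exist yet, hence the locals() guard
    if "client" in locals():
        with contextlib.suppress(Exception):
            await client.close()

asyncio.run(main())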

@@ -2,6 +2,7 @@
Confluence connector indexer.
"""
import contextlib
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
@@ -142,10 +143,8 @@ async def index_confluence_pages(
)
# Close client before returning
if confluence_client:
try:
with contextlib.suppress(Exception):
await confluence_client.close()
except Exception:
pass
return 0, None
else:
await task_logger.log_task_failure(
@@ -156,10 +155,8 @@ async def index_confluence_pages(
)
# Close client on error
if confluence_client:
try:
with contextlib.suppress(Exception):
await confluence_client.close()
except Exception:
pass
return 0, f"Failed to get Confluence pages: {error}"
logger.info(f"Retrieved {len(pages)} pages from Confluence API")
@@ -168,10 +165,8 @@ async def index_confluence_pages(
logger.error(f"Error fetching Confluence pages: {e!s}", exc_info=True)
# Close client on error
if confluence_client:
try:
with contextlib.suppress(Exception):
await confluence_client.close()
except Exception:
pass
return 0, f"Error fetching Confluence pages: {e!s}"
# Process and index each page
@@ -437,10 +432,8 @@ async def index_confluence_pages(
await session.rollback()
# Close client if it exists
if confluence_client:
try:
with contextlib.suppress(Exception):
await confluence_client.close()
except Exception:
pass
await task_logger.log_task_failure(
log_entry,
f"Database error during Confluence indexing for connector {connector_id}",
@@ -453,10 +446,8 @@ async def index_confluence_pages(
await session.rollback()
# Close client if it exists
if confluence_client:
try:
with contextlib.suppress(Exception):
await confluence_client.close()
except Exception:
pass
await task_logger.log_task_failure(
log_entry,
f"Failed to index Confluence pages for connector {connector_id}",


@@ -2,6 +2,7 @@
Jira connector indexer.
"""
import contextlib
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
@@ -413,10 +414,8 @@ async def index_jira_issues(
logger.error(f"Database error: {db_error!s}", exc_info=True)
# Clean up the connector in case of error
if "jira_client" in locals():
try:
with contextlib.suppress(Exception):
await jira_client.close()
except Exception:
pass
return 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
@@ -429,8 +428,6 @@ async def index_jira_issues(
logger.error(f"Failed to index JIRA issues: {e!s}", exc_info=True)
# Clean up the connector in case of error
if "jira_client" in locals():
try:
with contextlib.suppress(Exception):
await jira_client.close()
except Exception:
pass
return 0, f"Failed to index JIRA issues: {e!s}"


@@ -0,0 +1,473 @@
"""
Microsoft Teams connector indexer.
"""
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.teams_history import TeamsHistory
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_unique_identifier_hash,
)
from .base import (
build_document_metadata_markdown,
calculate_date_range,
check_document_by_unique_identifier,
get_connector_by_id,
get_current_timestamp,
logger,
update_connector_last_indexed,
)
async def index_teams_messages(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None = None,
end_date: str | None = None,
update_last_indexed: bool = True,
) -> tuple[int, str | None]:
"""
Index Microsoft Teams messages from all accessible teams and channels.
Args:
session: Database session
connector_id: ID of the Teams connector
search_space_id: ID of the search space to store documents in
user_id: ID of the user
start_date: Start date for indexing (YYYY-MM-DD format)
end_date: End date for indexing (YYYY-MM-DD format)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="teams_messages_indexing",
source="connector_indexing_task",
message=f"Starting Microsoft Teams messages indexing for connector {connector_id}",
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
"start_date": start_date,
"end_date": end_date,
},
)
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving Teams connector {connector_id} from database",
{"stage": "connector_retrieval"},
)
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.TEAMS_CONNECTOR
)
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a Teams connector",
"Connector not found",
{"error_type": "ConnectorNotFound"},
)
return (
0,
f"Connector with ID {connector_id} not found or is not a Teams connector",
)
# Initialize Teams client with auto-refresh support
await task_logger.log_task_progress(
log_entry,
f"Initializing Teams client for connector {connector_id}",
{"stage": "client_initialization"},
)
teams_client = TeamsHistory(session=session, connector_id=connector_id)
# Handle 'undefined' string from frontend (treat as None)
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None
# Calculate date range
await task_logger.log_task_progress(
log_entry,
"Calculating date range for Teams indexing",
{
"stage": "date_calculation",
"provided_start_date": start_date,
"provided_end_date": end_date,
},
)
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
logger.info(
"Indexing Teams messages from %s to %s", start_date_str, end_date_str
)
await task_logger.log_task_progress(
log_entry,
f"Fetching Teams from {start_date_str} to {end_date_str}",
{
"stage": "fetch_teams",
"start_date": start_date_str,
"end_date": end_date_str,
},
)
# Get all teams
try:
teams = await teams_client.get_all_teams()
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to get Teams for connector {connector_id}",
str(e),
{"error_type": "TeamsFetchError"},
)
return 0, f"Failed to get Teams: {e!s}"
if not teams:
await task_logger.log_task_success(
log_entry,
f"No Teams found for connector {connector_id}",
{"teams_found": 0},
)
return 0, "No Teams found"
# Track the number of documents indexed
documents_indexed = 0
documents_skipped = 0
skipped_channels = []
await task_logger.log_task_progress(
log_entry,
f"Starting to process {len(teams)} Teams",
{"stage": "process_teams", "total_teams": len(teams)},
)
# Convert date strings to datetime objects for filtering
from datetime import datetime, timezone
start_datetime = None
end_datetime = None
if start_date_str:
# Parse as naive datetime and make it timezone-aware (UTC)
start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
if end_date_str:
# Parse as naive datetime, set to end of day, and make it timezone-aware (UTC)
end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d").replace(hour=23, minute=59, second=59, tzinfo=timezone.utc)
# Process each team
for team in teams:
team_id = team.get("id")
team_name = team.get("displayName", "Unknown Team")
try:
# Get channels for this team
channels = await teams_client.get_channels_for_team(team_id)
if not channels:
logger.info("No channels found in team %s", team_name)
continue
# Process each channel in the team
for channel in channels:
channel_id = channel.get("id")
channel_name = channel.get("displayName", "Unknown Channel")
try:
# Get messages for this channel
messages = await teams_client.get_messages_from_channel(
team_id,
channel_id,
start_datetime,
end_datetime,
include_replies=True,
)
if not messages:
logger.info(
"No messages found in channel %s of team %s for the specified date range.",
channel_name,
team_name,
)
documents_skipped += 1
continue
# Process each message
for msg in messages:
# Skip deleted messages or empty content
if msg.get("deletedDateTime"):
continue
# Extract message details
message_id = msg.get("id", "")
created_datetime = msg.get("createdDateTime", "")
from_user = msg.get("from", {})
user_name = from_user.get("user", {}).get(
"displayName", "Unknown User"
)
user_email = from_user.get("user", {}).get(
"userPrincipalName", "Unknown Email"
)
# Extract message content
body = msg.get("body", {})
content_type = body.get("contentType", "text")
msg_text = body.get("content", "")
# Skip empty messages
if not msg_text or msg_text.strip() == "":
continue
# Format document metadata
metadata_sections = [
(
"METADATA",
[
f"TEAM_NAME: {team_name}",
f"TEAM_ID: {team_id}",
f"CHANNEL_NAME: {channel_name}",
f"CHANNEL_ID: {channel_id}",
f"MESSAGE_TIMESTAMP: {created_datetime}",
f"MESSAGE_USER_NAME: {user_name}",
f"MESSAGE_USER_EMAIL: {user_email}",
f"CONTENT_TYPE: {content_type}",
],
),
(
"CONTENT",
[
f"FORMAT: {content_type}",
"TEXT_START",
msg_text,
"TEXT_END",
],
),
]
# Build the document string
combined_document_string = build_document_metadata_markdown(
metadata_sections
)
# Generate unique identifier hash for this Teams message
unique_identifier = f"{team_id}_{channel_id}_{message_id}"
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.TEAMS_CONNECTOR,
unique_identifier,
search_space_id,
)
# Generate content hash
content_hash = generate_content_hash(
combined_document_string, search_space_id
)
# Check if document with this unique identifier already exists
existing_document = (
await check_document_by_unique_identifier(
session, unique_identifier_hash
)
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
"Document for Teams message %s in channel %s unchanged. Skipping.",
message_id,
channel_name,
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
"Content changed for Teams message %s in channel %s. Updating document.",
message_id,
channel_name,
)
# Update chunks and embedding
chunks = await create_document_chunks(
combined_document_string
)
doc_embedding = config.embedding_model_instance.embed(
combined_document_string
)
# Update existing document
existing_document.content = combined_document_string
existing_document.content_hash = content_hash
existing_document.embedding = doc_embedding
existing_document.document_metadata = {
"team_name": team_name,
"team_id": team_id,
"channel_name": channel_name,
"channel_id": channel_id,
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(messages),
"indexed_at": datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
),
}
# Delete old chunks and add new ones
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
logger.info(
"Successfully updated Teams message %s", message_id
)
continue
# Document doesn't exist - create new one
# Process chunks
chunks = await create_document_chunks(
combined_document_string
)
doc_embedding = config.embedding_model_instance.embed(
combined_document_string
)
# Create and store new document
document = Document(
search_space_id=search_space_id,
title=f"Teams - {team_name} - {channel_name}",
document_type=DocumentType.TEAMS_CONNECTOR,
document_metadata={
"team_name": team_name,
"team_id": team_id,
"channel_name": channel_name,
"channel_id": channel_id,
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(messages),
"indexed_at": datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
),
},
content=combined_document_string,
embedding=doc_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
updated_at=get_current_timestamp(),
)
session.add(document)
documents_indexed += 1
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
"Committing batch: %s Teams messages processed so far",
documents_indexed,
)
await session.commit()
logger.info(
"Successfully indexed channel %s in team %s with %s messages",
channel_name,
team_name,
len(messages),
)
except Exception as e:
logger.error(
"Error processing channel %s in team %s: %s",
channel_name,
team_name,
str(e),
)
skipped_channels.append(
f"{team_name}/{channel_name} (processing error)"
)
documents_skipped += 1
continue
except Exception as e:
logger.error("Error processing team %s: %s", team_name, str(e))
continue
# Update the last_indexed_at timestamp for the connector only if requested
# and if we successfully indexed at least one document
total_processed = documents_indexed
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(
"Final commit: Total %s Teams messages processed", documents_indexed
)
await session.commit()
# Prepare result message
result_message = None
if skipped_channels:
result_message = f"Processed {total_processed} messages. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
else:
result_message = f"Processed {total_processed} messages."
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Teams indexing for connector {connector_id}",
{
"messages_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_channels_count": len(skipped_channels),
"result_message": result_message,
},
)
logger.info(
"Teams indexing completed: %s new messages, %s skipped",
documents_indexed,
documents_skipped,
)
return total_processed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during Teams indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
logger.error("Database error: %s", str(db_error))
return 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index Teams messages for connector {connector_id}",
str(e),
{"error_type": type(e).__name__},
)
logger.error("Failed to index Teams messages: %s", str(e))
return 0, f"Failed to index Teams messages: {e!s}"