refactor: add batch commits and increase task time limits in celery_app.py

- Increased task time limits in celery_app.py to allow longer-running indexing jobs (see the first sketch below).
- Enhanced pagination logic in NotionHistoryConnector to handle large result sets (see the second sketch below).
- Implemented batch commits every 10 documents across the Airtable, ClickUp, Confluence, Discord, GitHub, Google Calendar, Gmail, JIRA, Linear, Luma, Notion, and Slack indexers to improve performance and reduce database load (see the third sketch below).
- Updated final commit logging for clarity on total documents processed.
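
As a rough illustration of the time-limit change: raising Celery's task time limits is typically a two-setting change. The sketch below is an assumption-laden example, not the actual celery_app.py diff (which is not reproduced in this view); the app name, broker URL, and the specific limit values are all hypothetical.

# Hedged sketch: raising Celery task time limits. The app name, broker URL,
# and limit values below are assumptions, not the project's real settings.
from celery import Celery

celery_app = Celery("indexing_app", broker="redis://localhost:6379/0")

celery_app.conf.update(
    task_soft_time_limit=3600,  # raises SoftTimeLimitExceeded inside the task after 1 hour
    task_time_limit=3900,       # hard limit: the worker kills the task shortly after the soft limit
)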
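
For the pagination change, the NotionHistoryConnector code itself is not shown in this view; the following is only a sketch of the cursor-style pagination the Notion API exposes (has_more / next_cursor / start_cursor). The endpoint, token placeholder, and page size are assumptions.

# Hedged sketch of cursor-based pagination against the Notion search API.
# NOTION_TOKEN, the page_size, and the use of the /v1/search endpoint are
# assumptions; the real NotionHistoryConnector may paginate differently.
import httpx

NOTION_TOKEN = "secret_..."  # placeholder; never hard-code real tokens

async def fetch_all_pages() -> list[dict]:
    headers = {
        "Authorization": f"Bearer {NOTION_TOKEN}",
        "Notion-Version": "2022-06-28",
    }
    results: list[dict] = []
    cursor: str | None = None
    async with httpx.AsyncClient() as client:
        while True:
            payload: dict = {"page_size": 100}
            if cursor:
                payload["start_cursor"] = cursor
            resp = await client.post(
                "https://api.notion.com/v1/search", headers=headers, json=payload
            )
            resp.raise_for_status()
            data = resp.json()
            results.extend(data["results"])
            if not data.get("has_more"):
                break
            cursor = data.get("next_cursor")  # resume from where the last page ended
    return results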
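
The batch-commit change repeats one pattern across every indexer shown in the hunks below: commit after every 10th indexed document inside the loop, then make one final commit for whatever remains. A simplified, self-contained sketch of that pattern follows; the session object, fetch_records(), and build_document() are hypothetical stand-ins for each connector's own SQLAlchemy session and document-building logic.

# Hedged sketch of the batch-commit pattern applied across the indexers.
# fetch_records(), build_document(), and session are hypothetical stand-ins,
# not the project's real helpers.
BATCH_SIZE = 10

async def index_records(session, logger, fetch_records, build_document) -> int:
    documents_indexed = 0
    for record in await fetch_records():
        try:
            session.add(build_document(record))
            documents_indexed += 1

            # Batch commit every 10 documents so a long run does not keep
            # every insert in a single open transaction until the very end.
            if documents_indexed % BATCH_SIZE == 0:
                logger.info(
                    f"Committing batch: {documents_indexed} records processed so far"
                )
                await session.commit()
        except Exception as e:
            logger.error(f"Failed to index record: {e}")

    # Final commit for any remaining documents not yet committed in batches.
    logger.info(f"Final commit: Total {documents_indexed} records processed")
    await session.commit()
    return documents_indexed

Committing in fixed-size batches caps the size of each transaction and means a mid-run failure loses at most the current batch, at the cost of a few extra round trips to the database.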
Author: DESKTOP-RTLN3BA\$punk
Date: 2025-11-03 15:57:19 -08:00
Parent: 49b7cb9ad5
Commit: e65d74f2e2
15 changed files with 305 additions and 102 deletions

@@ -389,6 +389,11 @@ async def index_airtable_records(
logger.info(
f"Successfully indexed new Airtable record {summary_content}"
)
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Airtable records processed so far")
await session.commit()
except Exception as e:
logger.error(
@@ -408,7 +413,8 @@ async def index_airtable_records(
session, connector, update_last_indexed
)
# Commit all changes
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Airtable records processed")
await session.commit()
logger.info(
"Successfully committed all Airtable document changes to database"

@@ -353,6 +353,11 @@ async def index_clickup_tasks(
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new task {task_name}")
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} ClickUp tasks processed so far")
await session.commit()
except Exception as e:
logger.error(
@@ -366,6 +371,8 @@ async def index_clickup_tasks(
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} ClickUp tasks processed")
await session.commit()
await task_logger.log_task_success(

@@ -367,6 +367,11 @@ async def index_confluence_pages(
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new page {page_title}")
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Confluence pages processed so far")
await session.commit()
except Exception as e:
logger.error(
@@ -384,7 +389,8 @@ async def index_confluence_pages(
if update_last_indexed:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Commit all changes
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Confluence pages processed")
await session.commit()
logger.info(
"Successfully committed all Confluence document changes to database"

@@ -461,6 +461,11 @@ async def index_discord_messages(
logger.info(
f"Successfully indexed new channel {guild_name}#{channel_name} with {len(formatted_messages)} messages"
)
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Discord channels processed so far")
await session.commit()
except Exception as e:
logger.error(
@@ -476,6 +481,8 @@ async def index_discord_messages(
if documents_indexed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Discord channels processed")
await session.commit()
# Prepare result message

@@ -380,6 +380,11 @@ async def index_github_repos(
)
session.add(document)
documents_processed += 1
# Batch commit every 10 documents
if documents_processed % 10 == 0:
logger.info(f"Committing batch: {documents_processed} GitHub files processed so far")
await session.commit()
except Exception as repo_err:
logger.error(
@@ -387,7 +392,8 @@ async def index_github_repos(
)
errors.append(f"Failed processing {repo_full_name}: {repo_err}")
# Commit all changes at the end
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_processed} GitHub files processed")
await session.commit()
logger.info(
f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files."

@@ -406,6 +406,11 @@ async def index_google_calendar_events(
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new event {event_summary}")
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Google Calendar events processed so far")
await session.commit()
except Exception as e:
logger.error(
@@ -422,6 +427,8 @@ async def index_google_calendar_events(
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Google Calendar events processed")
await session.commit()
await task_logger.log_task_success(

@@ -323,6 +323,11 @@ async def index_google_gmail_messages(
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new email {summary_content}")
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Gmail messages processed so far")
await session.commit()
except Exception as e:
logger.error(
@@ -338,7 +343,8 @@ async def index_google_gmail_messages(
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Commit all changes
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Gmail messages processed")
await session.commit()
logger.info(
"Successfully committed all Google gmail document changes to database"

@@ -351,6 +351,11 @@ async def index_jira_issues(
logger.info(
f"Successfully indexed new issue {issue_identifier} - {issue_title}"
)
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Jira issues processed so far")
await session.commit()
except Exception as e:
logger.error(
@@ -368,7 +373,8 @@ async def index_jira_issues(
if update_last_indexed:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Commit all changes
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Jira issues processed")
await session.commit()
logger.info("Successfully committed all JIRA document changes to database")

@@ -370,6 +370,11 @@ async def index_linear_issues(
logger.info(
f"Successfully indexed new issue {issue_identifier} - {issue_title}"
)
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Linear issues processed so far")
await session.commit()
except Exception as e:
logger.error(
@@ -387,7 +392,8 @@ async def index_linear_issues(
if update_last_indexed:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Commit all changes
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Linear issues processed")
await session.commit()
logger.info("Successfully committed all Linear document changes to database")

@@ -437,6 +437,11 @@ async def index_luma_events(
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new event {event_name}")
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Luma events processed so far")
await session.commit()
except Exception as e:
logger.error(
@@ -453,6 +458,8 @@ async def index_luma_events(
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Luma events processed")
await session.commit()
await task_logger.log_task_success(

@@ -356,6 +356,12 @@ async def index_notion_pages(
documents_indexed += 1
logger.info(f"Successfully updated Notion page: {page_title}")
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} documents processed so far")
await session.commit()
continue
# Document doesn't exist - create new one
@@ -405,6 +411,11 @@ async def index_notion_pages(
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new Notion page: {page_title}")
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} documents processed so far")
await session.commit()
except Exception as e:
logger.error(
@@ -423,7 +434,8 @@ async def index_notion_pages(
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Commit all changes
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} documents processed")
await session.commit()
# Prepare result message

@@ -353,6 +353,12 @@ async def index_slack_messages(
session.add(document)
documents_indexed += 1
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Slack channels processed so far")
await session.commit()
logger.info(
f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages"
)
@@ -376,7 +382,8 @@ async def index_slack_messages(
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Commit all changes
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Slack channels processed")
await session.commit()
# Prepare result message