chore: ran frontend and backend linting

This commit is contained in:
Anish Sarkar 2026-02-01 22:54:25 +05:30
parent ff4a574248
commit 085653d3e3
32 changed files with 288 additions and 210 deletions

View file

@ -9,8 +9,7 @@ frontend from showing a perpetual "syncing" state.
import logging
from datetime import UTC, datetime, timedelta
from sqlalchemy import and_, update
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import and_
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm.attributes import flag_modified
@ -42,12 +41,12 @@ def get_celery_session_maker():
def cleanup_stale_indexing_notifications_task():
"""
Check for stale connector indexing notifications and mark them as failed.
This task finds notifications that:
- Have type = 'connector_indexing'
- Have metadata.status = 'in_progress'
- Have updated_at older than STALE_NOTIFICATION_TIMEOUT_MINUTES
And marks them as failed with an appropriate error message.
"""
import asyncio
@ -78,7 +77,8 @@ async def _cleanup_stale_notifications():
select(Notification).filter(
and_(
Notification.type == "connector_indexing",
Notification.notification_metadata["status"].astext == "in_progress",
Notification.notification_metadata["status"].astext
== "in_progress",
Notification.updated_at < cutoff_time,
)
)
@ -98,22 +98,28 @@ async def _cleanup_stale_notifications():
for notification in stale_notifications:
try:
# Get current indexed count from metadata if available
indexed_count = notification.notification_metadata.get("indexed_count", 0)
connector_name = notification.notification_metadata.get("connector_name", "Unknown")
indexed_count = notification.notification_metadata.get(
"indexed_count", 0
)
connector_name = notification.notification_metadata.get(
"connector_name", "Unknown"
)
# Calculate how long it's been stale
stale_duration = datetime.now(UTC) - notification.updated_at
stale_minutes = int(stale_duration.total_seconds() / 60)
# Update notification metadata
notification.notification_metadata["status"] = "failed"
notification.notification_metadata["completed_at"] = datetime.now(UTC).isoformat()
notification.notification_metadata["completed_at"] = datetime.now(
UTC
).isoformat()
notification.notification_metadata["error_message"] = (
f"Indexing task appears to have crashed or timed out. "
f"No activity detected for {stale_minutes} minutes. "
f"Please try syncing again."
)
# Flag the JSONB column as modified for SQLAlchemy to detect the change
flag_modified(notification, "notification_metadata")
@ -138,4 +144,3 @@ async def _cleanup_stale_notifications():
except Exception as e:
logger.error(f"Error cleaning up stale notifications: {e!s}", exc_info=True)
await session.rollback()

View file

@ -12,9 +12,6 @@ import logging
from collections.abc import Awaitable, Callable
from importlib import import_module
# Type alias for heartbeat callback function
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
@ -26,6 +23,9 @@ from app.db import (
from app.services.composio_service import INDEXABLE_TOOLKITS, TOOLKIT_TO_INDEXER
from app.services.task_logging_service import TaskLoggingService
# Type alias for heartbeat callback function
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Set up logging
logger = logging.getLogger(__name__)

View file

@ -20,12 +20,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
@ -36,6 +30,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_airtable_records(
session: AsyncSession,
@ -145,7 +144,11 @@ async def index_airtable_records(
# Process each base
for base in bases:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time)
>= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(total_documents_indexed)
last_heartbeat_time = time.time()
base_id = base.get("id")
@ -224,7 +227,11 @@ async def index_airtable_records(
# Process each record
for record in records:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time)
>= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(total_documents_indexed)
last_heartbeat_time = time.time()

View file

@ -21,12 +21,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
@ -37,6 +31,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_bookstack_pages(
session: AsyncSession,
@ -194,7 +193,10 @@ async def index_bookstack_pages(
for page in pages:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try:

View file

@ -22,12 +22,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
@ -37,6 +31,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_clickup_tasks(
session: AsyncSession,
@ -184,7 +183,11 @@ async def index_clickup_tasks(
for task in tasks:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time)
>= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()

View file

@ -22,12 +22,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
@ -38,6 +32,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_confluence_pages(
session: AsyncSession,
@ -190,7 +189,10 @@ async def index_confluence_pages(
for page in pages:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try:

View file

@ -305,7 +305,11 @@ async def index_discord_messages(
try:
for guild in guilds:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time)
>= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
guild_id = guild["id"]

View file

@ -21,18 +21,18 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_current_timestamp,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
logger = logging.getLogger(__name__)
@ -186,7 +186,11 @@ async def index_elasticsearch_documents(
fields=config.get("ELASTICSEARCH_FIELDS"),
):
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time)
>= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_processed)
last_heartbeat_time = time.time()

View file

@ -24,12 +24,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
@ -38,6 +32,12 @@ from .base import (
logger,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
# Maximum tokens for a single digest before splitting
# Most LLMs can handle 128k+ tokens now, but we'll be conservative
MAX_DIGEST_CHARS = 500_000 # ~125k tokens
@ -184,7 +184,10 @@ async def index_github_repos(
for repo_full_name in repo_full_names_to_index:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
if not repo_full_name or not isinstance(repo_full_name, str):

View file

@ -6,8 +6,6 @@ import time
from collections.abc import Awaitable, Callable
from datetime import datetime, timedelta
import pytz
from dateutil.parser import isoparse
from google.oauth2.credentials import Credentials
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
@ -23,12 +21,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
@ -38,6 +30,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_google_calendar_events(
session: AsyncSession,
@ -296,7 +293,10 @@ async def index_google_calendar_events(
for event in events:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try:

View file

@ -420,7 +420,10 @@ async def _index_full_scan(
while folders_to_process and files_processed < max_files:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
current_folder_id, current_folder_name = folders_to_process.pop(0)
@ -541,7 +544,10 @@ async def _index_with_delta_sync(
for change in changes:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
if files_processed >= max_files:

View file

@ -25,12 +25,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
@ -41,6 +35,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_google_gmail_messages(
session: AsyncSession,
@ -228,7 +227,10 @@ async def index_google_gmail_messages(
for message in messages:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try:

View file

@ -22,12 +22,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
@ -38,6 +32,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_jira_issues(
session: AsyncSession,
@ -184,7 +183,10 @@ async def index_jira_issues(
for issue in issues:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try:

View file

@ -21,12 +21,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
calculate_date_range,
check_document_by_unique_identifier,
@ -37,6 +31,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_linear_issues(
session: AsyncSession,
@ -210,7 +209,10 @@ async def index_linear_issues(
# Process each issue
for issue in issues:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()

View file

@ -21,12 +21,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
@ -36,6 +30,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_luma_events(
session: AsyncSession,
@ -236,7 +235,10 @@ async def index_luma_events(
for event in events:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try:

View file

@ -234,7 +234,10 @@ async def index_notion_pages(
# Process each page
for page in pages:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()

View file

@ -27,12 +27,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
build_document_metadata_string,
check_document_by_unique_identifier,
@ -43,6 +37,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
def parse_frontmatter(content: str) -> tuple[dict | None, str]:
"""
@ -320,7 +319,10 @@ async def index_obsidian_vault(
for file_info in files:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(indexed_count)
last_heartbeat_time = time.time()
try:

View file

@ -20,12 +20,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
build_document_metadata_markdown,
calculate_date_range,
@ -37,6 +31,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_slack_messages(
session: AsyncSession,
@ -187,7 +186,10 @@ async def index_slack_messages(
# Process each channel
for channel_obj in channels:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
channel_id = channel_obj["id"]

View file

@ -19,12 +19,6 @@ from app.utils.document_converters import (
generate_unique_identifier_hash,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
build_document_metadata_markdown,
calculate_date_range,
@ -36,6 +30,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_teams_messages(
session: AsyncSession,
@ -200,7 +199,10 @@ async def index_teams_messages(
# Process each team
for team in teams:
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()

View file

@ -22,12 +22,6 @@ from app.utils.document_converters import (
)
from app.utils.webcrawler_utils import parse_webcrawler_urls
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
@ -37,6 +31,11 @@ from .base import (
update_connector_last_indexed,
)
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds
HEARTBEAT_INTERVAL_SECONDS = 30
async def index_crawled_urls(
session: AsyncSession,
@ -155,7 +154,10 @@ async def index_crawled_urls(
for idx, url in enumerate(urls, 1):
# Check if it's time for a heartbeat update
if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try: