chore: ran linting

This commit is contained in:
Anish Sarkar 2026-03-30 01:50:41 +05:30
parent 74826b3714
commit 04691d572b
61 changed files with 1962 additions and 1516 deletions

View file

@ -1076,7 +1076,11 @@ async def _stream_agent_events(
},
)
elif tool_name == "web_search":
xml = tool_output.get("result", str(tool_output)) if isinstance(tool_output, dict) else str(tool_output)
xml = (
tool_output.get("result", str(tool_output))
if isinstance(tool_output, dict)
else str(tool_output)
)
citations: dict[str, dict[str, str]] = {}
for m in re.finditer(
r"<title><!\[CDATA\[(.*?)\]\]></title>\s*<url><!\[CDATA\[(.*?)\]\]></url>",

View file

@ -45,6 +45,7 @@ logger = logging.getLogger(__name__)
# Helpers
# ---------------------------------------------------------------------------
async def _should_skip_file(
session: AsyncSession,
file: dict,
@ -186,9 +187,13 @@ async def _download_files_parallel(
logger.warning(f"Download/ETL failed for {file_name}: {reason}")
return None
doc = _build_connector_doc(
file, markdown, od_metadata,
connector_id=connector_id, search_space_id=search_space_id,
user_id=user_id, enable_summary=enable_summary,
file,
markdown,
od_metadata,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
)
async with hb_lock:
completed_count += 1
@ -204,9 +209,7 @@ async def _download_files_parallel(
failed = 0
for outcome in outcomes:
if isinstance(outcome, Exception):
failed += 1
elif outcome is None:
if isinstance(outcome, Exception) or outcome is None:
failed += 1
else:
results.append(outcome)
@ -227,9 +230,12 @@ async def _download_and_index(
) -> tuple[int, int]:
"""Parallel download then parallel indexing. Returns (batch_indexed, total_failed)."""
connector_docs, download_failed = await _download_files_parallel(
onedrive_client, files,
connector_id=connector_id, search_space_id=search_space_id,
user_id=user_id, enable_summary=enable_summary,
onedrive_client,
files,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
)
@ -242,7 +248,9 @@ async def _download_and_index(
return await get_user_long_context_llm(s, user_id, search_space_id)
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
connector_docs, _get_llm, max_concurrency=3,
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat,
)
@ -305,10 +313,14 @@ async def _index_selected_files(
files_to_download.append(file)
batch_indexed, failed = await _download_and_index(
onedrive_client, session, files_to_download,
connector_id=connector_id, search_space_id=search_space_id,
user_id=user_id, enable_summary=enable_summary,
batch_indexed, _failed = await _download_and_index(
onedrive_client,
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
)
@ -319,6 +331,7 @@ async def _index_selected_files(
# Scan strategies
# ---------------------------------------------------------------------------
async def _index_full_scan(
onedrive_client: OneDriveClient,
session: AsyncSession,
@ -338,7 +351,11 @@ async def _index_full_scan(
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name}",
{"stage": "full_scan", "folder_id": folder_id, "include_subfolders": include_subfolders},
{
"stage": "full_scan",
"folder_id": folder_id,
"include_subfolders": include_subfolders,
},
)
renamed_count = 0
@ -346,12 +363,16 @@ async def _index_full_scan(
files_to_download: list[dict] = []
all_files, error = await get_files_in_folder(
onedrive_client, folder_id, include_subfolders=include_subfolders,
onedrive_client,
folder_id,
include_subfolders=include_subfolders,
)
if error:
err_lower = error.lower()
if "401" in error or "authentication expired" in err_lower:
raise Exception(f"OneDrive authentication failed. Please re-authenticate. (Error: {error})")
raise Exception(
f"OneDrive authentication failed. Please re-authenticate. (Error: {error})"
)
raise Exception(f"Failed to list OneDrive files: {error}")
for file in all_files[:max_files]:
@ -365,14 +386,20 @@ async def _index_full_scan(
files_to_download.append(file)
batch_indexed, failed = await _download_and_index(
onedrive_client, session, files_to_download,
connector_id=connector_id, search_space_id=search_space_id,
user_id=user_id, enable_summary=enable_summary,
onedrive_client,
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
)
indexed = renamed_count + batch_indexed
logger.info(f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed")
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
)
return indexed, skipped
@ -392,7 +419,8 @@ async def _index_with_delta_sync(
) -> tuple[int, int, str | None]:
"""Delta sync using OneDrive change tracking. Returns (indexed, skipped, new_delta_link)."""
await task_logger.log_task_progress(
log_entry, "Starting delta sync",
log_entry,
"Starting delta sync",
{"stage": "delta_sync"},
)
@ -402,7 +430,9 @@ async def _index_with_delta_sync(
if error:
err_lower = error.lower()
if "401" in error or "authentication expired" in err_lower:
raise Exception(f"OneDrive authentication failed. Please re-authenticate. (Error: {error})")
raise Exception(
f"OneDrive authentication failed. Please re-authenticate. (Error: {error})"
)
raise Exception(f"Failed to fetch OneDrive changes: {error}")
if not changes:
@ -444,14 +474,20 @@ async def _index_with_delta_sync(
files_to_download.append(change)
batch_indexed, failed = await _download_and_index(
onedrive_client, session, files_to_download,
connector_id=connector_id, search_space_id=search_space_id,
user_id=user_id, enable_summary=enable_summary,
onedrive_client,
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
)
indexed = renamed_count + batch_indexed
logger.info(f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed")
logger.info(
f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"
)
return indexed, skipped, new_delta_link
@ -459,6 +495,7 @@ async def _index_with_delta_sync(
# Public entry point
# ---------------------------------------------------------------------------
async def index_onedrive_files(
session: AsyncSession,
connector_id: int,
@ -489,13 +526,20 @@ async def index_onedrive_files(
)
if not connector:
error_msg = f"OneDrive connector with ID {connector_id} not found"
await task_logger.log_task_failure(log_entry, error_msg, None, {"error_type": "ConnectorNotFound"})
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, 0, error_msg
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
error_msg = "SECRET_KEY not configured but credentials are encrypted"
await task_logger.log_task_failure(log_entry, error_msg, "Missing SECRET_KEY", {"error_type": "MissingSecretKey"})
await task_logger.log_task_failure(
log_entry,
error_msg,
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return 0, 0, error_msg
connector_enable_summary = getattr(connector, "enable_summary", True)
@ -513,10 +557,14 @@ async def index_onedrive_files(
selected_files = items_dict.get("files", [])
if selected_files:
file_tuples = [(f["id"], f.get("name")) for f in selected_files]
indexed, skipped, errors = await _index_selected_files(
onedrive_client, session, file_tuples,
connector_id=connector_id, search_space_id=search_space_id,
user_id=user_id, enable_summary=connector_enable_summary,
indexed, skipped, _errors = await _index_selected_files(
onedrive_client,
session,
file_tuples,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector_enable_summary,
)
total_indexed += indexed
total_skipped += skipped
@ -534,8 +582,16 @@ async def index_onedrive_files(
if can_use_delta:
logger.info(f"Using delta sync for folder {folder_name}")
indexed, skipped, new_delta_link = await _index_with_delta_sync(
onedrive_client, session, connector_id, search_space_id, user_id,
folder_id, delta_link, task_logger, log_entry, max_files,
onedrive_client,
session,
connector_id,
search_space_id,
user_id,
folder_id,
delta_link,
task_logger,
log_entry,
max_files,
enable_summary=connector_enable_summary,
)
total_indexed += indexed
@ -550,18 +606,36 @@ async def index_onedrive_files(
# Reconciliation full scan
ri, rs = await _index_full_scan(
onedrive_client, session, connector_id, search_space_id, user_id,
folder_id, folder_name, task_logger, log_entry, max_files,
include_subfolders, enable_summary=connector_enable_summary,
onedrive_client,
session,
connector_id,
search_space_id,
user_id,
folder_id,
folder_name,
task_logger,
log_entry,
max_files,
include_subfolders,
enable_summary=connector_enable_summary,
)
total_indexed += ri
total_skipped += rs
else:
logger.info(f"Using full scan for folder {folder_name}")
indexed, skipped = await _index_full_scan(
onedrive_client, session, connector_id, search_space_id, user_id,
folder_id, folder_name, task_logger, log_entry, max_files,
include_subfolders, enable_summary=connector_enable_summary,
onedrive_client,
session,
connector_id,
search_space_id,
user_id,
folder_id,
folder_name,
task_logger,
log_entry,
max_files,
include_subfolders,
enable_summary=connector_enable_summary,
)
total_indexed += indexed
total_skipped += skipped
@ -585,22 +659,28 @@ async def index_onedrive_files(
f"Successfully completed OneDrive indexing for connector {connector_id}",
{"files_processed": total_indexed, "files_skipped": total_skipped},
)
logger.info(f"OneDrive indexing completed: {total_indexed} indexed, {total_skipped} skipped")
logger.info(
f"OneDrive indexing completed: {total_indexed} indexed, {total_skipped} skipped"
)
return total_indexed, total_skipped, None
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry, f"Database error during OneDrive indexing for connector {connector_id}",
str(db_error), {"error_type": "SQLAlchemyError"},
log_entry,
f"Database error during OneDrive indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry, f"Failed to index OneDrive files for connector {connector_id}",
str(e), {"error_type": type(e).__name__},
log_entry,
f"Failed to index OneDrive files for connector {connector_id}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index OneDrive files: {e!s}", exc_info=True)
return 0, 0, f"Failed to index OneDrive files: {e!s}"