refactor(connectors): remove verbose docstrings and obvious comments

- Simplify module docstrings (remove meta-commentary about 'small focused modules')
- Remove redundant inline comments (e.g., 'Log task start', 'Get connector from database')
- Trim verbose function docstrings to essential information only
- Remove over-explanatory comments that restate what code does
- Keep necessary documentation, remove noise for better readability
This commit is contained in:
CREDO23 2025-12-28 18:53:13 +02:00
parent 506a9297a9
commit acf47e3b0c
8 changed files with 12 additions and 110 deletions

View file

@ -1,8 +1,4 @@
"""
Google Drive Connector Module.
Simple, modular approach to Google Drive indexing.
"""
"""Google Drive Connector Module."""
from .change_tracker import categorize_change, fetch_all_changes, get_start_page_token
from .client import GoogleDriveClient

View file

@ -1,9 +1,4 @@
"""
Change Tracking for Google Drive - Delta Sync Support.
Handles change detection and incremental syncing using Drive API's changes endpoint.
Small, focused module for tracking file modifications.
"""
"""Change tracking for Google Drive delta sync."""
import logging
from datetime import datetime
@ -110,7 +105,6 @@ async def _filter_changes_by_folder(
for change in changes:
file = change.get("file")
if not file:
# File was removed
filtered.append(change)
continue
@ -147,7 +141,6 @@ def categorize_change(change: dict[str, Any]) -> str:
if file.get("trashed"):
return "trashed"
# Check if file was recently created
created_time = file.get("createdTime")
modified_time = file.get("modifiedTime")
@ -198,7 +191,6 @@ async def fetch_all_changes(
all_changes.extend(changes)
# If next_token is None, we've reached the end
if not next_token or next_token == current_token:
break

View file

@ -1,9 +1,4 @@
"""
Google Drive API Client.
Core client for interacting with Google Drive API.
Handles service initialization and basic file operations.
"""
"""Google Drive API client."""
from typing import Any
@ -16,12 +11,7 @@ from .credentials import get_valid_credentials
class GoogleDriveClient:
"""
Main client for Google Drive API operations.
Handles service initialization and provides methods for
listing files, getting metadata, and downloading content.
"""
"""Client for Google Drive API operations."""
def __init__(self, session: AsyncSession, connector_id: int):
"""
@ -140,7 +130,6 @@ class GoogleDriveClient:
service = await self.get_service()
request = service.files().get_media(fileId=file_id)
# Execute the download
import io
fh = io.BytesIO()

View file

@ -1,8 +1,4 @@
"""
Content Extraction for Google Drive Files.
Downloads files and delegates to Surfsense's existing file processors.
"""
"""Content extraction for Google Drive files."""
import logging
import os
@ -31,9 +27,7 @@ async def download_and_process_file(
log_entry: Log,
) -> tuple[Any, str | None, dict[str, Any] | None]:
"""
Download Google Drive file and process using Surfsense's existing infrastructure.
This is the ONLY function needed - it delegates everything to process_file_in_background.
Download Google Drive file and process using Surfsense file processors.
Args:
client: GoogleDriveClient instance
@ -71,10 +65,8 @@ async def download_and_process_file(
if error:
return None, error
# Set extension based on export format
extension = ".pdf" if export_mime == "application/pdf" else ".txt"
else:
# Regular files - download directly
content_bytes, error = await client.download_file(file_id)
if error:
return None, error
@ -82,19 +74,15 @@ async def download_and_process_file(
# Preserve original file extension
extension = Path(file_name).suffix or ".bin"
# Save to temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp_file:
tmp_file.write(content_bytes)
temp_file_path = tmp_file.name
# Step 2: Delegate to Surfsense's existing file processor
# This handles ALL file types: markdown, audio, PDFs, Office docs, images, etc.
from app.tasks.document_processors.file_processors import (
process_file_in_background,
)
from app.db import DocumentType
# Prepare connector info
connector_info = {
"type": DocumentType.GOOGLE_DRIVE_CONNECTOR,
"metadata": {
@ -105,7 +93,6 @@ async def download_and_process_file(
},
}
# If it was a Google Workspace file, note the export format
if is_google_workspace_file(mime_type):
connector_info["metadata"]["exported_as"] = "pdf"
connector_info["metadata"]["original_workspace_type"] = mime_type.split(".")[-1]
@ -119,10 +106,9 @@ async def download_and_process_file(
session=session,
task_logger=task_logger,
log_entry=log_entry,
connector=connector_info, # Pass connector info
connector=connector_info,
)
# process_file_in_background doesn't return the document
return None, None, connector_info["metadata"]
except Exception as e:

View file

@ -1,9 +1,4 @@
"""
Google Drive OAuth Credentials Management.
Handles credential validation, token refresh, and persistence to database.
Small, focused module for credential operations only.
"""
"""Google Drive OAuth credential management."""
import json
from datetime import datetime
@ -35,7 +30,6 @@ async def get_valid_credentials(
ValueError: If credentials are missing or invalid
Exception: If token refresh fails
"""
# Fetch connector from database
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id
@ -46,11 +40,9 @@ async def get_valid_credentials(
if not connector:
raise ValueError(f"Connector {connector_id} not found")
# Extract credentials from config
config_data = connector.config
exp = config_data.get("expiry", "").replace("Z", "")
# Validate required fields
if not all(
[
config_data.get("client_id"),
@ -62,7 +54,6 @@ async def get_valid_credentials(
"Google OAuth credentials (client_id, client_secret, refresh_token) must be set"
)
# Create credentials object
credentials = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
@ -73,12 +64,10 @@ async def get_valid_credentials(
expiry=datetime.fromisoformat(exp) if exp else None,
)
# Refresh token if expired
if credentials.expired or not credentials.valid:
try:
credentials.refresh(Request())
# Persist refreshed token to database
connector.config = json.loads(credentials.to_json())
flag_modified(connector, "config")
await session.commit()

View file

@ -1,18 +1,11 @@
"""
File Type Handlers for Google Drive.
"""File type handlers for Google Drive."""
Simple module for basic file type detection.
"""
# Google Workspace MIME types that need export
GOOGLE_DOC = "application/vnd.google-apps.document"
GOOGLE_SHEET = "application/vnd.google-apps.spreadsheet"
GOOGLE_SLIDE = "application/vnd.google-apps.presentation"
GOOGLE_FOLDER = "application/vnd.google-apps.folder"
GOOGLE_SHORTCUT = "application/vnd.google-apps.shortcut"
# Export MIME types for Google Workspace files
# Export as PDF to preserve formatting, images, and structure
EXPORT_FORMATS = {
GOOGLE_DOC: "application/pdf",
GOOGLE_SHEET: "application/pdf",

View file

@ -1,9 +1,4 @@
"""
Folder Management for Google Drive.
Handles folder listing, selection, and hierarchy operations.
Small, focused module for folder-related operations.
"""
"""Folder management for Google Drive."""
import logging
from typing import Any
@ -165,11 +160,7 @@ async def list_folder_contents(
parent_id: str | None = None,
) -> tuple[list[dict[str, Any]], str | None]:
"""
List both folders and files in a Google Drive folder.
Fetches ALL items using pagination (handles folders with >100 items).
Returns items sorted with folders first, then files.
Each item includes 'isFolder' boolean for frontend rendering.
List folders and files in a Google Drive folder with pagination support.
Args:
client: GoogleDriveClient instance
@ -212,20 +203,16 @@ async def list_folder_contents(
all_items.extend(items)
# If no more pages, break
if not next_token:
break
page_token = next_token
# Add 'isFolder' flag and sort (folders first, then files)
for item in all_items:
item["isFolder"] = item["mimeType"] == "application/vnd.google-apps.folder"
# Sort: folders first (alphabetically), then files (alphabetically)
all_items.sort(key=lambda x: (not x["isFolder"], x["name"].lower()))
# Count folders and files for logging
folder_count = sum(1 for item in all_items if item["isFolder"])
file_count = len(all_items) - folder_count

View file

@ -1,11 +1,4 @@
"""
Google Drive Indexer - Delegates all processing to Surfsense's file processors.
Handles:
- Folder-specific indexing (user selects folder)
- Delta sync (only index changed files)
- Delegates file processing to process_file_in_background
"""
"""Google Drive indexer using Surfsense file processors."""
import logging
from datetime import datetime
@ -63,7 +56,6 @@ async def index_google_drive_files(
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="google_drive_files_indexing",
source="connector_indexing_task",
@ -78,7 +70,6 @@ async def index_google_drive_files(
)
try:
# Get connector from database
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR
)
@ -90,7 +81,6 @@ async def index_google_drive_files(
)
return 0, error_msg
# Initialize Drive client
await task_logger.log_task_progress(
log_entry,
f"Initializing Google Drive client for connector {connector_id}",
@ -99,7 +89,6 @@ async def index_google_drive_files(
drive_client = GoogleDriveClient(session, connector_id)
# Use folder from request params (required for Google Drive)
if not folder_id:
error_msg = "folder_id is required for Google Drive indexing"
await task_logger.log_task_failure(
@ -112,7 +101,6 @@ async def index_google_drive_files(
logger.info(f"Indexing Google Drive folder: {target_folder_name} ({target_folder_id})")
# Decide sync strategy - track tokens per folder
folder_tokens = connector.config.get("folder_tokens", {})
start_page_token = folder_tokens.get(target_folder_id)
can_use_delta_sync = use_delta_sync and start_page_token and connector.last_indexed_at
@ -150,14 +138,11 @@ async def index_google_drive_files(
documents_indexed, documents_skipped = result
# Update last indexed timestamp and get new start page token
if documents_indexed > 0 or can_use_delta_sync:
# Get new start page token for next sync
new_token, token_error = await get_start_page_token(drive_client)
if new_token and not token_error:
from sqlalchemy.orm.attributes import flag_modified
# Store token per folder
if "folder_tokens" not in connector.config:
connector.config["folder_tokens"] = {}
connector.config["folder_tokens"][target_folder_id] = new_token
@ -165,13 +150,11 @@ async def index_google_drive_files(
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit
await session.commit()
logger.info(
f"Successfully committed Google Drive indexing changes to database"
)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Google Drive indexing for connector {connector_id}",
@ -235,7 +218,6 @@ async def _index_full_scan(
page_token = None
files_processed = 0
# Paginate through all files in folder
while files_processed < max_files:
files, next_token, error = await get_files_in_folder(
drive_client, folder_id, include_subfolders=False, page_token=page_token
@ -254,7 +236,6 @@ async def _index_full_scan(
files_processed += 1
# Process file
indexed, skipped = await _process_single_file(
drive_client=drive_client,
session=session,
@ -269,7 +250,6 @@ async def _index_full_scan(
documents_indexed += indexed
documents_skipped += skipped
# Batch commit every 10 files
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(f"Committed batch: {documents_indexed} files indexed so far")
@ -304,7 +284,6 @@ async def _index_with_delta_sync(
{"stage": "delta_sync", "start_token": start_page_token},
)
# Fetch all changes since last sync
changes, final_token, error = await fetch_all_changes(
drive_client, start_page_token, folder_id
)
@ -330,14 +309,12 @@ async def _index_with_delta_sync(
files_processed += 1
change_type = categorize_change(change)
# Handle removed/trashed files
if change_type in ["removed", "trashed"]:
file_id = change.get("fileId")
if file_id:
await _remove_document(session, file_id, search_space_id)
continue
# Handle modified/new files
file = change.get("file")
if not file:
continue
@ -356,7 +333,6 @@ async def _index_with_delta_sync(
documents_indexed += indexed
documents_skipped += skipped
# Batch commit every 10 files
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(f"Committed batch: {documents_indexed} changes processed")
@ -389,10 +365,6 @@ async def _process_single_file(
try:
logger.info(f"Processing file: {file_name} ({mime_type})")
# Download and process using Surfsense's existing infrastructure
# This handles: markdown, audio, PDFs, Office docs, images, etc.
# It also handles: deduplication, chunking, summarization, embedding
# Document type is set to GOOGLE_DRIVE_CONNECTOR during processing
_, error, _ = await download_and_process_file(
client=drive_client,
file=file,
@ -404,7 +376,6 @@ async def _process_single_file(
)
if error:
# Log and skip - not an error, just unsupported or empty
await task_logger.log_task_progress(
log_entry,
f"Skipped {file_name}: {error}",
@ -412,7 +383,6 @@ async def _process_single_file(
)
return 0, 1
# File was processed successfully (document type already set in processor)
logger.info(f"Successfully indexed Google Drive file: {file_name}")
return 1, 0