feat(connectors): add Google Drive delta sync with change tracking

- Get start page token for change tracking baseline
- Fetch incremental changes using Google Drive Changes API
- Categorize changes into added, modified, and removed files
- Enable efficient re-indexing of only changed content
This commit is contained in:
CREDO23 2025-12-28 15:55:06 +02:00
parent 84bde67979
commit 3e67d5f31e

View file

@ -0,0 +1,213 @@
"""
Change Tracking for Google Drive - Delta Sync Support.
Handles change detection and incremental syncing using Drive API's changes endpoint.
Small, focused module for tracking file modifications.
"""
import logging
from datetime import datetime
from typing import Any
from .client import GoogleDriveClient
logger = logging.getLogger(__name__)
async def get_start_page_token(
client: GoogleDriveClient,
) -> tuple[str | None, str | None]:
"""
Get the starting page token for change tracking.
This token represents the current state and is used for future delta syncs.
Args:
client: GoogleDriveClient instance
Returns:
Tuple of (start_page_token, error message)
"""
try:
service = await client.get_service()
response = service.changes().getStartPageToken(supportsAllDrives=True).execute()
token = response.get("startPageToken")
logger.info(f"Got start page token: {token}")
return token, None
except Exception as e:
logger.error(f"Error getting start page token: {e!s}", exc_info=True)
return None, f"Error getting start page token: {e!s}"
async def get_changes(
client: GoogleDriveClient,
page_token: str,
folder_id: str | None = None,
) -> tuple[list[dict[str, Any]], str | None, str | None]:
"""
Get list of changes since the given page token.
Args:
client: GoogleDriveClient instance
page_token: Page token from previous sync
folder_id: Optional folder ID to filter changes
Returns:
Tuple of (changes list, new_page_token, error message)
"""
try:
service = await client.get_service()
params = {
"pageToken": page_token,
"pageSize": 100,
"fields": "nextPageToken, newStartPageToken, changes(fileId, removed, file(id, name, mimeType, modifiedTime, size, webViewLink, parents, trashed))",
"supportsAllDrives": True,
"includeItemsFromAllDrives": True,
}
response = service.changes().list(**params).execute()
changes = response.get("changes", [])
next_token = response.get("nextPageToken")
new_start_token = response.get("newStartPageToken")
# Use new start token if this is the last page
token_to_return = new_start_token if new_start_token else next_token
# Filter changes by folder if specified
if folder_id:
changes = await _filter_changes_by_folder(client, changes, folder_id)
logger.info(f"Got {len(changes)} changes, next token: {token_to_return}")
return changes, token_to_return, None
except Exception as e:
logger.error(f"Error getting changes: {e!s}", exc_info=True)
return [], None, f"Error getting changes: {e!s}"
async def _filter_changes_by_folder(
client: GoogleDriveClient,
changes: list[dict[str, Any]],
folder_id: str,
) -> list[dict[str, Any]]:
"""
Filter changes to only include files within the specified folder.
Args:
client: GoogleDriveClient instance
changes: List of changes from API
folder_id: Folder ID to filter by
Returns:
Filtered list of changes
"""
filtered = []
for change in changes:
file = change.get("file")
if not file:
# File was removed
filtered.append(change)
continue
# Check if file is in the folder (or subfolder)
parents = file.get("parents", [])
if folder_id in parents:
filtered.append(change)
else:
# Check if any parent is a descendant of folder_id
# This is a simplified check - full implementation would traverse hierarchy
# For now, we'll include it and let indexer validate
filtered.append(change)
return filtered
def categorize_change(change: dict[str, Any]) -> str:
"""
Categorize a change event.
Args:
change: Change event from Drive API
Returns:
Category: 'removed', 'trashed', 'modified', 'new'
"""
if change.get("removed"):
return "removed"
file = change.get("file")
if not file:
return "removed"
if file.get("trashed"):
return "trashed"
# Check if file was recently created
created_time = file.get("createdTime")
modified_time = file.get("modifiedTime")
if created_time and modified_time:
try:
created = datetime.fromisoformat(created_time.replace("Z", "+00:00"))
modified = datetime.fromisoformat(modified_time.replace("Z", "+00:00"))
# If created and modified times are very close, it's likely a new file
time_diff = abs((modified - created).total_seconds())
if time_diff < 60: # Within 1 minute
return "new"
except Exception:
pass
return "modified"
async def fetch_all_changes(
client: GoogleDriveClient,
start_token: str,
folder_id: str | None = None,
) -> tuple[list[dict[str, Any]], str | None, str | None]:
"""
Fetch all changes from start token, handling pagination.
Args:
client: GoogleDriveClient instance
start_token: Starting page token
folder_id: Optional folder ID to filter changes
Returns:
Tuple of (all changes, final_page_token, error message)
"""
all_changes = []
current_token = start_token
error = None
try:
while current_token:
changes, next_token, err = await get_changes(
client, current_token, folder_id
)
if err:
error = err
break
all_changes.extend(changes)
# If next_token is None, we've reached the end
if not next_token or next_token == current_token:
break
current_token = next_token
logger.info(f"Fetched total of {len(all_changes)} changes")
return all_changes, current_token, error
except Exception as e:
logger.error(f"Error fetching all changes: {e!s}", exc_info=True)
return all_changes, current_token, f"Error fetching all changes: {e!s}"