feat: enhance legacy document migration for Google connectors

- Implemented fallback logic in the Google Calendar, Drive, and Gmail indexers to handle legacy Composio document types, migrating existing documents to the native types in place (the shared pattern is sketched below).
- Updated document lookups to check for existing documents by both the primary and the legacy Composio unique-identifier hash, so content originally indexed through Composio is updated instead of duplicated.
- Extended the knowledge-base search tool to treat native Google document types and their legacy Composio equivalents as interchangeable when skipping connectors with no indexed documents, and to sort deduplicated results by RRF score before the output budget is applied.
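
The three indexers share one lookup-and-migrate pattern. A minimal sketch is below, assuming the generate_unique_identifier_hash and check_document_by_unique_identifier helpers and the DocumentType members shown in the diff; the wrapper function itself and its name are illustrative only, not part of this change:

    async def find_document_with_legacy_fallback(
        session, native_type, legacy_type, item_id, search_space_id
    ):
        # Primary lookup: hash derived from the native document type.
        primary_hash = generate_unique_identifier_hash(
            native_type, item_id, search_space_id
        )
        existing = await check_document_by_unique_identifier(session, primary_hash)

        # Fallback: hash derived from the legacy Composio document type.
        if not existing:
            legacy_hash = generate_unique_identifier_hash(
                legacy_type, item_id, search_space_id
            )
            existing = await check_document_by_unique_identifier(session, legacy_hash)
            if existing:
                # Migrate in place: re-key the row to the primary hash and flip
                # the Composio document type to its native equivalent.
                existing.unique_identifier_hash = primary_hash
                if existing.document_type == legacy_type:
                    existing.document_type = native_type

        return existing, primary_hash

Each indexer then runs its usual content-hash comparison on whatever document this lookup returns.
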
This commit is contained in:
Anish Sarkar 2026-03-20 03:39:05 +05:30
parent 8e7cda31c5
commit aaf34800e6
4 changed files with 99 additions and 6 deletions

View file

@@ -614,10 +614,23 @@ async def search_knowledge_base_async(
     connectors = _normalize_connectors(connectors_to_search, available_connectors)
 
     # --- Optimization 1: skip connectors that have zero indexed documents ---
+    # Native Google types must also match their legacy Composio equivalents
+    # (old documents may still carry the Composio type until re-indexed).
+    _NATIVE_TO_LEGACY: dict[str, str] = {
+        "GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
+        "GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
+        "GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
+    }
     if available_document_types:
         doc_types_set = set(available_document_types)
         before_count = len(connectors)
-        connectors = [c for c in connectors if c in doc_types_set]
+        connectors = [
+            c
+            for c in connectors
+            if c in doc_types_set
+            or _NATIVE_TO_LEGACY.get(c, "") in doc_types_set
+        ]
         skipped = before_count - len(connectors)
         if skipped:
             perf.info(
@@ -793,6 +806,10 @@ async def search_knowledge_base_async(
             deduplicated.append(doc)
 
+    # Sort by RRF score so the most relevant documents from ANY connector
+    # appear first, preventing budget truncation from hiding top results.
+    deduplicated.sort(key=lambda d: d.get("score", 0), reverse=True)
+
     output_budget = _compute_tool_output_budget(max_input_tokens)
     result = format_documents_for_context(deduplicated, max_chars=output_budget)

View file

@@ -385,6 +385,22 @@ async def index_google_calendar_events(
                 session, unique_identifier_hash
             )
 
+            # Fallback: legacy Composio hash
+            if not existing_document:
+                legacy_hash = generate_unique_identifier_hash(
+                    DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
+                    event_id,
+                    search_space_id,
+                )
+                existing_document = await check_document_by_unique_identifier(
+                    session, legacy_hash
+                )
+                if existing_document:
+                    existing_document.unique_identifier_hash = unique_identifier_hash
+                    if existing_document.document_type == DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR:
+                        existing_document.document_type = DocumentType.GOOGLE_CALENDAR_CONNECTOR
+                    logger.info(f"Migrated legacy Composio Calendar document: {event_id}")
+
             if existing_document:
                 # Document exists - check if content has changed
                 if existing_document.content_hash == content_hash:

View file

@@ -859,10 +859,22 @@ async def _create_pending_document_for_file(
         DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
     )
 
-    # Check if document exists
+    # Check if document exists (primary hash first, then legacy Composio hash)
     existing_document = await check_document_by_unique_identifier(
         session, unique_identifier_hash
     )
+    if not existing_document:
+        legacy_hash = generate_unique_identifier_hash(
+            DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id
+        )
+        existing_document = await check_document_by_unique_identifier(
+            session, legacy_hash
+        )
+        if existing_document:
+            existing_document.unique_identifier_hash = unique_identifier_hash
+            if existing_document.document_type == DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR:
+                existing_document.document_type = DocumentType.GOOGLE_DRIVE_FILE
+            logger.info(f"Migrated legacy Composio document to native type: {file_id}")
 
     if existing_document:
         # Check if this is a rename-only update (content unchanged)
@@ -958,12 +970,24 @@ async def _check_rename_only_update(
     )
     existing_document = await check_document_by_unique_identifier(session, primary_hash)
 
-    # If not found by primary hash, try searching by metadata (for legacy documents)
+    # Fallback: legacy Composio hash
+    if not existing_document:
+        legacy_hash = generate_unique_identifier_hash(
+            DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id
+        )
+        existing_document = await check_document_by_unique_identifier(
+            session, legacy_hash
+        )
+
+    # Fallback: metadata search (covers old filename-based hashes)
     if not existing_document:
         result = await session.execute(
             select(Document).where(
                 Document.search_space_id == search_space_id,
-                Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
+                Document.document_type.in_([
+                    DocumentType.GOOGLE_DRIVE_FILE,
+                    DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
+                ]),
                 cast(Document.document_metadata["google_drive_file_id"], String)
                 == file_id,
             )
@@ -972,6 +996,14 @@ async def _check_rename_only_update(
         if existing_document:
             logger.debug(f"Found legacy document by metadata for file_id: {file_id}")
 
+    # Migrate legacy Composio document to native type
+    if existing_document:
+        if existing_document.unique_identifier_hash != primary_hash:
+            existing_document.unique_identifier_hash = primary_hash
+        if existing_document.document_type == DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR:
+            existing_document.document_type = DocumentType.GOOGLE_DRIVE_FILE
+            logger.info(f"Migrated legacy Composio Drive document: {file_id}")
+
     if not existing_document:
         # New file, needs full processing
         return False, None
@@ -1186,12 +1218,24 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:
         session, unique_identifier_hash
     )
 
-    # If not found, search by metadata (for legacy documents with filename-based hash)
+    # Fallback: legacy Composio hash
+    if not existing_document:
+        legacy_hash = generate_unique_identifier_hash(
+            DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id
+        )
+        existing_document = await check_document_by_unique_identifier(
+            session, legacy_hash
+        )
+
+    # Fallback: metadata search (covers old filename-based hashes, both native and Composio)
     if not existing_document:
         result = await session.execute(
             select(Document).where(
                 Document.search_space_id == search_space_id,
-                Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
+                Document.document_type.in_([
+                    DocumentType.GOOGLE_DRIVE_FILE,
+                    DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
+                ]),
                 cast(Document.document_metadata["google_drive_file_id"], String)
                 == file_id,
             )

View file

@@ -312,6 +312,22 @@ async def index_google_gmail_messages(
                 session, unique_identifier_hash
             )
 
+            # Fallback: legacy Composio hash
+            if not existing_document:
+                legacy_hash = generate_unique_identifier_hash(
+                    DocumentType.COMPOSIO_GMAIL_CONNECTOR,
+                    message_id,
+                    search_space_id,
+                )
+                existing_document = await check_document_by_unique_identifier(
+                    session, legacy_hash
+                )
+                if existing_document:
+                    existing_document.unique_identifier_hash = unique_identifier_hash
+                    if existing_document.document_type == DocumentType.COMPOSIO_GMAIL_CONNECTOR:
+                        existing_document.document_type = DocumentType.GOOGLE_GMAIL_CONNECTOR
+                    logger.info(f"Migrated legacy Composio Gmail document: {message_id}")
+
             if existing_document:
                 # Document exists - check if content has changed
                 if existing_document.content_hash == content_hash: