feat: unify handling of native and legacy document types for Google connectors

- Introduced a mapping of native Google document types to their legacy Composio equivalents, ensuring seamless search and indexing for both types.
- Updated relevant components to utilize the new mapping, enhancing the consistency of document type handling across the application.
- Improved search functionality to transparently include legacy types, maintaining accessibility for older documents until re-indexed.
This commit is contained in:
Anish Sarkar 2026-03-20 03:41:32 +05:30
parent aaf34800e6
commit d21593ee71
7 changed files with 104 additions and 34 deletions

View file

@ -65,7 +65,8 @@ _CONNECTOR_TYPE_TO_SEARCHABLE: dict[str, str] = {
"BOOKSTACK_CONNECTOR": "BOOKSTACK_CONNECTOR",
"CIRCLEBACK_CONNECTOR": "CIRCLEBACK", # Connector type differs from document type
"OBSIDIAN_CONNECTOR": "OBSIDIAN_CONNECTOR",
# Composio connectors (unified to native document types)
# Composio connectors (unified to native document types).
# Reverse of NATIVE_TO_LEGACY_DOCTYPE in app.db.
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "GOOGLE_DRIVE_FILE",
"COMPOSIO_GMAIL_CONNECTOR": "GOOGLE_GMAIL_CONNECTOR",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "GOOGLE_CALENDAR_CONNECTOR",

View file

@ -19,7 +19,7 @@ from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import shielded_async_session
from app.db import NATIVE_TO_LEGACY_DOCTYPE, shielded_async_session
from app.services.connector_service import ConnectorService
from app.utils.perf import get_perf_logger
@ -43,7 +43,6 @@ _DEGENERATE_QUERY_RE = re.compile(
# a real search. We want breadth (many docs) over depth (many chunks).
_BROWSE_MAX_CHUNKS_PER_DOC = 5
def _is_degenerate_query(query: str) -> bool:
"""Return True when the query carries no meaningful search signal.
@ -614,14 +613,6 @@ async def search_knowledge_base_async(
connectors = _normalize_connectors(connectors_to_search, available_connectors)
# --- Optimization 1: skip connectors that have zero indexed documents ---
# Native Google types must also match their legacy Composio equivalents
# (old documents may still carry the Composio type until re-indexed).
_NATIVE_TO_LEGACY: dict[str, str] = {
"GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
"GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}
if available_document_types:
doc_types_set = set(available_document_types)
before_count = len(connectors)
@ -629,7 +620,7 @@ async def search_knowledge_base_async(
c
for c in connectors
if c in doc_types_set
or _NATIVE_TO_LEGACY.get(c, "") in doc_types_set
or NATIVE_TO_LEGACY_DOCTYPE.get(c, "") in doc_types_set
]
skipped = before_count - len(connectors)
if skipped:
@ -667,17 +658,10 @@ async def search_knowledge_base_async(
)
browse_connectors = connectors if connectors else [None] # type: ignore[list-item]
# Expand native Google types to include legacy Composio equivalents
# so old documents remain searchable until re-indexed.
_LEGACY_ALIASES: dict[str, str] = {
"GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
"GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}
expanded_browse = []
for c in browse_connectors:
if c is not None and c in _LEGACY_ALIASES:
expanded_browse.append([c, _LEGACY_ALIASES[c]])
if c is not None and c in NATIVE_TO_LEGACY_DOCTYPE:
expanded_browse.append([c, NATIVE_TO_LEGACY_DOCTYPE[c]])
else:
expanded_browse.append(c)

View file

@ -63,6 +63,16 @@ class DocumentType(StrEnum):
COMPOSIO_GOOGLE_CALENDAR_CONNECTOR = "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"
# Native Google document types → their legacy Composio equivalents.
# Old documents may still carry the Composio type until they are re-indexed;
# search, browse, and indexing must transparently handle both.
NATIVE_TO_LEGACY_DOCTYPE: dict[str, str] = {
"GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
"GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}
class SearchSourceConnectorType(StrEnum):
SERPER_API = "SERPER_API" # NOT IMPLEMENTED YET : DON'T REMEMBER WHY : MOST PROBABLY BECAUSE WE NEED TO CRAWL THE RESULTS RETURNED BY IT
TAVILY_API = "TAVILY_API"

View file

@ -11,6 +11,7 @@ from sqlalchemy.future import select
from tavily import TavilyClient
from app.db import (
NATIVE_TO_LEGACY_DOCTYPE,
Chunk,
Document,
SearchSourceConnector,
@ -215,15 +216,6 @@ class ConnectorService:
return result_object, files_docs
# Composio connectors that were unified into native Google pipelines.
# Old documents may still carry the legacy type until re-indexed; searching
# for a native type should transparently include its legacy equivalent.
_LEGACY_TYPE_ALIASES: dict[str, str] = {
"GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
"GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}
async def _combined_rrf_search(
self,
query_text: str,
@ -266,10 +258,10 @@ class ConnectorService:
# Expand native Google types to include legacy Composio equivalents
# so old documents remain searchable until re-indexed.
if isinstance(document_type, str) and document_type in self._LEGACY_TYPE_ALIASES:
if isinstance(document_type, str) and document_type in NATIVE_TO_LEGACY_DOCTYPE:
resolved_type: str | list[str] = [
document_type,
self._LEGACY_TYPE_ALIASES[document_type],
NATIVE_TO_LEGACY_DOCTYPE[document_type],
]
else:
resolved_type = document_type

View file

@ -170,7 +170,34 @@ async def handle_existing_document_update(
logging.info(f"Document for file {filename} unchanged. Skipping.")
return True, existing_document
else:
# Content has changed - need to re-process
# Content has changed — guard against content_hash collision before
# expensive ETL processing. A collision means the exact same content
# already lives in a *different* document (e.g. a manual upload of the
# same file). Proceeding would trigger a unique-constraint violation
# on ix_documents_content_hash.
collision_doc = await check_duplicate_document(session, content_hash)
if collision_doc and collision_doc.id != existing_document.id:
logging.warning(
"Content-hash collision for %s: identical content exists in "
"document #%s (%s). Skipping re-processing.",
filename,
collision_doc.id,
collision_doc.document_type,
)
if DocumentStatus.is_state(
existing_document.status, DocumentStatus.PENDING
) or DocumentStatus.is_state(
existing_document.status, DocumentStatus.PROCESSING
):
# Pending/processing doc has no real content yet — remove it
# so the UI doesn't show a contentless entry.
await session.delete(existing_document)
await session.commit()
return True, None
# Document already has valid content — keep it as-is.
return True, existing_document
logging.info(f"Content changed for file {filename}. Updating document.")
return False, None
@ -537,6 +564,12 @@ async def add_received_file_document_using_unstructured(
return document
except SQLAlchemyError as db_error:
await session.rollback()
if "ix_documents_content_hash" in str(db_error):
logging.warning(
"content_hash collision during commit for %s (Unstructured). Skipping.",
file_name,
)
return None
raise db_error
except Exception as e:
await session.rollback()
@ -673,6 +706,12 @@ async def add_received_file_document_using_llamacloud(
return document
except SQLAlchemyError as db_error:
await session.rollback()
if "ix_documents_content_hash" in str(db_error):
logging.warning(
"content_hash collision during commit for %s (LlamaCloud). Skipping.",
file_name,
)
return None
raise db_error
except Exception as e:
await session.rollback()
@ -828,6 +867,12 @@ async def add_received_file_document_using_docling(
return document
except SQLAlchemyError as db_error:
await session.rollback()
if "ix_documents_content_hash" in str(db_error):
logging.warning(
"content_hash collision during commit for %s (Docling). Skipping.",
file_name,
)
return None
raise db_error
except Exception as e:
await session.rollback()

View file

@ -158,6 +158,28 @@ async def _handle_existing_document_update(
logging.info(f"Document for markdown file {filename} unchanged. Skipping.")
return True, existing_document
else:
# Content has changed — guard against content_hash collision (same
# content already lives in a different document).
collision_doc = await check_duplicate_document(session, content_hash)
if collision_doc and collision_doc.id != existing_document.id:
logging.warning(
"Content-hash collision for markdown %s: identical content "
"exists in document #%s (%s). Skipping re-processing.",
filename,
collision_doc.id,
collision_doc.document_type,
)
if DocumentStatus.is_state(
existing_document.status, DocumentStatus.PENDING
) or DocumentStatus.is_state(
existing_document.status, DocumentStatus.PROCESSING
):
await session.delete(existing_document)
await session.commit()
return True, None
return True, existing_document
logging.info(
f"Content changed for markdown file {filename}. Updating document."
)
@ -312,6 +334,12 @@ async def add_received_markdown_file_document(
return document
except SQLAlchemyError as db_error:
await session.rollback()
if "ix_documents_content_hash" in str(db_error):
logging.warning(
"content_hash collision during commit for %s (markdown). Skipping.",
file_name,
)
return None
await task_logger.log_task_failure(
log_entry,
f"Database error processing markdown file: {file_name}",

View file

@ -643,6 +643,16 @@ export function DocumentsTableShell({
return <StatusIndicator status={doc.status} />;
}
if (state === "failed") {
if (isMentioned) {
return (
<Checkbox
checked={isMentioned}
onCheckedChange={() => handleRowToggle()}
aria-label="Remove from chat"
className="border-foreground data-[state=checked]:bg-primary data-[state=checked]:border-primary"
/>
);
}
return (
<>
<span className="group-hover:hidden">
@ -652,7 +662,7 @@ export function DocumentsTableShell({
<Checkbox
checked={isMentioned}
onCheckedChange={() => handleRowToggle()}
aria-label={isMentioned ? "Remove from chat" : "Add to chat"}
aria-label="Add to chat"
className="border-foreground data-[state=checked]:bg-primary data-[state=checked]:border-primary"
/>
</span>