feat: implement Google Drive knowledge base synchronization after file creation

- Added a new GoogleDriveKBSyncService to handle synchronization of newly created Google Drive files with the knowledge base.
- Enhanced the create_file.py tool to include feedback on the success of the knowledge base update, informing users if their file has been added or will be synced later.
- Updated the Google Drive tool metadata service to include parent folder information for improved file organization.
- Modified the UI components to support selection of parent folders during file creation, enhancing user experience and file management.
This commit is contained in:
Anish Sarkar 2026-03-20 15:41:08 +05:30
parent 510f9150cb
commit 3d6ff39bf4
6 changed files with 324 additions and 50 deletions

View file

@ -242,12 +242,36 @@ def create_create_google_drive_file_tool(
logger.info(
f"Google Drive file created: id={created.get('id')}, name={created.get('name')}"
)
kb_message_suffix = ""
try:
from app.services.google_drive import GoogleDriveKBSyncService
kb_service = GoogleDriveKBSyncService(db_session)
kb_result = await kb_service.sync_after_create(
file_id=created.get("id"),
file_name=created.get("name", final_name),
mime_type=mime_type,
web_view_link=created.get("webViewLink"),
content=final_content,
connector_id=actual_connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
if kb_result["status"] == "success":
kb_message_suffix = " Your knowledge base has also been updated."
else:
kb_message_suffix = " This file will be added to your knowledge base in the next scheduled sync."
except Exception as kb_err:
logger.warning(f"KB sync after create failed: {kb_err}")
kb_message_suffix = " This file will be added to your knowledge base in the next scheduled sync."
return {
"status": "success",
"file_id": created.get("id"),
"name": created.get("name"),
"web_view_link": created.get("webViewLink"),
"message": f"Successfully created '{created.get('name')}' in Google Drive.",
"message": f"Successfully created '{created.get('name')}' in Google Drive.{kb_message_suffix}",
}
except Exception as e:

View file

@ -1,3 +1,4 @@
from app.services.google_drive.kb_sync_service import GoogleDriveKBSyncService
from app.services.google_drive.tool_metadata_service import (
GoogleDriveAccount,
GoogleDriveFile,
@ -7,5 +8,6 @@ from app.services.google_drive.tool_metadata_service import (
__all__ = [
"GoogleDriveAccount",
"GoogleDriveFile",
"GoogleDriveKBSyncService",
"GoogleDriveToolMetadataService",
]

View file

@ -0,0 +1,159 @@
import logging
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
logger = logging.getLogger(__name__)
class GoogleDriveKBSyncService:
def __init__(self, db_session: AsyncSession):
self.db_session = db_session
async def sync_after_create(
self,
file_id: str,
file_name: str,
mime_type: str,
web_view_link: str | None,
content: str | None,
connector_id: int,
search_space_id: int,
user_id: str,
) -> dict:
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_current_timestamp,
safe_set_chunks,
)
try:
unique_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
)
existing = await check_document_by_unique_identifier(
self.db_session, unique_hash
)
if existing:
logger.info(
"Document for Drive file %s already exists (doc_id=%s), skipping",
file_id,
existing.id,
)
return {"status": "success"}
indexable_content = (content or "").strip()
if not indexable_content:
indexable_content = f"Google Drive file: {file_name} (type: {mime_type})"
content_hash = generate_content_hash(indexable_content, search_space_id)
with self.db_session.no_autoflush:
dup = await check_duplicate_document_by_hash(
self.db_session, content_hash
)
if dup:
logger.info(
"Content-hash collision for Drive file %s — identical content "
"exists in doc %s. Using unique_identifier_hash as content_hash.",
file_id,
dup.id,
)
content_hash = unique_hash
user_llm = await get_user_long_context_llm(
self.db_session,
user_id,
search_space_id,
disable_streaming=True,
)
doc_metadata_for_summary = {
"file_name": file_name,
"mime_type": mime_type,
"document_type": "Google Drive File",
"connector_type": "Google Drive",
}
if user_llm:
summary_content, summary_embedding = await generate_document_summary(
indexable_content, user_llm, doc_metadata_for_summary
)
else:
logger.warning("No LLM configured — using fallback summary")
summary_content = f"Google Drive File: {file_name}\n\n{indexable_content}"
summary_embedding = embed_text(summary_content)
chunks = await create_document_chunks(indexable_content)
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
document = Document(
title=file_name,
document_type=DocumentType.GOOGLE_DRIVE_FILE,
document_metadata={
"google_drive_file_id": file_id,
"google_drive_file_name": file_name,
"google_drive_mime_type": mime_type,
"web_view_link": web_view_link,
"source_connector": "google_drive",
"indexed_at": now_str,
"connector_id": connector_id,
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_hash,
embedding=summary_embedding,
search_space_id=search_space_id,
connector_id=connector_id,
source_markdown=content,
updated_at=get_current_timestamp(),
)
self.db_session.add(document)
await self.db_session.flush()
await safe_set_chunks(self.db_session, document, chunks)
await self.db_session.commit()
logger.info(
"KB sync after create succeeded: doc_id=%s, file=%s, chunks=%d",
document.id,
file_name,
len(chunks),
)
return {"status": "success"}
except Exception as e:
error_str = str(e).lower()
if (
"duplicate key value violates unique constraint" in error_str
or "uniqueviolationerror" in error_str
):
logger.warning(
"Duplicate constraint hit during KB sync for file %s. "
"Rolling back — periodic indexer will handle it. Error: %s",
file_id,
e,
)
await self.db_session.rollback()
return {"status": "error", "message": "Duplicate document detected"}
logger.error(
"KB sync after create failed for file %s: %s",
file_id,
e,
exc_info=True,
)
await self.db_session.rollback()
return {"status": "error", "message": str(e)}

View file

@ -74,6 +74,7 @@ class GoogleDriveToolMetadataService:
return {
"accounts": [],
"supported_types": [],
"parent_folders": {},
"error": "No Google Drive account connected",
}
@ -86,9 +87,12 @@ class GoogleDriveToolMetadataService:
await self._persist_auth_expired(acc.id)
accounts_with_status.append(acc_dict)
parent_folders = await self._get_parent_folders_by_account(accounts_with_status)
return {
"accounts": accounts_with_status,
"supported_types": ["google_doc", "google_sheet"],
"parent_folders": parent_folders,
}
async def get_trash_context(
@ -236,3 +240,74 @@ class GoogleDriveToolMetadataService:
connector_id,
exc_info=True,
)
async def _get_parent_folders_by_account(
self, accounts_with_status: list[dict]
) -> dict[int, list[dict]]:
"""Fetch root-level folders for each healthy account.
Skips accounts where ``auth_expired`` is True so we don't waste an API
call that will fail anyway.
"""
parent_folders: dict[int, list[dict]] = {}
for acc in accounts_with_status:
connector_id = acc["id"]
if acc.get("auth_expired"):
parent_folders[connector_id] = []
continue
try:
result = await self._db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == connector_id
)
)
connector = result.scalar_one_or_none()
if not connector:
parent_folders[connector_id] = []
continue
pre_built_creds = None
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR
):
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
pre_built_creds = build_composio_credentials(cca_id)
client = GoogleDriveClient(
session=self._db_session,
connector_id=connector_id,
credentials=pre_built_creds,
)
folders, _, error = await client.list_files(
query="mimeType = 'application/vnd.google-apps.folder' and trashed = false and 'root' in parents",
fields="files(id, name)",
page_size=50,
)
if error:
logger.warning(
"Failed to list folders for connector %s: %s",
connector_id,
error,
)
parent_folders[connector_id] = []
else:
parent_folders[connector_id] = [
{"folder_id": f["id"], "name": f["name"]}
for f in folders
if f.get("id") and f.get("name")
]
except Exception:
logger.warning(
"Error fetching folders for connector %s",
connector_id,
exc_info=True,
)
parent_folders[connector_id] = []
return parent_folders