feat: add Google Drive connector to knowledge base search

This commit is contained in:
CREDO23 2025-12-29 18:13:27 +02:00
parent 27beac4f62
commit 16bc991b13
3 changed files with 122 additions and 0 deletions

View file

@ -36,6 +36,7 @@ _ALL_CONNECTORS: list[str] = [
"CLICKUP_CONNECTOR",
"GOOGLE_CALENDAR_CONNECTOR",
"GOOGLE_GMAIL_CONNECTOR",
"GOOGLE_DRIVE_CONNECTOR",
"DISCORD_CONNECTOR",
"AIRTABLE_CONNECTOR",
"TAVILY_API",
@ -425,6 +426,16 @@ async def search_knowledge_base_async(
)
all_documents.extend(chunks)
elif connector == "GOOGLE_DRIVE_CONNECTOR":
_, chunks = await connector_service.search_google_drive(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "CONFLUENCE_CONNECTOR":
_, chunks = await connector_service.search_confluence(
user_query=query,
@ -561,6 +572,7 @@ def create_search_knowledge_base_tool(
- CLICKUP_CONNECTOR: "ClickUp tasks and project data" (personal task management)
- GOOGLE_CALENDAR_CONNECTOR: "Google Calendar events, meetings, and schedules" (personal calendar and time management)
- GOOGLE_GMAIL_CONNECTOR: "Google Gmail emails and conversations" (personal emails and communications)
- GOOGLE_DRIVE_CONNECTOR: "Google Drive files and documents" (personal cloud storage and file management)
- DISCORD_CONNECTOR: "Discord server conversations and shared content" (personal community communications)
- AIRTABLE_CONNECTOR: "Airtable records, tables, and database content" (personal data management and organization)
- TAVILY_API: "Tavily search API results" (personalized search results)

View file

@ -93,6 +93,16 @@ async def download_and_process_file(
},
}
# Add additional Drive metadata if available
if "modifiedTime" in file:
connector_info["metadata"]["modified_time"] = file["modifiedTime"]
if "createdTime" in file:
connector_info["metadata"]["created_time"] = file["createdTime"]
if "size" in file:
connector_info["metadata"]["file_size"] = file["size"]
if "webViewLink" in file:
connector_info["metadata"]["web_view_link"] = file["webViewLink"]
if is_google_workspace_file(mime_type):
connector_info["metadata"]["exported_as"] = "pdf"
connector_info["metadata"]["original_workspace_type"] = mime_type.split(".")[-1]

View file

@ -1808,6 +1808,106 @@ class ConnectorService:
return result_object, gmail_docs
async def search_google_drive(
self,
user_query: str,
search_space_id: int,
top_k: int = 20,
start_date: datetime | None = None,
end_date: datetime | None = None,
) -> tuple:
"""
Search for Google Drive files and return both the source information and langchain documents.
Uses combined chunk-level and document-level hybrid search with RRF fusion.
Args:
user_query: The user's query
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
start_date: Optional start date for filtering documents by updated_at
end_date: Optional end date for filtering documents by updated_at
Returns:
tuple: (sources_info, langchain_documents)
"""
drive_docs = await self._combined_rrf_search(
query_text=user_query,
search_space_id=search_space_id,
document_type="GOOGLE_DRIVE_CONNECTOR",
top_k=top_k,
start_date=start_date,
end_date=end_date,
)
# Early return if no results
if not drive_docs:
return {
"id": 33,
"name": "Google Drive Files",
"type": "GOOGLE_DRIVE_CONNECTOR",
"sources": [],
}, []
def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
return (
doc_info.get("title")
or metadata.get("google_drive_file_name")
or metadata.get("FILE_NAME")
or "Untitled File"
)
def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
file_id = metadata.get("google_drive_file_id", "")
return f"https://drive.google.com/file/d/{file_id}/view" if file_id else ""
def _description_fn(
chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
) -> str:
description = self._chunk_preview(chunk.get("content", ""))
info_parts = []
mime_type = metadata.get("google_drive_mime_type", "")
modified_time = metadata.get("modified_time", "")
if mime_type:
# Simplify mime type for display
if "google-apps" in mime_type:
file_type = mime_type.split(".")[-1].title()
else:
file_type = mime_type.split("/")[-1].upper()
info_parts.append(f"Type: {file_type}")
if modified_time:
info_parts.append(f"Modified: {modified_time}")
if info_parts:
description = (description + " | " + " | ".join(info_parts)).strip(" |")
return description
def _extra_fields_fn(
_chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
) -> dict[str, Any]:
return {
"google_drive_file_id": metadata.get("google_drive_file_id", ""),
"google_drive_mime_type": metadata.get("google_drive_mime_type", ""),
"modified_time": metadata.get("modified_time", ""),
}
sources_list = self._build_chunk_sources_from_documents(
drive_docs,
title_fn=_title_fn,
url_fn=_url_fn,
description_fn=_description_fn,
extra_fields_fn=_extra_fields_fn,
)
# Create result object
result_object = {
"id": 33, # Assign a unique ID for the Google Drive connector
"name": "Google Drive Files",
"type": "GOOGLE_DRIVE_CONNECTOR",
"sources": sources_list,
}
return result_object, drive_docs
async def search_confluence(
self,
user_query: str,