From c4cf773bb07749c8365057d645edd1724f2d5b3a Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Tue, 3 Feb 2026 20:45:52 +0530 Subject: [PATCH 1/4] feat: improve mentioned document prefetching and add skeletons in new chat interface --- .../new-chat/[[...chat_id]]/page.tsx | 87 ++++++++++++++----- .../new-chat/document-mention-picker.tsx | 53 +++++------ 2 files changed, 86 insertions(+), 54 deletions(-) diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index 5d00232f6..fb84849e7 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -9,7 +9,6 @@ import { import { useQueryClient } from "@tanstack/react-query"; import { useAtomValue, useSetAtom } from "jotai"; import { useParams, useSearchParams } from "next/navigation"; -import { useTranslations } from "next-intl"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { toast } from "sonner"; import { z } from "zod"; @@ -39,7 +38,7 @@ import { GeneratePodcastToolUI } from "@/components/tool-ui/generate-podcast"; import { LinkPreviewToolUI } from "@/components/tool-ui/link-preview"; import { ScrapeWebpageToolUI } from "@/components/tool-ui/scrape-webpage"; import { RecallMemoryToolUI, SaveMemoryToolUI } from "@/components/tool-ui/user-memory"; -import { Spinner } from "@/components/ui/spinner"; +import { Skeleton } from "@/components/ui/skeleton"; import { useChatSessionStateSync } from "@/hooks/use-chat-session-state"; import { useMessagesElectric } from "@/hooks/use-messages-electric"; // import { WriteTodosToolUI } from "@/components/tool-ui/write-todos"; @@ -53,12 +52,10 @@ import { } from "@/lib/chat/podcast-state"; import { appendMessage, - type ChatVisibility, createThread, getRegenerateUrl, getThreadFull, getThreadMessages, - type MessageRecord, type ThreadRecord, } from "@/lib/chat/thread-persistence"; import { @@ -67,6 +64,7 @@ import { trackChatMessageSent, trackChatResponseReceived, } from "@/lib/posthog/events"; +import { documentsApiService } from "@/lib/apis/documents-api.service"; /** * Extract thinking steps from message content @@ -137,7 +135,6 @@ interface ThinkingStepData { } export default function NewChatPage() { - const t = useTranslations("dashboard"); const params = useParams(); const queryClient = useQueryClient(); const [isInitializing, setIsInitializing] = useState(true); @@ -329,6 +326,33 @@ export default function NewChatPage() { initializeThread(); }, [initializeThread]); + // Prefetch document titles for @ mention picker + // Runs when user lands on page so data is ready when they type @ + useEffect(() => { + if (!searchSpaceId) return; + + const prefetchParams = { + search_space_id: searchSpaceId, + page: 0, + page_size: 20, + }; + + queryClient.prefetchQuery({ + queryKey: ["document-titles", prefetchParams], + queryFn: () => documentsApiService.searchDocumentTitles({ queryParams: prefetchParams }), + staleTime: 60 * 1000, + }); + + queryClient.prefetchQuery({ + queryKey: ["surfsense-docs-mention", "", false], + queryFn: () => + documentsApiService.getSurfsenseDocs({ + queryParams: { page: 0, page_size: 20 }, + }), + staleTime: 3 * 60 * 1000, + }); + }, [searchSpaceId, queryClient]); + // Handle scroll to comment from URL query params (e.g., from inbox item click) const searchParams = useSearchParams(); const targetCommentIdParam = searchParams.get("commentId"); @@ -366,20 +390,7 @@ export default function NewChatPage() { } setIsRunning(false); }, []); - - // Handle visibility change from ChatShareButton - const handleVisibilityChange = useCallback( - (newVisibility: ChatVisibility) => { - setCurrentThread((prev) => (prev ? { ...prev, visibility: newVisibility } : null)); - // Refetch all thread queries so sidebar reflects the change immediately - // Use predicate to match any query that starts with "threads" - queryClient.refetchQueries({ - predicate: (query) => Array.isArray(query.queryKey) && query.queryKey[0] === "threads", - }); - }, - [queryClient] - ); - + // Handle new message from user const onNew = useCallback( async (message: AppendMessage) => { @@ -1347,7 +1358,7 @@ export default function NewChatPage() { // Handle reloading/refreshing the last AI response const onReload = useCallback( - async (parentId: string | null) => { + async () => { // parentId is the ID of the message to reload from (the user message) // We call regenerate without a query to use the same query await handleRegenerate(null); @@ -1372,9 +1383,39 @@ export default function NewChatPage() { // Show loading state only when loading an existing thread if (isInitializing) { return ( -
- -
{t("loading_chat")}
+
+
+ {/* User message */} +
+ +
+ + {/* Assistant message */} +
+ + + +
+ + {/* User message */} +
+ +
+ + {/* Assistant message */} +
+ + + +
+
+ + {/* Input bar */} +
+
+ +
+
); } diff --git a/surfsense_web/components/new-chat/document-mention-picker.tsx b/surfsense_web/components/new-chat/document-mention-picker.tsx index 5c4c9bc61..7a0d0f009 100644 --- a/surfsense_web/components/new-chat/document-mention-picker.tsx +++ b/surfsense_web/components/new-chat/document-mention-picker.tsx @@ -1,6 +1,6 @@ "use client"; -import { keepPreviousData, useQuery, useQueryClient } from "@tanstack/react-query"; +import { keepPreviousData, useQuery } from "@tanstack/react-query"; import { forwardRef, useCallback, @@ -14,6 +14,7 @@ import { getConnectorIcon } from "@/contracts/enums/connectorIcons"; import type { Document, SearchDocumentTitlesResponse } from "@/contracts/types/document.types"; import { documentsApiService } from "@/lib/apis/documents-api.service"; import { cn } from "@/lib/utils"; +import { Skeleton } from "@/components/ui/skeleton"; export interface DocumentMentionPickerRef { selectHighlighted: () => void; @@ -77,8 +78,6 @@ export const DocumentMentionPicker = forwardRef< }, ref ) { - const queryClient = useQueryClient(); - // Debounced search value to minimize API calls and prevent race conditions const search = externalSearch; const debouncedSearch = useDebounced(search, DEBOUNCE_MS); @@ -106,32 +105,6 @@ export const DocumentMentionPicker = forwardRef< const shouldSearch = debouncedSearch.trim().length > 0; const isSingleCharSearch = debouncedSearch.trim().length === 1; - // Prefetch initial data on mount for instant display when picker opens - useEffect(() => { - if (!searchSpaceId) return; - - const prefetchParams = { - search_space_id: searchSpaceId, - page: 0, - page_size: PAGE_SIZE, - }; - - queryClient.prefetchQuery({ - queryKey: ["document-titles", prefetchParams], - queryFn: () => documentsApiService.searchDocumentTitles({ queryParams: prefetchParams }), - staleTime: 60 * 1000, - }); - - queryClient.prefetchQuery({ - queryKey: ["surfsense-docs-mention", "", false], - queryFn: () => - documentsApiService.getSurfsenseDocs({ - queryParams: { page: 0, page_size: PAGE_SIZE }, - }), - staleTime: 3 * 60 * 1000, - }); - }, [searchSpaceId, queryClient]); - // Reset pagination state when search query or search space changes. // Documents are not cleared to maintain visual continuity during fetches. // biome-ignore lint/correctness/useExhaustiveDependencies: Intentional reset on search/space change @@ -439,8 +412,26 @@ export const DocumentMentionPicker = forwardRef< onScroll={handleScroll} > {actualLoading ? ( -
-
+
+
+ +
+ {["a", "b", "c", "d", "e"].map((id, i) => ( +
= 3 && "hidden sm:flex" + )} + > + + + + + + +
+ ))}
) : actualDocuments.length > 0 ? (
From 65b79f37053edc054daeec5ba9f606d2ed70b03a Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Tue, 3 Feb 2026 22:57:01 +0530 Subject: [PATCH 2/4] feat: enhance Google Drive connector with file MIME type file detection and content based detection as fallback --- .../composio_google_drive_connector.py | 120 ++++++++++++++---- .../app/services/composio_service.py | 77 ++++++++++- 2 files changed, 173 insertions(+), 24 deletions(-) diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index 369f5d8b3..93d7f360a 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -179,13 +179,14 @@ class ComposioGoogleDriveConnector(ComposioConnector): ) async def get_drive_file_content( - self, file_id: str + self, file_id: str, original_mime_type: str | None = None ) -> tuple[bytes | None, str | None]: """ Download file content from Google Drive via Composio. Args: file_id: Google Drive file ID. + original_mime_type: Original MIME type (used to detect Google Workspace files for export). Returns: Tuple of (file content bytes, error message). @@ -200,6 +201,31 @@ class ComposioGoogleDriveConnector(ComposioConnector): connected_account_id=connected_account_id, entity_id=entity_id, file_id=file_id, + original_mime_type=original_mime_type, + ) + + async def get_file_metadata( + self, file_id: str + ) -> tuple[dict[str, Any] | None, str | None]: + """ + Get metadata for a specific file from Google Drive. + + Args: + file_id: The ID of the file to get metadata for. + + Returns: + Tuple of (metadata dict, error message). + """ + connected_account_id = await self.get_connected_account_id() + if not connected_account_id: + return None, "No connected account ID found" + + entity_id = await self.get_entity_id() + service = await self._get_service() + return await service.get_file_metadata( + connected_account_id=connected_account_id, + entity_id=entity_id, + file_id=file_id, ) async def get_drive_start_page_token(self) -> tuple[str | None, str | None]: @@ -292,8 +318,18 @@ async def _process_file_content( if isinstance(content, str): content = content.encode("utf-8") - # Check if this is a binary file - if _is_binary_file(file_name, mime_type): + # Check if this is a binary file based on extension or MIME type + is_binary = _is_binary_file(file_name, mime_type) + + # Content-based binary detection as fallback + # This catches PDFs and other binary files even if MIME type is missing/incorrect + if not is_binary and content: + has_pdf_magic = content[:4] == b"%PDF" + has_null_bytes = b"\x00" in content[:1000] + if has_pdf_magic or has_null_bytes: + is_binary = True + + if is_binary: # Use ETL service for binary files (PDF, Office docs, etc.) temp_file_path = None try: @@ -316,7 +352,7 @@ async def _process_file_content( return extracted_text else: # Fallback if extraction fails - logger.warning(f"Could not extract text from binary file {file_name}") + logger.warning(f"ETL returned empty for binary file {file_name}") return f"# {file_name}\n\n[Binary file - text extraction failed]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" except Exception as e: @@ -329,8 +365,8 @@ async def _process_file_content( if temp_file_path and os.path.exists(temp_file_path): try: os.unlink(temp_file_path) - except Exception as e: - logger.debug(f"Could not delete temp file {temp_file_path}: {e}") + except Exception: + pass else: # Text file - try to decode as UTF-8 try: @@ -372,9 +408,11 @@ async def _extract_text_with_etl( from logging import ERROR, getLogger etl_service = config.ETL_SERVICE + logger.debug(f"[_extract_text_with_etl] START - file_path={file_path}, file_name={file_name}, etl_service={etl_service}") try: if etl_service == "UNSTRUCTURED": + logger.debug("[_extract_text_with_etl] Using UNSTRUCTURED ETL") from langchain_unstructured import UnstructuredLoader from app.utils.document_converters import convert_document_to_markdown @@ -390,11 +428,16 @@ async def _extract_text_with_etl( ) docs = await loader.aload() + logger.debug(f"[_extract_text_with_etl] UNSTRUCTURED loaded {len(docs) if docs else 0} docs") if docs: - return await convert_document_to_markdown(docs) + result = await convert_document_to_markdown(docs) + logger.debug(f"[_extract_text_with_etl] UNSTRUCTURED result: {len(result) if result else 0} chars") + return result + logger.debug("[_extract_text_with_etl] UNSTRUCTURED returned no docs") return None elif etl_service == "LLAMACLOUD": + logger.debug("[_extract_text_with_etl] Using LLAMACLOUD ETL") from app.tasks.document_processors.file_processors import ( parse_with_llamacloud_retry, ) @@ -413,11 +456,16 @@ async def _extract_text_with_etl( markdown_documents = await result.aget_markdown_documents( split_by_page=False ) + logger.debug(f"[_extract_text_with_etl] LLAMACLOUD got {len(markdown_documents) if markdown_documents else 0} markdown docs") if markdown_documents: - return markdown_documents[0].text + text = markdown_documents[0].text + logger.debug(f"[_extract_text_with_etl] LLAMACLOUD result: {len(text) if text else 0} chars") + return text + logger.debug("[_extract_text_with_etl] LLAMACLOUD returned no markdown docs") return None elif etl_service == "DOCLING": + logger.debug("[_extract_text_with_etl] Using DOCLING ETL") from app.services.docling_service import create_docling_service docling_service = create_docling_service() @@ -441,16 +489,21 @@ async def _extract_text_with_etl( result = await docling_service.process_document( file_path, file_name ) + logger.debug(f"[_extract_text_with_etl] DOCLING result keys: {list(result.keys()) if result else 'None'}") finally: pdfminer_logger.setLevel(original_level) - return result.get("content") + content = result.get("content") + logger.debug(f"[_extract_text_with_etl] DOCLING content: {len(content) if content else 0} chars") + return content else: - logger.warning(f"Unknown ETL service: {etl_service}") + logger.warning(f"[_extract_text_with_etl] Unknown ETL service: {etl_service}") return None except Exception as e: - logger.error(f"ETL extraction failed for {file_name}: {e!s}") + logger.error(f"[_extract_text_with_etl] ETL extraction EXCEPTION for {file_name}: {e!s}") + import traceback + logger.error(f"[_extract_text_with_etl] Traceback: {traceback.format_exc()}") return None @@ -979,7 +1032,7 @@ async def _index_composio_drive_full_scan( all_files.extend(folder_files[:max_files_per_folder]) logger.info(f"Found {len(folder_files)} files in folder {folder_name}") - # Add specifically selected files + # Add specifically selected files - fetch metadata to get mimeType for selected_file in selected_files: file_id = selected_file.get("id") file_name = selected_file.get("name", "Unknown") @@ -987,14 +1040,35 @@ async def _index_composio_drive_full_scan( if not file_id: continue - # Add file info (we'll fetch content later during indexing) - all_files.append( - { - "id": file_id, - "name": file_name, - "mimeType": "", # Will be determined later - } - ) + # Fetch file metadata to get proper mimeType + metadata, meta_error = await composio_connector.get_file_metadata(file_id) + if metadata and not meta_error: + all_files.append( + { + "id": file_id, + "name": metadata.get("name") or file_name, + "mimeType": metadata.get("mimeType", ""), + "modifiedTime": metadata.get("modifiedTime", ""), + "createdTime": metadata.get("createdTime", ""), + } + ) + logger.info( + f"Fetched metadata for UI-selected file: {file_name} " + f"(mimeType={metadata.get('mimeType', 'unknown')})" + ) + else: + # Fallback if metadata fetch fails - content-based detection will handle it + logger.warning( + f"Could not fetch metadata for file {file_name}: {meta_error}. " + f"Falling back to content-based detection." + ) + all_files.append( + { + "id": file_id, + "name": file_name, + "mimeType": "", # Content-based detection will handle this + } + ) else: # No selection specified - fetch all files (original behavior) page_token = None @@ -1128,8 +1202,10 @@ async def _process_single_drive_file( session, unique_identifier_hash ) - # Get file content - content, content_error = await composio_connector.get_drive_file_content(file_id) + # Get file content (pass mime_type for Google Workspace export handling) + content, content_error = await composio_connector.get_drive_file_content( + file_id, original_mime_type=mime_type + ) if content_error or not content: logger.warning(f"Could not get content for file {file_name}: {content_error}") diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py index ad7841a8b..2aedfcb39 100644 --- a/surfsense_backend/app/services/composio_service.py +++ b/surfsense_backend/app/services/composio_service.py @@ -449,8 +449,11 @@ class ComposioService: """ try: # Composio uses snake_case for parameters + # IMPORTANT: Include 'fields' to ensure mimeType is returned in the response + # Without this, Google Drive API may not include mimeType for some files params = { "page_size": min(page_size, 100), + "fields": "files(id,name,mimeType,modifiedTime,createdTime),nextPageToken", } if folder_id: # List contents of a specific folder (exclude shortcuts - we don't have access to them) @@ -498,7 +501,11 @@ class ComposioService: return [], None, str(e) async def get_drive_file_content( - self, connected_account_id: str, entity_id: str, file_id: str + self, + connected_account_id: str, + entity_id: str, + file_id: str, + original_mime_type: str | None = None, ) -> tuple[bytes | None, str | None]: """ Download file content from Google Drive via Composio. @@ -507,10 +514,13 @@ class ComposioService: to a local directory, and the local file path is provided in the response. Response includes: file_path, file_name, size fields. + For Google Workspace files (Docs, Sheets, Slides), exports to PDF format. + Args: connected_account_id: Composio connected account ID. entity_id: The entity/user ID that owns the connected account. file_id: Google Drive file ID. + original_mime_type: Original MIME type of the file (used to detect Google Workspace files). Returns: Tuple of (file content bytes, error message). @@ -518,10 +528,19 @@ class ComposioService: from pathlib import Path try: + params = {"file_id": file_id} + + # For Google Workspace files, explicitly export as PDF + # This ensures consistent behavior and proper binary detection + if original_mime_type and original_mime_type.startswith( + "application/vnd.google-apps." + ): + params["mime_type"] = "application/pdf" + result = await self.execute_tool( connected_account_id=connected_account_id, tool_name="GOOGLEDRIVE_DOWNLOAD_FILE", - params={"file_id": file_id}, + params=params, entity_id=entity_id, ) @@ -651,6 +670,60 @@ class ComposioService: logger.error(f"Failed to get Drive file content: {e!s}") return None, str(e) + async def get_file_metadata( + self, connected_account_id: str, entity_id: str, file_id: str + ) -> tuple[dict[str, Any] | None, str | None]: + """ + Get metadata for a specific file from Google Drive. + + Args: + connected_account_id: Composio connected account ID. + entity_id: The entity/user ID that owns the connected account. + file_id: The ID of the file to get metadata for. + + Returns: + Tuple of (metadata dict, error message). + """ + try: + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GOOGLEDRIVE_GET_FILE_METADATA", + params={ + "file_id": file_id, + "fields": "id,name,mimeType,modifiedTime,createdTime,size", + }, + entity_id=entity_id, + ) + + if not result.get("success"): + return None, result.get("error", "Unknown error") + + data = result.get("data", {}) + + # Handle nested response structure + if isinstance(data, dict): + inner_data = data.get("data", data) + if isinstance(inner_data, dict): + # Extract metadata fields with fallbacks for camelCase/snake_case + metadata = { + "id": inner_data.get("id") or file_id, + "name": inner_data.get("name", ""), + "mimeType": inner_data.get("mimeType") + or inner_data.get("mime_type", ""), + "modifiedTime": inner_data.get("modifiedTime") + or inner_data.get("modified_time", ""), + "createdTime": inner_data.get("createdTime") + or inner_data.get("created_time", ""), + "size": inner_data.get("size", ""), + } + return metadata, None + + return None, "Could not extract metadata from response" + + except Exception as e: + logger.error(f"Failed to get file metadata: {e!s}") + return None, str(e) + async def get_drive_start_page_token( self, connected_account_id: str, entity_id: str ) -> tuple[str | None, str | None]: From 30c6f421020442157fbb1b92b1df03d364fbfd60 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Wed, 4 Feb 2026 03:03:40 +0530 Subject: [PATCH 3/4] feat: streamline Composio connector logic by removing redundant checks and enhancing email retrieval for user accounts --- .../composio_google_drive_connector.py | 13 +- .../app/routes/composio_routes.py | 83 ++--- .../app/services/composio_service.py | 311 ++++++++++-------- .../celery_tasks/connector_deletion_task.py | 4 +- 4 files changed, 221 insertions(+), 190 deletions(-) diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index 93d7f360a..e71b4ef52 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -4,6 +4,7 @@ Composio Google Drive Connector Module. Provides Google Drive specific methods for data retrieval and indexing via Composio. """ +import contextlib import hashlib import json import logging @@ -321,14 +322,6 @@ async def _process_file_content( # Check if this is a binary file based on extension or MIME type is_binary = _is_binary_file(file_name, mime_type) - # Content-based binary detection as fallback - # This catches PDFs and other binary files even if MIME type is missing/incorrect - if not is_binary and content: - has_pdf_magic = content[:4] == b"%PDF" - has_null_bytes = b"\x00" in content[:1000] - if has_pdf_magic or has_null_bytes: - is_binary = True - if is_binary: # Use ETL service for binary files (PDF, Office docs, etc.) temp_file_path = None @@ -363,10 +356,8 @@ async def _process_file_content( finally: # Cleanup temp file if temp_file_path and os.path.exists(temp_file_path): - try: + with contextlib.suppress(Exception): os.unlink(temp_file_path) - except Exception: - pass else: # Text file - try to decode as UTF-8 try: diff --git a/surfsense_backend/app/routes/composio_routes.py b/surfsense_backend/app/routes/composio_routes.py index 602aa876c..e0c6c1f65 100644 --- a/surfsense_backend/app/routes/composio_routes.py +++ b/surfsense_backend/app/routes/composio_routes.py @@ -42,10 +42,6 @@ from app.utils.connector_naming import ( ) from app.utils.oauth_security import OAuthStateManager -# Note: We no longer use check_duplicate_connector for Composio connectors because -# Composio generates a new connected_account_id each time, even for the same Google account. -# Instead, we check for existing connectors by type/space/user and update them. - logger = logging.getLogger(__name__) router = APIRouter() @@ -256,11 +252,6 @@ async def composio_callback( "connectedAccountId" ) or query_params.get("connected_account_id") - # DEBUG: Log query parameter received - logger.info( - f"DEBUG: Callback received - connectedAccountId: {query_params.get('connectedAccountId')}, connected_account_id: {query_params.get('connected_account_id')}, using: {final_connected_account_id}" - ) - # If we still don't have a connected_account_id, warn but continue # (the connector will be created but indexing won't work until updated) if not final_connected_account_id: @@ -273,6 +264,9 @@ async def composio_callback( f"Successfully got connected_account_id: {final_connected_account_id}" ) + # Build entity_id for Composio API calls (same format as used in initiate) + entity_id = f"surfsense_{user_id}" + # Build connector config connector_config = { "composio_connected_account_id": final_connected_account_id, @@ -290,20 +284,51 @@ async def composio_callback( ) connector_type = SearchSourceConnectorType(connector_type_str) - # Check for existing connector of the same type for this user/space - # When reconnecting, Composio gives a new connected_account_id, so we need to - # check by connector_type, user_id, and search_space_id instead of connected_account_id + # Get the base name for this connector type (e.g., "Google Drive", "Gmail") + base_name = get_base_name_for_type(connector_type) + + # FIRST: Get the email for this connected account + # This is needed to determine if it's a reconnection (same email) or new account + email = None + try: + email = await service.get_connected_account_email( + connected_account_id=final_connected_account_id, + entity_id=entity_id, + toolkit_id=toolkit_id, + ) + if email: + logger.info(f"Retrieved email {email} for {toolkit_id} connector") + except Exception as email_error: + logger.warning(f"Could not get email for connector: {email_error!s}") + + # Generate the connector name (with email if available) + # Format: "Gmail (Composio) - john@gmail.com" or "Gmail (Composio) 1" if no email + if email: + connector_name = f"{base_name} (Composio) - {email}" + else: + # Fallback to generic naming if email not available + count = await count_connectors_of_type( + session, connector_type, space_id, user_id + ) + if count == 0: + connector_name = f"{base_name} (Composio) 1" + else: + connector_name = f"{base_name} (Composio) {count + 1}" + + # Check if a connector with this SAME name already exists (reconnection case) + # This allows multiple accounts (different emails) while supporting reconnection existing_connector_result = await session.execute( select(SearchSourceConnector).where( SearchSourceConnector.connector_type == connector_type, SearchSourceConnector.search_space_id == space_id, SearchSourceConnector.user_id == user_id, + SearchSourceConnector.name == connector_name, ) ) existing_connector = existing_connector_result.scalars().first() if existing_connector: - # Delete the old Composio connected account before updating + # This is a RECONNECTION of the same account - update existing connector old_connected_account_id = existing_connector.config.get( "composio_connected_account_id" ) @@ -320,22 +345,16 @@ async def composio_callback( f"Deleted old Composio connected account {old_connected_account_id} " f"before updating connector {existing_connector.id}" ) - else: - logger.warning( - f"Failed to delete old Composio connected account {old_connected_account_id}" - ) except Exception as delete_error: - # Log but don't fail - the old account may already be deleted logger.warning( f"Error deleting old Composio connected account {old_connected_account_id}: {delete_error!s}" ) # Update existing connector with new connected_account_id - # IMPORTANT: Merge new credentials with existing config to preserve - # user settings like selected_folders, selected_files, indexing_options, - # drive_page_token, etc. that would otherwise be wiped on reconnection. + # Merge new credentials with existing config to preserve user settings logger.info( - f"Updating existing Composio connector {existing_connector.id} with new connected_account_id {final_connected_account_id}" + f"Reconnecting existing Composio connector {existing_connector.id} ({connector_name}) " + f"with new connected_account_id {final_connected_account_id}" ) existing_config = ( existing_connector.config.copy() if existing_connector.config else {} @@ -347,28 +366,16 @@ async def composio_callback( await session.commit() await session.refresh(existing_connector) - # Get the frontend connector ID based on toolkit_id frontend_connector_id = TOOLKIT_TO_FRONTEND_CONNECTOR_ID.get( toolkit_id, "composio-connector" ) return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={existing_connector.id}" + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={existing_connector.id}&view=configure" ) + # This is a NEW account - create a new connector try: - # Count existing connectors of this type to determine the number - count = await count_connectors_of_type( - session, connector_type, space_id, user_id - ) - - # Generate base name (e.g., "Gmail", "Google Drive") - base_name = get_base_name_for_type(connector_type) - - # Format: "Gmail (Composio) 1", "Gmail (Composio) 2", etc. - if count == 0: - connector_name = f"{base_name} (Composio) 1" - else: - connector_name = f"{base_name} (Composio) {count + 1}" + logger.info(f"Creating new Composio connector: {connector_name}") db_connector = SearchSourceConnector( name=connector_name, @@ -392,7 +399,7 @@ async def composio_callback( toolkit_id, "composio-connector" ) return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={db_connector.id}" + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={db_connector.id}&view=configure" ) except IntegrityError as e: diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py index 2aedfcb39..763347d5a 100644 --- a/surfsense_backend/app/services/composio_service.py +++ b/surfsense_backend/app/services/composio_service.py @@ -15,17 +15,6 @@ from app.config import config logger = logging.getLogger(__name__) -# Mapping of toolkit IDs to their Composio auth config IDs -# These use Composio's managed OAuth (no custom credentials needed) -COMPOSIO_TOOLKIT_AUTH_CONFIGS = { - "googledrive": "default", # Uses Composio's managed Google OAuth - "gmail": "default", - "googlecalendar": "default", - "slack": "default", - "notion": "default", - "github": "default", -} - # Mapping of toolkit IDs to their display names COMPOSIO_TOOLKIT_NAMES = { "googledrive": "Google Drive", @@ -234,134 +223,6 @@ class ComposioService: logger.error(f"Failed to initiate Composio connection: {e!s}") raise - async def get_connected_account( - self, connected_account_id: str - ) -> dict[str, Any] | None: - """ - Get details of a connected account. - - Args: - connected_account_id: The Composio connected account ID. - - Returns: - Connected account details or None if not found. - """ - try: - # Pass connected_account_id as positional argument (not keyword) - account = self.client.connected_accounts.get(connected_account_id) - return { - "id": account.id, - "status": getattr(account, "status", None), - "toolkit": getattr(account, "toolkit", None), - "user_id": getattr(account, "user_id", None), - } - except Exception as e: - logger.error( - f"Failed to get connected account {connected_account_id}: {e!s}" - ) - return None - - async def list_all_connections(self) -> list[dict[str, Any]]: - """ - List ALL connected accounts (for debugging). - - Returns: - List of all connected account details. - """ - try: - accounts_response = self.client.connected_accounts.list() - - if hasattr(accounts_response, "items"): - accounts = accounts_response.items - elif hasattr(accounts_response, "__iter__"): - accounts = accounts_response - else: - logger.warning( - f"Unexpected accounts response type: {type(accounts_response)}" - ) - return [] - - result = [] - for acc in accounts: - toolkit_raw = getattr(acc, "toolkit", None) - toolkit_info = None - if toolkit_raw: - if isinstance(toolkit_raw, str): - toolkit_info = toolkit_raw - elif hasattr(toolkit_raw, "slug"): - toolkit_info = toolkit_raw.slug - elif hasattr(toolkit_raw, "name"): - toolkit_info = toolkit_raw.name - else: - toolkit_info = str(toolkit_raw) - - result.append( - { - "id": acc.id, - "status": getattr(acc, "status", None), - "toolkit": toolkit_info, - "user_id": getattr(acc, "user_id", None), - } - ) - - return result - except Exception as e: - logger.error(f"Failed to list all connections: {e!s}") - return [] - - async def list_user_connections(self, user_id: str) -> list[dict[str, Any]]: - """ - List all connected accounts for a user. - - Args: - user_id: The user's unique identifier. - - Returns: - List of connected account details. - """ - try: - accounts_response = self.client.connected_accounts.list(user_id=user_id) - - # Handle paginated response (may have .items attribute) or direct list - if hasattr(accounts_response, "items"): - accounts = accounts_response.items - elif hasattr(accounts_response, "__iter__"): - accounts = accounts_response - else: - logger.warning( - f"Unexpected accounts response type: {type(accounts_response)}" - ) - return [] - - result = [] - for acc in accounts: - # Extract toolkit info - might be string or object - toolkit_raw = getattr(acc, "toolkit", None) - toolkit_info = None - if toolkit_raw: - if isinstance(toolkit_raw, str): - toolkit_info = toolkit_raw - elif hasattr(toolkit_raw, "slug"): - toolkit_info = toolkit_raw.slug - elif hasattr(toolkit_raw, "name"): - toolkit_info = toolkit_raw.name - else: - toolkit_info = toolkit_raw - - result.append( - { - "id": acc.id, - "status": getattr(acc, "status", None), - "toolkit": toolkit_info, - } - ) - - logger.info(f"Found {len(result)} connections for user {user_id}: {result}") - return result - except Exception as e: - logger.error(f"Failed to list connections for user {user_id}: {e!s}") - return [] - async def delete_connected_account(self, connected_account_id: str) -> bool: """ Delete a connected account from Composio. @@ -1018,6 +879,178 @@ class ComposioService: logger.error(f"Failed to list Calendar events: {e!s}") return [], str(e) + # ===== User Info Methods ===== + + async def get_connected_account_email( + self, + connected_account_id: str, + entity_id: str, + toolkit_id: str, + ) -> str | None: + """ + Get the email address associated with a connected account. + + Uses toolkit-specific API calls: + - Google Drive: List files and extract owner email + - Gmail: Get user profile + - Google Calendar: List events and extract organizer/creator email + + Args: + connected_account_id: Composio connected account ID. + entity_id: The entity/user ID that owns the connected account. + toolkit_id: The toolkit identifier (googledrive, gmail, googlecalendar). + + Returns: + Email address string or None if not available. + """ + try: + email = await self._extract_email_for_toolkit( + connected_account_id, entity_id, toolkit_id + ) + + if email: + logger.info(f"Retrieved email {email} for {toolkit_id} connector") + else: + logger.warning(f"Could not retrieve email for {toolkit_id} connector") + + return email + + except Exception as e: + logger.error(f"Failed to get email for {toolkit_id} connector: {e!s}") + return None + + async def _extract_email_for_toolkit( + self, + connected_account_id: str, + entity_id: str, + toolkit_id: str, + ) -> str | None: + """Extract email based on toolkit type.""" + if toolkit_id == "googledrive": + return await self._get_drive_owner_email(connected_account_id, entity_id) + elif toolkit_id == "gmail": + return await self._get_gmail_profile_email(connected_account_id, entity_id) + elif toolkit_id == "googlecalendar": + return await self._get_calendar_user_email(connected_account_id, entity_id) + return None + + async def _get_drive_owner_email( + self, connected_account_id: str, entity_id: str + ) -> str | None: + """Get email from Google Drive file owner where me=True.""" + # List files owned by the user and find one where owner.me=True + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GOOGLEDRIVE_LIST_FILES", + params={ + "page_size": 10, + "fields": "files(owners)", + "q": "'me' in owners", # Only files owned by current user + }, + entity_id=entity_id, + ) + + if not result.get("success"): + return None + + data = result.get("data", {}) + if not isinstance(data, dict): + return None + + files = data.get("files") or data.get("data", {}).get("files", []) + for file in files: + owners = file.get("owners", []) + for owner in owners: + # Only return email if this is the current user (me=True) + if owner.get("me") and owner.get("emailAddress"): + return owner.get("emailAddress") + + return None + + async def _get_gmail_profile_email( + self, connected_account_id: str, entity_id: str + ) -> str | None: + """Get email from Gmail profile.""" + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GMAIL_GET_PROFILE", + params={}, + entity_id=entity_id, + ) + + if not result.get("success"): + return None + + data = result.get("data", {}) + if not isinstance(data, dict): + return None + + return data.get("emailAddress") or data.get("data", {}).get("emailAddress") + + async def _get_calendar_user_email( + self, connected_account_id: str, entity_id: str + ) -> str | None: + """Get email from Google Calendar primary calendar or event organizer/creator.""" + # Method 1: Get primary calendar - the "summary" field is the user's email + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GOOGLECALENDAR_GET_CALENDAR", + params={"calendar_id": "primary"}, + entity_id=entity_id, + ) + + if result.get("success"): + data = result.get("data", {}) + if isinstance(data, dict): + # Handle nested structure: data['data']['calendar_data']['summary'] + calendar_data = ( + data.get("data", {}).get("calendar_data", {}) + if isinstance(data.get("data"), dict) + else {} + ) + summary = ( + calendar_data.get("summary") + or calendar_data.get("id") + or data.get("data", {}).get("summary") + or data.get("summary") + ) + if summary and "@" in summary: + return summary + + # Method 2: Fallback - list events to get calendar summary (owner's email) + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GOOGLECALENDAR_EVENTS_LIST", + params={"max_results": 20}, + entity_id=entity_id, + ) + + if not result.get("success"): + return None + + data = result.get("data", {}) + if not isinstance(data, dict): + return None + + # The events list response contains 'summary' which is the calendar owner's email + nested_data = data.get("data", {}) if isinstance(data.get("data"), dict) else {} + summary = nested_data.get("summary") or data.get("summary") + if summary and "@" in summary: + return summary + + # Method 3: Check event organizers/creators + items = nested_data.get("items", []) or data.get("items", []) + for event in items: + organizer = event.get("organizer", {}) + if organizer.get("self"): + return organizer.get("email") + + creator = event.get("creator", {}) + if creator.get("self"): + return creator.get("email") + + return None + # Singleton instance _composio_service: ComposioService | None = None diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py index 0fd68637c..e024aca29 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py @@ -166,8 +166,8 @@ async def _delete_connector_async( user_id=UUID(user_id), search_space_id=search_space_id, type="connector_deletion", - title=f"{connector_name} Removed", - message=f"Connector and {total_deleted} {doc_text} have been removed from your knowledge base.", + title=f"{connector_name} removed", + message=f"Cleanup complete. {total_deleted} {doc_text} removed.", notification_metadata={ "connector_id": connector_id, "connector_name": connector_name, From 580b75c3c4a508fd06c8d1ea12cb50f0279fbd85 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Wed, 4 Feb 2026 03:04:25 +0530 Subject: [PATCH 4/4] chore: ran linting --- .../composio_google_drive_connector.py | 41 ++++++++++++++----- .../new-chat/[[...chat_id]]/page.tsx | 15 +++---- .../new-chat/document-mention-picker.tsx | 6 +-- 3 files changed, 40 insertions(+), 22 deletions(-) diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index e71b4ef52..66669e4e0 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -399,7 +399,9 @@ async def _extract_text_with_etl( from logging import ERROR, getLogger etl_service = config.ETL_SERVICE - logger.debug(f"[_extract_text_with_etl] START - file_path={file_path}, file_name={file_name}, etl_service={etl_service}") + logger.debug( + f"[_extract_text_with_etl] START - file_path={file_path}, file_name={file_name}, etl_service={etl_service}" + ) try: if etl_service == "UNSTRUCTURED": @@ -419,10 +421,14 @@ async def _extract_text_with_etl( ) docs = await loader.aload() - logger.debug(f"[_extract_text_with_etl] UNSTRUCTURED loaded {len(docs) if docs else 0} docs") + logger.debug( + f"[_extract_text_with_etl] UNSTRUCTURED loaded {len(docs) if docs else 0} docs" + ) if docs: result = await convert_document_to_markdown(docs) - logger.debug(f"[_extract_text_with_etl] UNSTRUCTURED result: {len(result) if result else 0} chars") + logger.debug( + f"[_extract_text_with_etl] UNSTRUCTURED result: {len(result) if result else 0} chars" + ) return result logger.debug("[_extract_text_with_etl] UNSTRUCTURED returned no docs") return None @@ -447,12 +453,18 @@ async def _extract_text_with_etl( markdown_documents = await result.aget_markdown_documents( split_by_page=False ) - logger.debug(f"[_extract_text_with_etl] LLAMACLOUD got {len(markdown_documents) if markdown_documents else 0} markdown docs") + logger.debug( + f"[_extract_text_with_etl] LLAMACLOUD got {len(markdown_documents) if markdown_documents else 0} markdown docs" + ) if markdown_documents: text = markdown_documents[0].text - logger.debug(f"[_extract_text_with_etl] LLAMACLOUD result: {len(text) if text else 0} chars") + logger.debug( + f"[_extract_text_with_etl] LLAMACLOUD result: {len(text) if text else 0} chars" + ) return text - logger.debug("[_extract_text_with_etl] LLAMACLOUD returned no markdown docs") + logger.debug( + "[_extract_text_with_etl] LLAMACLOUD returned no markdown docs" + ) return None elif etl_service == "DOCLING": @@ -480,20 +492,29 @@ async def _extract_text_with_etl( result = await docling_service.process_document( file_path, file_name ) - logger.debug(f"[_extract_text_with_etl] DOCLING result keys: {list(result.keys()) if result else 'None'}") + logger.debug( + f"[_extract_text_with_etl] DOCLING result keys: {list(result.keys()) if result else 'None'}" + ) finally: pdfminer_logger.setLevel(original_level) content = result.get("content") - logger.debug(f"[_extract_text_with_etl] DOCLING content: {len(content) if content else 0} chars") + logger.debug( + f"[_extract_text_with_etl] DOCLING content: {len(content) if content else 0} chars" + ) return content else: - logger.warning(f"[_extract_text_with_etl] Unknown ETL service: {etl_service}") + logger.warning( + f"[_extract_text_with_etl] Unknown ETL service: {etl_service}" + ) return None except Exception as e: - logger.error(f"[_extract_text_with_etl] ETL extraction EXCEPTION for {file_name}: {e!s}") + logger.error( + f"[_extract_text_with_etl] ETL extraction EXCEPTION for {file_name}: {e!s}" + ) import traceback + logger.error(f"[_extract_text_with_etl] Traceback: {traceback.format_exc()}") return None diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index fb84849e7..1be292f4d 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -390,7 +390,7 @@ export default function NewChatPage() { } setIsRunning(false); }, []); - + // Handle new message from user const onNew = useCallback( async (message: AppendMessage) => { @@ -1357,14 +1357,11 @@ export default function NewChatPage() { ); // Handle reloading/refreshing the last AI response - const onReload = useCallback( - async () => { - // parentId is the ID of the message to reload from (the user message) - // We call regenerate without a query to use the same query - await handleRegenerate(null); - }, - [handleRegenerate] - ); + const onReload = useCallback(async () => { + // parentId is the ID of the message to reload from (the user message) + // We call regenerate without a query to use the same query + await handleRegenerate(null); + }, [handleRegenerate]); // Create external store runtime with attachment support const runtime = useExternalStoreRuntime({ diff --git a/surfsense_web/components/new-chat/document-mention-picker.tsx b/surfsense_web/components/new-chat/document-mention-picker.tsx index 7a0d0f009..8dbd680bc 100644 --- a/surfsense_web/components/new-chat/document-mention-picker.tsx +++ b/surfsense_web/components/new-chat/document-mention-picker.tsx @@ -417,8 +417,8 @@ export const DocumentMentionPicker = forwardRef<
{["a", "b", "c", "d", "e"].map((id, i) => ( -
= 3 && "hidden sm:flex" @@ -428,7 +428,7 @@ export const DocumentMentionPicker = forwardRef< - +
))}