diff --git a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/google_drive/tools/__init__.py b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/google_drive/tools/__init__.py index 1f5feca60..403140a5d 100644 --- a/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/google_drive/tools/__init__.py +++ b/surfsense_backend/app/agents/multi_agent_chat/subagents/connectors/google_drive/tools/__init__.py @@ -1,9 +1,5 @@ -from app.agents.shared.tools.google_drive.create_file import ( - create_create_google_drive_file_tool, -) -from app.agents.shared.tools.google_drive.trash_file import ( - create_delete_google_drive_file_tool, -) +from .create_file import create_create_google_drive_file_tool +from .trash_file import create_delete_google_drive_file_tool __all__ = [ "create_create_google_drive_file_tool", diff --git a/surfsense_backend/app/agents/shared/tools/google_drive/__init__.py b/surfsense_backend/app/agents/shared/tools/google_drive/__init__.py deleted file mode 100644 index 1f5feca60..000000000 --- a/surfsense_backend/app/agents/shared/tools/google_drive/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from app.agents.shared.tools.google_drive.create_file import ( - create_create_google_drive_file_tool, -) -from app.agents.shared.tools.google_drive.trash_file import ( - create_delete_google_drive_file_tool, -) - -__all__ = [ - "create_create_google_drive_file_tool", - "create_delete_google_drive_file_tool", -] diff --git a/surfsense_backend/app/agents/shared/tools/google_drive/create_file.py b/surfsense_backend/app/agents/shared/tools/google_drive/create_file.py deleted file mode 100644 index dc64d8c92..000000000 --- a/surfsense_backend/app/agents/shared/tools/google_drive/create_file.py +++ /dev/null @@ -1,340 +0,0 @@ -import logging -from typing import Any, Literal - -from googleapiclient.errors import HttpError -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.connectors.google_drive.client import GoogleDriveClient -from app.connectors.google_drive.file_types import GOOGLE_DOC, GOOGLE_SHEET -from app.db import async_session_maker -from app.services.google_drive import GoogleDriveToolMetadataService - -logger = logging.getLogger(__name__) - -_MIME_MAP: dict[str, str] = { - "google_doc": GOOGLE_DOC, - "google_sheet": GOOGLE_SHEET, -} - - -def create_create_google_drive_file_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the create_google_drive_file tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Google Drive connector - user_id: User ID for fetching user-specific context - - Returns: - Configured create_google_drive_file tool - """ - del db_session # per-call session — see docstring - - @tool - async def create_google_drive_file( - name: str, - file_type: Literal["google_doc", "google_sheet"], - content: str | None = None, - ) -> dict[str, Any]: - """Create a new Google Doc or Google Sheet in Google Drive. - - Use this tool when the user explicitly asks to create a new document - or spreadsheet in Google Drive. The user MUST specify a topic before - you call this tool. If the request does not contain a topic (e.g. - "create a drive doc" or "make a Google Sheet"), ask what the file - should be about. Never call this tool without a clear topic from the user. - - Args: - name: The file name (without extension). - file_type: Either "google_doc" or "google_sheet". - content: Optional initial content. Generate from the user's topic. - For google_doc, provide markdown text. For google_sheet, provide CSV-formatted text. - - Returns: - Dictionary with: - - status: "success", "rejected", or "error" - - file_id: Google Drive file ID (if success) - - name: File name (if success) - - web_view_link: URL to open the file (if success) - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined the action. - Respond with a brief acknowledgment and do NOT retry or suggest alternatives. - - If status is "insufficient_permissions", the connector lacks the required OAuth scope. - Inform the user they need to re-authenticate and do NOT retry the action. - - Examples: - - "Create a Google Doc with today's meeting notes" - - "Create a spreadsheet for the 2026 budget" - """ - logger.info( - f"create_google_drive_file called: name='{name}', type='{file_type}'" - ) - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Google Drive tool not properly configured. Please contact support.", - } - - if file_type not in _MIME_MAP: - return { - "status": "error", - "message": f"Unsupported file type '{file_type}'. Use 'google_doc' or 'google_sheet'.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = GoogleDriveToolMetadataService(db_session) - context = await metadata_service.get_creation_context( - search_space_id, user_id - ) - - if "error" in context: - logger.error( - f"Failed to fetch creation context: {context['error']}" - ) - return {"status": "error", "message": context["error"]} - - accounts = context.get("accounts", []) - if accounts and all(a.get("auth_expired") for a in accounts): - logger.warning( - "All Google Drive accounts have expired authentication" - ) - return { - "status": "auth_error", - "message": "All connected Google Drive accounts need re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "google_drive", - } - - logger.info( - f"Requesting approval for creating Google Drive file: name='{name}', type='{file_type}'" - ) - result = request_approval( - action_type="google_drive_file_creation", - tool_name="create_google_drive_file", - params={ - "name": name, - "file_type": file_type, - "content": content, - "connector_id": None, - "parent_folder_id": None, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. The file was not created. Do not ask again or suggest alternatives.", - } - - final_name = result.params.get("name", name) - final_file_type = result.params.get("file_type", file_type) - final_content = result.params.get("content", content) - final_connector_id = result.params.get("connector_id") - final_parent_folder_id = result.params.get("parent_folder_id") - - if not final_name or not final_name.strip(): - return {"status": "error", "message": "File name cannot be empty."} - - mime_type = _MIME_MAP.get(final_file_type) - if not mime_type: - return { - "status": "error", - "message": f"Unsupported file type '{final_file_type}'.", - } - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - _drive_types = [ - SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, - ] - - if final_connector_id is not None: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_drive_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Google Drive connector is invalid or has been disconnected.", - } - actual_connector_id = connector.id - else: - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_drive_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "No Google Drive connector found. Please connect Google Drive in your workspace settings.", - } - actual_connector_id = connector.id - - logger.info( - f"Creating Google Drive file: name='{final_name}', type='{final_file_type}', connector={actual_connector_id}" - ) - - is_composio_drive = ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR - ) - if is_composio_drive: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found for this Drive connector.", - } - client = GoogleDriveClient( - session=db_session, - connector_id=actual_connector_id, - ) - try: - if is_composio_drive: - from app.services.composio_service import ComposioService - - params: dict[str, Any] = { - "name": final_name, - "mimeType": mime_type, - "fields": "id,name,webViewLink,mimeType", - } - if final_parent_folder_id: - params["parents"] = [final_parent_folder_id] - if final_content: - params["description"] = final_content[:4096] - - result = await ComposioService().execute_tool( - connected_account_id=cca_id, - tool_name="GOOGLEDRIVE_CREATE_FILE", - params=params, - entity_id=f"surfsense_{user_id}", - ) - if not result.get("success"): - raise RuntimeError( - result.get("error", "Unknown Composio Drive error") - ) - created = result.get("data", {}) - if isinstance(created, dict): - created = created.get("data", created) - if isinstance(created, dict): - created = created.get("response_data", created) - if not isinstance(created, dict): - created = {} - else: - created = await client.create_file( - name=final_name, - mime_type=mime_type, - parent_folder_id=final_parent_folder_id, - content=final_content, - ) - except HttpError as http_err: - if http_err.resp.status == 403: - logger.warning( - f"Insufficient permissions for connector {actual_connector_id}: {http_err}" - ) - try: - from sqlalchemy.orm.attributes import flag_modified - - _res = await db_session.execute( - select(SearchSourceConnector).where( - SearchSourceConnector.id == actual_connector_id - ) - ) - _conn = _res.scalar_one_or_none() - if _conn and not _conn.config.get("auth_expired"): - _conn.config = {**_conn.config, "auth_expired": True} - flag_modified(_conn, "config") - await db_session.commit() - except Exception: - logger.warning( - "Failed to persist auth_expired for connector %s", - actual_connector_id, - exc_info=True, - ) - return { - "status": "insufficient_permissions", - "connector_id": actual_connector_id, - "message": "This Google Drive account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - logger.info( - f"Google Drive file created: id={created.get('id')}, name={created.get('name')}" - ) - - kb_message_suffix = "" - try: - from app.services.google_drive import GoogleDriveKBSyncService - - kb_service = GoogleDriveKBSyncService(db_session) - kb_result = await kb_service.sync_after_create( - file_id=created.get("id"), - file_name=created.get("name", final_name), - mime_type=mime_type, - web_view_link=created.get("webViewLink"), - content=final_content, - connector_id=actual_connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - if kb_result["status"] == "success": - kb_message_suffix = ( - " Your knowledge base has also been updated." - ) - else: - kb_message_suffix = " This file will be added to your knowledge base in the next scheduled sync." - except Exception as kb_err: - logger.warning(f"KB sync after create failed: {kb_err}") - kb_message_suffix = " This file will be added to your knowledge base in the next scheduled sync." - - return { - "status": "success", - "file_id": created.get("id"), - "name": created.get("name"), - "web_view_link": created.get("webViewLink"), - "message": f"Successfully created '{created.get('name')}' in Google Drive.{kb_message_suffix}", - } - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error creating Google Drive file: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while creating the file. Please try again.", - } - - return create_google_drive_file diff --git a/surfsense_backend/app/agents/shared/tools/google_drive/trash_file.py b/surfsense_backend/app/agents/shared/tools/google_drive/trash_file.py deleted file mode 100644 index 69e8ba6d0..000000000 --- a/surfsense_backend/app/agents/shared/tools/google_drive/trash_file.py +++ /dev/null @@ -1,299 +0,0 @@ -import logging -from typing import Any - -from googleapiclient.errors import HttpError -from langchain_core.tools import tool -from sqlalchemy.ext.asyncio import AsyncSession - -from app.agents.shared.tools.hitl import request_approval -from app.connectors.google_drive.client import GoogleDriveClient -from app.db import async_session_maker -from app.services.google_drive import GoogleDriveToolMetadataService - -logger = logging.getLogger(__name__) - - -def create_delete_google_drive_file_tool( - db_session: AsyncSession | None = None, - search_space_id: int | None = None, - user_id: str | None = None, -): - """ - Factory function to create the delete_google_drive_file tool. - - The tool acquires its own short-lived ``AsyncSession`` per call via - :data:`async_session_maker` so the closure is safe to share across - HTTP requests by the compiled-agent cache. Capturing a per-request - session here would surface stale/closed sessions on cache hits. - - Args: - db_session: Reserved for registry compatibility. Per-call sessions - are opened via :data:`async_session_maker` inside the tool body. - search_space_id: Search space ID to find the Google Drive connector - user_id: User ID for fetching user-specific context - - Returns: - Configured delete_google_drive_file tool - """ - del db_session # per-call session — see docstring - - @tool - async def delete_google_drive_file( - file_name: str, - delete_from_kb: bool = False, - ) -> dict[str, Any]: - """Move a Google Drive file to trash. - - Use this tool when the user explicitly asks to delete, remove, or trash - a file in Google Drive. - - Args: - file_name: The exact name of the file to trash (as it appears in Drive). - delete_from_kb: Whether to also remove the file from the knowledge base. - Default is False. - Set to True to remove from both Google Drive and knowledge base. - - Returns: - Dictionary with: - - status: "success", "rejected", "not_found", or "error" - - file_id: Google Drive file ID (if success) - - deleted_from_kb: whether the document was removed from the knowledge base - - message: Result message - - IMPORTANT: - - If status is "rejected", the user explicitly declined. Respond with a brief - acknowledgment and do NOT retry or suggest alternatives. - - If status is "not_found", relay the exact message to the user and ask them - to verify the file name or check if it has been indexed. - - If status is "insufficient_permissions", the connector lacks the required OAuth scope. - Inform the user they need to re-authenticate and do NOT retry this tool. - Examples: - - "Delete the 'Meeting Notes' file from Google Drive" - - "Trash the 'Old Budget' spreadsheet" - """ - logger.info( - f"delete_google_drive_file called: file_name='{file_name}', delete_from_kb={delete_from_kb}" - ) - - if search_space_id is None or user_id is None: - return { - "status": "error", - "message": "Google Drive tool not properly configured. Please contact support.", - } - - try: - async with async_session_maker() as db_session: - metadata_service = GoogleDriveToolMetadataService(db_session) - context = await metadata_service.get_trash_context( - search_space_id, user_id, file_name - ) - - if "error" in context: - error_msg = context["error"] - if "not found" in error_msg.lower(): - logger.warning(f"File not found: {error_msg}") - return {"status": "not_found", "message": error_msg} - logger.error(f"Failed to fetch trash context: {error_msg}") - return {"status": "error", "message": error_msg} - - account = context.get("account", {}) - if account.get("auth_expired"): - logger.warning( - "Google Drive account %s has expired authentication", - account.get("id"), - ) - return { - "status": "auth_error", - "message": "The Google Drive account for this file needs re-authentication. Please re-authenticate in your connector settings.", - "connector_type": "google_drive", - } - - file = context["file"] - file_id = file["file_id"] - document_id = file.get("document_id") - connector_id_from_context = context["account"]["id"] - - if not file_id: - return { - "status": "error", - "message": "File ID is missing from the indexed document. Please re-index the file and try again.", - } - - logger.info( - f"Requesting approval for deleting Google Drive file: '{file_name}' (file_id={file_id}, delete_from_kb={delete_from_kb})" - ) - result = request_approval( - action_type="google_drive_file_trash", - tool_name="delete_google_drive_file", - params={ - "file_id": file_id, - "connector_id": connector_id_from_context, - "delete_from_kb": delete_from_kb, - }, - context=context, - ) - - if result.rejected: - return { - "status": "rejected", - "message": "User declined. The file was not trashed. Do not ask again or suggest alternatives.", - } - - final_file_id = result.params.get("file_id", file_id) - final_connector_id = result.params.get( - "connector_id", connector_id_from_context - ) - final_delete_from_kb = result.params.get( - "delete_from_kb", delete_from_kb - ) - - if not final_connector_id: - return { - "status": "error", - "message": "No connector found for this file.", - } - - from sqlalchemy.future import select - - from app.db import SearchSourceConnector, SearchSourceConnectorType - - _drive_types = [ - SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, - ] - - result = await db_session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == final_connector_id, - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type.in_(_drive_types), - ) - ) - connector = result.scalars().first() - if not connector: - return { - "status": "error", - "message": "Selected Google Drive connector is invalid or has been disconnected.", - } - - logger.info( - f"Deleting Google Drive file: file_id='{final_file_id}', connector={final_connector_id}" - ) - - is_composio_drive = ( - connector.connector_type - == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR - ) - if is_composio_drive: - cca_id = connector.config.get("composio_connected_account_id") - if not cca_id: - return { - "status": "error", - "message": "Composio connected account ID not found for this Drive connector.", - } - - client = GoogleDriveClient( - session=db_session, - connector_id=connector.id, - ) - try: - if is_composio_drive: - from app.services.composio_service import ComposioService - - result = await ComposioService().execute_tool( - connected_account_id=cca_id, - tool_name="GOOGLEDRIVE_TRASH_FILE", - params={"file_id": final_file_id}, - entity_id=f"surfsense_{user_id}", - ) - if not result.get("success"): - raise RuntimeError( - result.get("error", "Unknown Composio Drive error") - ) - else: - await client.trash_file(file_id=final_file_id) - except HttpError as http_err: - if http_err.resp.status == 403: - logger.warning( - f"Insufficient permissions for connector {connector.id}: {http_err}" - ) - try: - from sqlalchemy.orm.attributes import flag_modified - - if not connector.config.get("auth_expired"): - connector.config = { - **connector.config, - "auth_expired": True, - } - flag_modified(connector, "config") - await db_session.commit() - except Exception: - logger.warning( - "Failed to persist auth_expired for connector %s", - connector.id, - exc_info=True, - ) - return { - "status": "insufficient_permissions", - "connector_id": connector.id, - "message": "This Google Drive account needs additional permissions. Please re-authenticate in connector settings.", - } - raise - - logger.info( - f"Google Drive file deleted (moved to trash): file_id={final_file_id}" - ) - - trash_result: dict[str, Any] = { - "status": "success", - "file_id": final_file_id, - "message": f"Successfully moved '{file['name']}' to trash.", - } - - deleted_from_kb = False - if final_delete_from_kb and document_id: - try: - from app.db import Document - - doc_result = await db_session.execute( - select(Document).filter(Document.id == document_id) - ) - document = doc_result.scalars().first() - if document: - await db_session.delete(document) - await db_session.commit() - deleted_from_kb = True - logger.info( - f"Deleted document {document_id} from knowledge base" - ) - else: - logger.warning(f"Document {document_id} not found in KB") - except Exception as e: - logger.error(f"Failed to delete document from KB: {e}") - await db_session.rollback() - trash_result["warning"] = ( - f"File moved to trash, but failed to remove from knowledge base: {e!s}" - ) - - trash_result["deleted_from_kb"] = deleted_from_kb - if deleted_from_kb: - trash_result["message"] = ( - f"{trash_result.get('message', '')} (also removed from knowledge base)" - ) - - return trash_result - - except Exception as e: - from langgraph.errors import GraphInterrupt - - if isinstance(e, GraphInterrupt): - raise - - logger.error(f"Error deleting Google Drive file: {e}", exc_info=True) - return { - "status": "error", - "message": "Something went wrong while trashing the file. Please try again.", - } - - return delete_google_drive_file