chore: ran linting

This commit is contained in:
Anish Sarkar 2026-01-23 05:28:18 +05:30
parent 42752bbeab
commit 8a0b8346a5
15 changed files with 583 additions and 288 deletions

View file

@ -82,14 +82,14 @@ def upgrade() -> None:
def downgrade() -> None:
"""Downgrade schema - remove Composio connector types from connector and document enums.
Note: PostgreSQL does not support removing enum values directly.
To properly downgrade, you would need to:
1. Delete any rows using the Composio connector type values
2. Create new enums without the Composio connector types
3. Alter the columns to use the new enums
4. Drop the old enums
This is left as a no-op since removing enum values is complex
and typically not needed in practice.
"""

View file

@ -12,7 +12,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.db import SearchSourceConnector
from app.services.composio_service import ComposioService, INDEXABLE_TOOLKITS
from app.services.composio_service import INDEXABLE_TOOLKITS, ComposioService
logger = logging.getLogger(__name__)
@ -271,7 +271,9 @@ class ComposioConnector:
from_email = header_dict.get("from", "Unknown Sender")
to_email = header_dict.get("to", "Unknown Recipient")
# Composio provides messageTimestamp directly
date_str = message.get("messageTimestamp", "") or header_dict.get("date", "Unknown Date")
date_str = message.get("messageTimestamp", "") or header_dict.get(
"date", "Unknown Date"
)
# Build markdown content
markdown_content = f"# {subject}\n\n"

View file

@ -58,7 +58,9 @@ class GitHubConnector:
if self.token:
logger.info("GitHub connector initialized with authentication token.")
else:
logger.info("GitHub connector initialized without token (public repos only).")
logger.info(
"GitHub connector initialized without token (public repos only)."
)
def ingest_repository(
self,
@ -95,17 +97,27 @@ class GitHubConnector:
cmd = [
"gitingest",
repo_url,
"--output", output_path,
"--max-size", str(max_file_size),
"--output",
output_path,
"--max-size",
str(max_file_size),
# Common exclude patterns
"-e", "node_modules/*",
"-e", "vendor/*",
"-e", ".git/*",
"-e", "__pycache__/*",
"-e", "dist/*",
"-e", "build/*",
"-e", "*.lock",
"-e", "package-lock.json",
"-e",
"node_modules/*",
"-e",
"vendor/*",
"-e",
".git/*",
"-e",
"__pycache__/*",
"-e",
"dist/*",
"-e",
"build/*",
"-e",
"*.lock",
"-e",
"package-lock.json",
]
# Add branch if specified
@ -147,7 +159,9 @@ class GitHubConnector:
os.unlink(output_path)
if not full_content or not full_content.strip():
logger.warning(f"No content retrieved from repository: {repo_full_name}")
logger.warning(
f"No content retrieved from repository: {repo_full_name}"
)
return None
# Parse the gitingest output
@ -171,11 +185,11 @@ class GitHubConnector:
logger.error(f"gitingest timed out for repository: {repo_full_name}")
return None
except FileNotFoundError:
logger.error(
"gitingest CLI not found. Falling back to Python library."
)
logger.error("gitingest CLI not found. Falling back to Python library.")
# Fall back to Python library
return self._ingest_with_python_library(repo_full_name, branch, max_file_size)
return self._ingest_with_python_library(
repo_full_name, branch, max_file_size
)
except Exception as e:
logger.error(f"Failed to ingest repository {repo_full_name}: {e}")
return None

View file

@ -11,7 +11,6 @@ Endpoints:
- GET /connectors/{connector_id}/composio-drive/folders - List folders/files for Composio Google Drive
"""
import asyncio
import logging
from uuid import UUID
@ -89,7 +88,9 @@ async def list_composio_toolkits(user: User = Depends(current_active_user)):
@router.get("/auth/composio/connector/add")
async def initiate_composio_auth(
space_id: int,
toolkit_id: str = Query(..., description="Composio toolkit ID (e.g., 'googledrive', 'gmail')"),
toolkit_id: str = Query(
..., description="Composio toolkit ID (e.g., 'googledrive', 'gmail')"
),
user: User = Depends(current_active_user),
):
"""
@ -239,13 +240,15 @@ async def composio_callback(
# Initialize Composio service
service = ComposioService()
entity_id = f"surfsense_{user_id}"
# Use camelCase param if provided (Composio's format), fallback to snake_case
final_connected_account_id = connectedAccountId or connected_account_id
# DEBUG: Log all query parameters received
logger.info(f"DEBUG: Callback received - connectedAccountId: {connectedAccountId}, connected_account_id: {connected_account_id}, using: {final_connected_account_id}")
logger.info(
f"DEBUG: Callback received - connectedAccountId: {connectedAccountId}, connected_account_id: {connected_account_id}, using: {final_connected_account_id}"
)
# If we still don't have a connected_account_id, warn but continue
# (the connector will be created but indexing won't work until updated)
if not final_connected_account_id:
@ -254,7 +257,9 @@ async def composio_callback(
"The connector will be created but indexing may not work."
)
else:
logger.info(f"Successfully got connected_account_id: {final_connected_account_id}")
logger.info(
f"Successfully got connected_account_id: {final_connected_account_id}"
)
# Build connector config
connector_config = {
@ -287,10 +292,17 @@ async def composio_callback(
if existing_connector:
# Delete the old Composio connected account before updating
old_connected_account_id = existing_connector.config.get("composio_connected_account_id")
if old_connected_account_id and old_connected_account_id != final_connected_account_id:
old_connected_account_id = existing_connector.config.get(
"composio_connected_account_id"
)
if (
old_connected_account_id
and old_connected_account_id != final_connected_account_id
):
try:
deleted = await service.delete_connected_account(old_connected_account_id)
deleted = await service.delete_connected_account(
old_connected_account_id
)
if deleted:
logger.info(
f"Deleted old Composio connected account {old_connected_account_id} "
@ -422,7 +434,9 @@ async def list_composio_drive_folders(
)
# Get Composio connected account ID from config
composio_connected_account_id = connector.config.get("composio_connected_account_id")
composio_connected_account_id = connector.config.get(
"composio_connected_account_id"
)
if not composio_connected_account_id:
raise HTTPException(
status_code=400,
@ -451,27 +465,37 @@ async def list_composio_drive_folders(
items = []
for file_info in files:
file_id = file_info.get("id", "") or file_info.get("fileId", "")
file_name = file_info.get("name", "") or file_info.get("fileName", "") or "Untitled"
file_name = (
file_info.get("name", "") or file_info.get("fileName", "") or "Untitled"
)
mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "")
if not file_id:
continue
is_folder = mime_type == "application/vnd.google-apps.folder"
items.append({
"id": file_id,
"name": file_name,
"mimeType": mime_type,
"isFolder": is_folder,
"parents": file_info.get("parents", []),
"size": file_info.get("size"),
"iconLink": file_info.get("iconLink"),
})
items.append(
{
"id": file_id,
"name": file_name,
"mimeType": mime_type,
"isFolder": is_folder,
"parents": file_info.get("parents", []),
"size": file_info.get("size"),
"iconLink": file_info.get("iconLink"),
}
)
# Sort: folders first, then files, both alphabetically
folders = sorted([item for item in items if item["isFolder"]], key=lambda x: x["name"].lower())
files_list = sorted([item for item in items if not item["isFolder"]], key=lambda x: x["name"].lower())
folders = sorted(
[item for item in items if item["isFolder"]],
key=lambda x: x["name"].lower(),
)
files_list = sorted(
[item for item in items if not item["isFolder"]],
key=lambda x: x["name"].lower(),
)
items = folders + files_list
folder_count = len(folders)

View file

@ -37,7 +37,6 @@ from app.db import (
async_session_maker,
get_async_session,
)
from app.services.composio_service import ComposioService
from app.schemas import (
GoogleDriveIndexRequest,
MCPConnectorCreate,
@ -48,6 +47,7 @@ from app.schemas import (
SearchSourceConnectorRead,
SearchSourceConnectorUpdate,
)
from app.services.composio_service import ComposioService
from app.services.notification_service import NotificationService
from app.tasks.connector_indexers import (
index_airtable_records,
@ -537,11 +537,15 @@ async def delete_search_source_connector(
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
]
if db_connector.connector_type in composio_connector_types:
composio_connected_account_id = db_connector.config.get("composio_connected_account_id")
composio_connected_account_id = db_connector.config.get(
"composio_connected_account_id"
)
if composio_connected_account_id and ComposioService.is_enabled():
try:
service = ComposioService()
deleted = await service.delete_connected_account(composio_connected_account_id)
deleted = await service.delete_connected_account(
composio_connected_account_id
)
if deleted:
logger.info(
f"Successfully deleted Composio connected account {composio_connected_account_id} "
@ -897,7 +901,10 @@ async def index_connector_content(
)
response_message = "Web page indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR:
elif (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR
):
from app.tasks.celery_tasks.connector_tasks import (
index_composio_connector_task,
)
@ -907,8 +914,12 @@ async def index_connector_content(
if drive_items and drive_items.has_items():
# Update connector config with the selected folders/files
config = connector.config or {}
config["selected_folders"] = [{"id": f.id, "name": f.name} for f in drive_items.folders]
config["selected_files"] = [{"id": f.id, "name": f.name} for f in drive_items.files]
config["selected_folders"] = [
{"id": f.id, "name": f.name} for f in drive_items.folders
]
config["selected_files"] = [
{"id": f.id, "name": f.name} for f in drive_items.files
]
if drive_items.indexing_options:
config["indexing_options"] = {
"max_files_per_folder": drive_items.indexing_options.max_files_per_folder,
@ -917,6 +928,7 @@ async def index_connector_content(
}
connector.config = config
from sqlalchemy.orm.attributes import flag_modified
flag_modified(connector, "config")
await session.commit()
await session.refresh(connector)
@ -934,7 +946,9 @@ async def index_connector_content(
index_composio_connector_task.delay(
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
)
response_message = "Composio Google Drive indexing started in the background."
response_message = (
"Composio Google Drive indexing started in the background."
)
elif connector.connector_type in [
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
@ -995,7 +1009,9 @@ async def _update_connector_timestamp_by_id(session: AsyncSession, connector_id:
connector = result.scalars().first()
if connector:
connector.last_indexed_at = datetime.now(UTC) # Use UTC for timezone consistency
connector.last_indexed_at = datetime.now(
UTC
) # Use UTC for timezone consistency
await session.commit()
logger.info(f"Updated last_indexed_at for connector {connector_id}")
except Exception as e:
@ -1150,7 +1166,9 @@ async def _run_indexing_with_notifications(
indexed_count=documents_processed,
error_message=error_or_warning, # Show errors even if some documents were indexed
)
await session.commit() # Commit to ensure Electric SQL syncs the notification update
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
elif documents_processed > 0:
# Update notification to storing stage
if notification:
@ -1174,7 +1192,9 @@ async def _run_indexing_with_notifications(
indexed_count=documents_processed,
error_message=error_or_warning, # Show errors even if some documents were indexed
)
await session.commit() # Commit to ensure Electric SQL syncs the notification update
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
else:
# No new documents processed - check if this is an error or just no changes
if error_or_warning:
@ -1189,7 +1209,9 @@ async def _run_indexing_with_notifications(
indexed_count=0,
error_message=error_or_warning,
)
await session.commit() # Commit to ensure Electric SQL syncs the notification update
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
else:
# Success - just no new documents to index (all skipped/unchanged)
logger.info(
@ -1208,7 +1230,9 @@ async def _run_indexing_with_notifications(
indexed_count=0,
error_message=None, # No error - sync succeeded
)
await session.commit() # Commit to ensure Electric SQL syncs the notification update
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
except Exception as e:
logger.error(f"Error in indexing task: {e!s}", exc_info=True)

View file

@ -111,7 +111,7 @@ class ComposioService:
config_toolkit = getattr(auth_config, "toolkit", None)
if config_toolkit is None:
continue
# Extract toolkit name/slug from the object
toolkit_name = None
if isinstance(config_toolkit, str):
@ -122,18 +122,22 @@ class ComposioService:
toolkit_name = config_toolkit.name
elif hasattr(config_toolkit, "id"):
toolkit_name = config_toolkit.id
# Compare case-insensitively
if toolkit_name and toolkit_name.lower() == toolkit_id.lower():
logger.info(f"Found auth config {auth_config.id} for toolkit {toolkit_id}")
logger.info(
f"Found auth config {auth_config.id} for toolkit {toolkit_id}"
)
return auth_config.id
# Log available auth configs for debugging
logger.warning(f"No auth config found for toolkit '{toolkit_id}'. Available auth configs:")
logger.warning(
f"No auth config found for toolkit '{toolkit_id}'. Available auth configs:"
)
for auth_config in auth_configs.items:
config_toolkit = getattr(auth_config, "toolkit", None)
logger.warning(f" - {auth_config.id}: toolkit={config_toolkit}")
return None
except Exception as e:
logger.error(f"Failed to list auth configs: {e!s}")
@ -162,7 +166,7 @@ class ComposioService:
try:
# First, get the auth_config_id for this toolkit
auth_config_id = self._get_auth_config_for_toolkit(toolkit_id)
if not auth_config_id:
raise ValueError(
f"No auth config found for toolkit '{toolkit_id}'. "
@ -214,7 +218,9 @@ class ComposioService:
"user_id": getattr(account, "user_id", None),
}
except Exception as e:
logger.error(f"Failed to get connected account {connected_account_id}: {e!s}")
logger.error(
f"Failed to get connected account {connected_account_id}: {e!s}"
)
return None
async def list_all_connections(self) -> list[dict[str, Any]]:
@ -226,15 +232,17 @@ class ComposioService:
"""
try:
accounts_response = self.client.connected_accounts.list()
if hasattr(accounts_response, "items"):
accounts = accounts_response.items
elif hasattr(accounts_response, "__iter__"):
accounts = accounts_response
else:
logger.warning(f"Unexpected accounts response type: {type(accounts_response)}")
logger.warning(
f"Unexpected accounts response type: {type(accounts_response)}"
)
return []
result = []
for acc in accounts:
toolkit_raw = getattr(acc, "toolkit", None)
@ -248,14 +256,16 @@ class ComposioService:
toolkit_info = toolkit_raw.name
else:
toolkit_info = str(toolkit_raw)
result.append({
"id": acc.id,
"status": getattr(acc, "status", None),
"toolkit": toolkit_info,
"user_id": getattr(acc, "user_id", None),
})
result.append(
{
"id": acc.id,
"status": getattr(acc, "status", None),
"toolkit": toolkit_info,
"user_id": getattr(acc, "user_id", None),
}
)
return result
except Exception as e:
logger.error(f"Failed to list all connections: {e!s}")
@ -273,16 +283,18 @@ class ComposioService:
"""
try:
accounts_response = self.client.connected_accounts.list(user_id=user_id)
# Handle paginated response (may have .items attribute) or direct list
if hasattr(accounts_response, "items"):
accounts = accounts_response.items
elif hasattr(accounts_response, "__iter__"):
accounts = accounts_response
else:
logger.warning(f"Unexpected accounts response type: {type(accounts_response)}")
logger.warning(
f"Unexpected accounts response type: {type(accounts_response)}"
)
return []
result = []
for acc in accounts:
# Extract toolkit info - might be string or object
@ -297,13 +309,15 @@ class ComposioService:
toolkit_info = toolkit_raw.name
else:
toolkit_info = toolkit_raw
result.append({
"id": acc.id,
"status": getattr(acc, "status", None),
"toolkit": toolkit_info,
})
result.append(
{
"id": acc.id,
"status": getattr(acc, "status", None),
"toolkit": toolkit_info,
}
)
logger.info(f"Found {len(result)} connections for user {user_id}: {result}")
return result
except Exception as e:
@ -324,10 +338,14 @@ class ComposioService:
"""
try:
self.client.connected_accounts.delete(connected_account_id)
logger.info(f"Successfully deleted Composio connected account: {connected_account_id}")
logger.info(
f"Successfully deleted Composio connected account: {connected_account_id}"
)
return True
except Exception as e:
logger.error(f"Failed to delete Composio connected account {connected_account_id}: {e!s}")
logger.error(
f"Failed to delete Composio connected account {connected_account_id}: {e!s}"
)
return False
async def execute_tool(
@ -398,10 +416,14 @@ class ComposioService:
}
if folder_id:
# List contents of a specific folder (exclude shortcuts - we don't have access to them)
params["q"] = f"'{folder_id}' in parents and trashed = false and mimeType != 'application/vnd.google-apps.shortcut'"
params["q"] = (
f"'{folder_id}' in parents and trashed = false and mimeType != 'application/vnd.google-apps.shortcut'"
)
else:
# List root-level items only (My Drive root), exclude shortcuts
params["q"] = "'root' in parents and trashed = false and mimeType != 'application/vnd.google-apps.shortcut'"
params["q"] = (
"'root' in parents and trashed = false and mimeType != 'application/vnd.google-apps.shortcut'"
)
if page_token:
params["page_token"] = page_token
@ -416,17 +438,21 @@ class ComposioService:
return [], None, result.get("error", "Unknown error")
data = result.get("data", {})
# Handle nested response structure from Composio
files = []
next_token = None
if isinstance(data, dict):
# Try direct access first, then nested
files = data.get("files", []) or data.get("data", {}).get("files", [])
next_token = data.get("nextPageToken") or data.get("next_page_token") or data.get("data", {}).get("nextPageToken")
next_token = (
data.get("nextPageToken")
or data.get("next_page_token")
or data.get("data", {}).get("nextPageToken")
)
elif isinstance(data, list):
files = data
return files, next_token, None
except Exception as e:
@ -459,13 +485,13 @@ class ComposioService:
return None, result.get("error", "Unknown error")
data = result.get("data")
# Composio GOOGLEDRIVE_DOWNLOAD_FILE returns a dict with file info
# The actual content is in "downloaded_file_content" field
if isinstance(data, dict):
# Try known Composio response fields in order of preference
content = None
# Primary field from GOOGLEDRIVE_DOWNLOAD_FILE
if "downloaded_file_content" in data:
content = data["downloaded_file_content"]
@ -474,19 +500,24 @@ class ComposioService:
# Try to extract actual content from nested dict
# Note: Composio nests downloaded_file_content inside another downloaded_file_content
actual_content = (
content.get("downloaded_file_content") or
content.get("content") or
content.get("data") or
content.get("file_content") or
content.get("body") or
content.get("text")
content.get("downloaded_file_content")
or content.get("content")
or content.get("data")
or content.get("file_content")
or content.get("body")
or content.get("text")
)
if actual_content is not None:
content = actual_content
else:
# Log structure for debugging
logger.warning(f"downloaded_file_content is dict with keys: {list(content.keys())}")
return None, f"Cannot extract content from downloaded_file_content. Keys: {list(content.keys())}"
logger.warning(
f"downloaded_file_content is dict with keys: {list(content.keys())}"
)
return (
None,
f"Cannot extract content from downloaded_file_content. Keys: {list(content.keys())}",
)
# Fallback fields for compatibility
elif "content" in data:
content = data["content"]
@ -494,16 +525,20 @@ class ComposioService:
content = data["file_content"]
elif "data" in data:
content = data["data"]
if content is None:
# Log available keys for debugging
logger.warning(f"Composio response dict keys: {list(data.keys())}")
return None, f"No file content found in Composio response. Available keys: {list(data.keys())}"
return (
None,
f"No file content found in Composio response. Available keys: {list(data.keys())}",
)
# Convert content to bytes
if isinstance(content, str):
# Check if it's base64 encoded
import base64
try:
# Try to decode as base64 first
content = base64.b64decode(content)
@ -514,11 +549,19 @@ class ComposioService:
pass # Already bytes
elif isinstance(content, dict):
# Still a dict after all extraction attempts - log structure
logger.warning(f"Content still dict after extraction: {list(content.keys())}")
return None, f"Unexpected nested content structure: {list(content.keys())}"
logger.warning(
f"Content still dict after extraction: {list(content.keys())}"
)
return (
None,
f"Unexpected nested content structure: {list(content.keys())}",
)
else:
return None, f"Unexpected content type in Composio response: {type(content).__name__}"
return (
None,
f"Unexpected content type in Composio response: {type(content).__name__}",
)
return content, None
elif isinstance(data, str):
return data.encode("utf-8"), None
@ -527,7 +570,10 @@ class ComposioService:
elif data is None:
return None, "No data returned from Composio"
else:
return None, f"Unexpected data type from Composio: {type(data).__name__}"
return (
None,
f"Unexpected data type from Composio: {type(data).__name__}",
)
except Exception as e:
logger.error(f"Failed to get Drive file content: {e!s}")
@ -576,17 +622,21 @@ class ComposioService:
return [], None, result.get("error", "Unknown error")
data = result.get("data", {})
# Try different possible response structures
messages = []
next_token = None
result_size_estimate = None
if isinstance(data, dict):
messages = data.get("messages", []) or data.get("data", {}).get("messages", []) or data.get("emails", [])
messages = (
data.get("messages", [])
or data.get("data", {}).get("messages", [])
or data.get("emails", [])
)
# Check for pagination token in various possible locations
next_token = (
data.get("nextPageToken")
or data.get("next_page_token")
data.get("nextPageToken")
or data.get("next_page_token")
or data.get("data", {}).get("nextPageToken")
or data.get("data", {}).get("next_page_token")
)
@ -599,7 +649,7 @@ class ComposioService:
)
elif isinstance(data, list):
messages = data
return messages, next_token, result_size_estimate, None
except Exception as e:
@ -683,14 +733,18 @@ class ComposioService:
return [], result.get("error", "Unknown error")
data = result.get("data", {})
# Try different possible response structures
events = []
if isinstance(data, dict):
events = data.get("items", []) or data.get("data", {}).get("items", []) or data.get("events", [])
events = (
data.get("items", [])
or data.get("data", {}).get("items", [])
or data.get("events", [])
)
elif isinstance(data, list):
events = data
return events, None
except Exception as e:

View file

@ -64,10 +64,14 @@ async def check_document_by_unique_identifier(
async def get_connector_by_id(
session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType | None
session: AsyncSession,
connector_id: int,
connector_type: SearchSourceConnectorType | None,
) -> SearchSourceConnector | None:
"""Get a connector by ID and optionally by type from the database."""
query = select(SearchSourceConnector).filter(SearchSourceConnector.id == connector_id)
query = select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id
)
if connector_type is not None:
query = query.filter(SearchSourceConnector.connector_type == connector_type)
result = await session.execute(query)
@ -81,40 +85,90 @@ async def update_connector_last_indexed(
) -> None:
"""Update the last_indexed_at timestamp for a connector."""
if update_last_indexed:
connector.last_indexed_at = datetime.now(UTC) # Use UTC for timezone consistency
connector.last_indexed_at = datetime.now(
UTC
) # Use UTC for timezone consistency
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
# File extensions that are treated as binary content. `_is_binary_file` checks
# the lower-cased `Path(file_name).suffix` against this set before looking at
# the mime type, so every entry must be lower-case and keep its leading dot.
# Files matching here are sent through the ETL file processor instead of
# being decoded as text.
BINARY_FILE_EXTENSIONS = {
    # Documents
    ".pdf",
    ".doc",
    ".docx",
    ".xls",
    ".xlsx",
    ".ppt",
    ".pptx",
    # Images
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".bmp",
    ".tiff",
    ".webp",
    # Archives
    ".zip",
    ".tar",
    ".gz",
    ".rar",
    ".7z",
    # Audio / video
    ".mp3",
    ".mp4",
    ".wav",
    ".avi",
    ".mov",
    # Executables and raw binaries
    ".exe",
    ".dll",
    ".so",
    ".bin",
}
# File extensions that are treated as plain text and decoded directly rather
# than sent through the ETL file processor. `_is_binary_file` compares the
# lower-cased `Path(file_name).suffix` against this set, so every entry must be
# lower-case and keep its leading dot.
TEXT_FILE_EXTENSIONS = {
    # Prose and markup
    ".txt",
    ".md",
    ".markdown",
    ".json",
    ".xml",
    ".html",
    ".htm",
    # Source code
    ".css",
    ".js",
    ".ts",
    ".py",
    ".java",
    ".c",
    ".cpp",
    ".h",
    # Configuration
    ".yaml",
    ".yml",
    ".toml",
    ".ini",
    ".cfg",
    ".conf",
    # Shell scripts
    ".sh",
    ".bash",
    ".zsh",
    ".fish",
    # Tabular data and queries
    ".sql",
    ".csv",
    ".tsv",
    # Other text formats
    ".rst",
    ".tex",
    ".log",
}
def _is_binary_file(file_name: str, mime_type: str) -> bool:
"""Check if a file is binary based on extension or mime type."""
extension = Path(file_name).suffix.lower()
# Check extension first
if extension in BINARY_FILE_EXTENSIONS:
return True
if extension in TEXT_FILE_EXTENSIONS:
return False
# Check mime type
if mime_type:
if mime_type.startswith(("image/", "audio/", "video/", "application/pdf")):
@ -122,9 +176,13 @@ def _is_binary_file(file_name: str, mime_type: str) -> bool:
if mime_type.startswith(("text/", "application/json", "application/xml")):
return False
# Office documents
if "spreadsheet" in mime_type or "document" in mime_type or "presentation" in mime_type:
if (
"spreadsheet" in mime_type
or "document" in mime_type
or "presentation" in mime_type
):
return True
# Default to text for unknown types
return False
@ -143,10 +201,10 @@ async def _process_file_content(
) -> str:
"""
Process file content and return markdown text.
For binary files (PDFs, images, etc.), uses Surfsense's ETL service.
For text files, decodes as UTF-8.
Args:
content: File content as bytes or string
file_name: Name of the file
@ -158,14 +216,14 @@ async def _process_file_content(
task_logger: Task logging service
log_entry: Log entry for tracking
processing_errors: List to append errors to
Returns:
Markdown content string
"""
# Ensure content is bytes
if isinstance(content, str):
content = content.encode("utf-8")
# Check if this is a binary file
if _is_binary_file(file_name, mime_type):
# Use ETL service for binary files (PDF, Office docs, etc.)
@ -173,24 +231,26 @@ async def _process_file_content(
try:
# Get file extension
extension = Path(file_name).suffix or ".bin"
# Write to temp file
with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp_file:
with tempfile.NamedTemporaryFile(
delete=False, suffix=extension
) as tmp_file:
tmp_file.write(content)
temp_file_path = tmp_file.name
# Use the configured ETL service to extract text
extracted_text = await _extract_text_with_etl(
temp_file_path, file_name, task_logger, log_entry
)
if extracted_text:
return extracted_text
else:
# Fallback if extraction fails
logger.warning(f"Could not extract text from binary file {file_name}")
return f"# {file_name}\n\n[Binary file - text extraction failed]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n"
except Exception as e:
error_msg = f"Error processing binary file {file_name}: {e!s}"
logger.error(error_msg)
@ -214,7 +274,7 @@ async def _process_file_content(
return content.decode(encoding)
except UnicodeDecodeError:
continue
# If all encodings fail, treat as binary
error_msg = f"Could not decode text file {file_name} with any encoding"
logger.warning(error_msg)
@ -230,27 +290,27 @@ async def _extract_text_with_etl(
) -> str | None:
"""
Extract text from a file using the configured ETL service.
Args:
file_path: Path to the file
file_name: Name of the file
task_logger: Task logging service
log_entry: Log entry for tracking
Returns:
Extracted text as markdown, or None if extraction fails
"""
import warnings
from logging import ERROR, getLogger
etl_service = config.ETL_SERVICE
try:
if etl_service == "UNSTRUCTURED":
from langchain_unstructured import UnstructuredLoader
from app.utils.document_converters import convert_document_to_markdown
loader = UnstructuredLoader(
file_path,
mode="elements",
@ -260,57 +320,67 @@ async def _extract_text_with_etl(
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
if docs:
return await convert_document_to_markdown(docs)
return None
elif etl_service == "LLAMACLOUD":
from app.tasks.document_processors.file_processors import parse_with_llamacloud_retry
from app.tasks.document_processors.file_processors import (
parse_with_llamacloud_retry,
)
# Estimate pages (rough estimate based on file size)
file_size = os.path.getsize(file_path)
estimated_pages = max(1, file_size // (80 * 1024))
result = await parse_with_llamacloud_retry(
file_path=file_path,
estimated_pages=estimated_pages,
task_logger=task_logger,
log_entry=log_entry,
)
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
markdown_documents = await result.aget_markdown_documents(
split_by_page=False
)
if markdown_documents:
return markdown_documents[0].text
return None
elif etl_service == "DOCLING":
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
# Suppress pdfminer warnings
pdfminer_logger = getLogger("pdfminer")
original_level = pdfminer_logger.level
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer")
warnings.filterwarnings("ignore", message=".*Cannot set gray non-stroke color.*")
warnings.filterwarnings(
"ignore", category=UserWarning, module="pdfminer"
)
warnings.filterwarnings(
"ignore", message=".*Cannot set gray non-stroke color.*"
)
warnings.filterwarnings("ignore", message=".*invalid float value.*")
pdfminer_logger.setLevel(ERROR)
try:
result = await docling_service.process_document(file_path, file_name)
result = await docling_service.process_document(
file_path, file_name
)
finally:
pdfminer_logger.setLevel(original_level)
return result.get("content")
else:
logger.warning(f"Unknown ETL service: {etl_service}")
return None
except Exception as e:
logger.error(f"ETL extraction failed for {file_name}: {e!s}")
return None
@ -367,9 +437,11 @@ async def index_composio_connector(
# Get connector by id - accept any Composio connector type
# We'll check the actual type after loading
connector = await get_connector_by_id(
session, connector_id, None # Don't filter by type, we'll validate after
session,
connector_id,
None, # Don't filter by type, we'll validate after
)
# Validate it's a Composio connector
if connector and connector.connector_type not in [
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
@ -392,7 +464,9 @@ async def index_composio_connector(
# Get toolkit ID from config
toolkit_id = connector.config.get("toolkit_id")
if not toolkit_id:
error_msg = f"Composio connector {connector_id} has no toolkit_id configured"
error_msg = (
f"Composio connector {connector_id} has no toolkit_id configured"
)
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "MissingToolkitId"}
)
@ -488,7 +562,7 @@ async def _index_composio_google_drive(
max_items: int = 1000,
) -> tuple[int, str]:
"""Index Google Drive files via Composio.
Supports folder/file selection via connector config:
- selected_folders: List of {id, name} for folders to index
- selected_files: List of {id, name} for individual files to index
@ -502,14 +576,18 @@ async def _index_composio_google_drive(
selected_folders = connector_config.get("selected_folders", [])
selected_files = connector_config.get("selected_files", [])
indexing_options = connector_config.get("indexing_options", {})
max_files_per_folder = indexing_options.get("max_files_per_folder", 100)
include_subfolders = indexing_options.get("include_subfolders", True)
await task_logger.log_task_progress(
log_entry,
f"Fetching Google Drive files via Composio for connector {connector_id}",
{"stage": "fetching_files", "selected_folders": len(selected_folders), "selected_files": len(selected_files)},
{
"stage": "fetching_files",
"selected_folders": len(selected_folders),
"selected_files": len(selected_files),
},
)
all_files = []
@ -520,34 +598,42 @@ async def _index_composio_google_drive(
for folder in selected_folders:
folder_id = folder.get("id")
folder_name = folder.get("name", "Unknown")
if not folder_id:
continue
# Handle special case for "root" folder
actual_folder_id = None if folder_id == "root" else folder_id
logger.info(f"Fetching files from folder: {folder_name} ({folder_id})")
# Fetch files from this folder
folder_files = []
page_token = None
while len(folder_files) < max_files_per_folder:
files, next_token, error = await composio_connector.list_drive_files(
(
files,
next_token,
error,
) = await composio_connector.list_drive_files(
folder_id=actual_folder_id,
page_token=page_token,
page_size=min(100, max_files_per_folder - len(folder_files)),
)
if error:
logger.warning(f"Failed to fetch files from folder {folder_name}: {error}")
logger.warning(
f"Failed to fetch files from folder {folder_name}: {error}"
)
break
# Process files
for file_info in files:
mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "")
mime_type = file_info.get("mimeType", "") or file_info.get(
"mime_type", ""
)
# If it's a folder and include_subfolders is enabled, recursively fetch
if mime_type == "application/vnd.google-apps.folder":
if include_subfolders:
@ -565,7 +651,7 @@ async def _index_composio_google_drive(
if not next_token:
break
page_token = next_token
all_files.extend(folder_files[:max_files_per_folder])
logger.info(f"Found {len(folder_files)} files in folder {folder_name}")
@ -573,16 +659,18 @@ async def _index_composio_google_drive(
for selected_file in selected_files:
file_id = selected_file.get("id")
file_name = selected_file.get("name", "Unknown")
if not file_id:
continue
# Add file info (we'll fetch content later during indexing)
all_files.append({
"id": file_id,
"name": file_name,
"mimeType": "", # Will be determined later
})
all_files.append(
{
"id": file_id,
"name": file_name,
"mimeType": "", # Will be determined later
}
)
else:
# No selection specified - fetch all files (original behavior)
page_token = None
@ -613,7 +701,10 @@ async def _index_composio_google_drive(
# CRITICAL: Update timestamp even when no files found so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return 0, None # Return None (not error) when no items found - this is success with 0 items
return (
0,
None,
) # Return None (not error) when no items found - this is success with 0 items
logger.info(f"Found {len(all_files)} Google Drive files to index via Composio")
@ -625,8 +716,14 @@ async def _index_composio_google_drive(
try:
# Handle both standard Google API and potential Composio variations
file_id = file_info.get("id", "") or file_info.get("fileId", "")
file_name = file_info.get("name", "") or file_info.get("fileName", "") or "Untitled"
mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "")
file_name = (
file_info.get("name", "")
or file_info.get("fileName", "")
or "Untitled"
)
mime_type = file_info.get("mimeType", "") or file_info.get(
"mime_type", ""
)
if not file_id:
documents_skipped += 1
@ -648,12 +745,15 @@ async def _index_composio_google_drive(
)
# Get file content
content, content_error = await composio_connector.get_drive_file_content(
file_id
)
(
content,
content_error,
) = await composio_connector.get_drive_file_content(file_id)
if content_error or not content:
logger.warning(f"Could not get content for file {file_name}: {content_error}")
logger.warning(
f"Could not get content for file {file_name}: {content_error}"
)
# Use metadata as content fallback
markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n"
@ -700,12 +800,19 @@ async def _index_composio_google_drive(
"mime_type": mime_type,
"document_type": "Google Drive File (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
summary_content = (
f"Google Drive File: {file_name}\n\nType: {mime_type}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
@ -724,8 +831,8 @@ async def _index_composio_google_drive(
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
# Batch commit every 10 documents
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Drive files processed so far"
@ -745,12 +852,19 @@ async def _index_composio_google_drive(
"mime_type": mime_type,
"document_type": "Google Drive File (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
summary_content = (
f"Google Drive File: {file_name}\n\nType: {mime_type}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
@ -776,7 +890,7 @@ async def _index_composio_google_drive(
session.add(document)
documents_indexed += 1
# Batch commit every 10 documents
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Drive files processed so far"
@ -784,7 +898,9 @@ async def _index_composio_google_drive(
await session.commit()
except Exception as e:
error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}"
error_msg = (
f"Error processing Drive file {file_name or 'unknown'}: {e!s}"
)
logger.error(error_msg, exc_info=True)
processing_errors.append(error_msg)
documents_skipped += 1
@ -848,7 +964,7 @@ async def _fetch_folder_files_recursively(
) -> list[dict[str, Any]]:
"""
Recursively fetch files from a Google Drive folder via Composio.
Args:
composio_connector: The Composio connector instance
folder_id: Google Drive folder ID
@ -856,20 +972,20 @@ async def _fetch_folder_files_recursively(
current_count: Current number of files already fetched
depth: Current recursion depth
max_depth: Maximum recursion depth to prevent infinite loops
Returns:
List of file info dictionaries
"""
if depth >= max_depth:
logger.warning(f"Max recursion depth reached for folder {folder_id}")
return []
if current_count >= max_files:
return []
all_files = []
page_token = None
try:
while len(all_files) + current_count < max_files:
files, next_token, error = await composio_connector.list_drive_files(
@ -877,14 +993,18 @@ async def _fetch_folder_files_recursively(
page_token=page_token,
page_size=min(100, max_files - len(all_files) - current_count),
)
if error:
logger.warning(f"Error fetching files from subfolder {folder_id}: {error}")
logger.warning(
f"Error fetching files from subfolder {folder_id}: {error}"
)
break
for file_info in files:
mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "")
mime_type = file_info.get("mimeType", "") or file_info.get(
"mime_type", ""
)
if mime_type == "application/vnd.google-apps.folder":
# Recursively fetch from subfolders
subfolder_files = await _fetch_folder_files_recursively(
@ -898,16 +1018,16 @@ async def _fetch_folder_files_recursively(
all_files.extend(subfolder_files)
else:
all_files.append(file_info)
if len(all_files) + current_count >= max_files:
break
if not next_token:
break
page_token = next_token
return all_files[:max_files - current_count]
return all_files[: max_files - current_count]
except Exception as e:
logger.error(f"Error in recursive folder fetch: {e!s}")
return all_files
@ -924,10 +1044,10 @@ async def _process_gmail_message_batch(
) -> tuple[int, int]:
"""
Process a batch of Gmail messages and index them.
Args:
total_documents_indexed: Running total of documents indexed so far (for batch commits).
Returns:
Tuple of (documents_indexed, documents_skipped)
"""
@ -965,7 +1085,9 @@ async def _process_gmail_message_batch(
date_str = value
# Format to markdown using the full message data
markdown_content = composio_connector.format_gmail_message_to_markdown(message)
markdown_content = composio_connector.format_gmail_message_to_markdown(
message
)
# Check for empty content (defensive parsing per Composio best practices)
if not markdown_content.strip():
@ -1008,12 +1130,19 @@ async def _process_gmail_message_batch(
"sender": sender,
"document_type": "Gmail Message (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
summary_content = (
f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
@ -1035,8 +1164,8 @@ async def _process_gmail_message_batch(
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
# Batch commit every 10 documents
# Batch commit every 10 documents
current_total = total_documents_indexed + documents_indexed
if current_total % 10 == 0:
logger.info(
@ -1062,8 +1191,12 @@ async def _process_gmail_message_batch(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
summary_content = (
f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
@ -1092,7 +1225,7 @@ async def _process_gmail_message_batch(
session.add(document)
documents_indexed += 1
# Batch commit every 10 documents
# Batch commit every 10 documents
current_total = total_documents_indexed + documents_indexed
if current_total % 10 == 0:
logger.info(
@ -1107,7 +1240,9 @@ async def _process_gmail_message_batch(
try:
await session.rollback()
except Exception as rollback_error:
logger.error(f"Error during rollback: {rollback_error!s}", exc_info=True)
logger.error(
f"Error during rollback: {rollback_error!s}", exc_info=True
)
continue
return documents_indexed, documents_skipped
@ -1169,7 +1304,9 @@ async def _index_composio_gmail(
current_batch_size = min(batch_size, remaining)
# Use result_size_estimate if available, otherwise fall back to max_items
estimated_total = result_size_estimate if result_size_estimate is not None else max_items
estimated_total = (
result_size_estimate if result_size_estimate is not None else max_items
)
# Cap estimated_total at max_items to avoid showing misleading progress
estimated_total = min(estimated_total, max_items)
@ -1187,7 +1324,12 @@ async def _index_composio_gmail(
)
# Fetch batch of messages
messages, next_token, result_size_estimate_batch, error = await composio_connector.list_gmail_messages(
(
messages,
next_token,
result_size_estimate_batch,
error,
) = await composio_connector.list_gmail_messages(
query=query,
max_results=current_batch_size,
page_token=page_token,
@ -1206,13 +1348,17 @@ async def _index_composio_gmail(
# Update result_size_estimate from first response (Gmail provides this estimate)
if result_size_estimate is None and result_size_estimate_batch is not None:
result_size_estimate = result_size_estimate_batch
logger.info(f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'")
logger.info(
f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'"
)
total_messages_fetched += len(messages)
# Recalculate estimated_total after potentially updating result_size_estimate
estimated_total = result_size_estimate if result_size_estimate is not None else max_items
estimated_total = (
result_size_estimate if result_size_estimate is not None else max_items
)
estimated_total = min(estimated_total, max_items)
logger.info(
f"Fetched batch of {len(messages)} Gmail messages "
f"(total: {total_messages_fetched}/{estimated_total})"
@ -1357,7 +1503,10 @@ async def _index_composio_google_calendar(
# CRITICAL: Update timestamp even when no events found so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return 0, None # Return None (not error) when no items found - this is success with 0 items
return (
0,
None,
) # Return None (not error) when no items found - this is success with 0 items
logger.info(f"Found {len(events)} Google Calendar events to index via Composio")
@ -1368,14 +1517,18 @@ async def _index_composio_google_calendar(
try:
# Handle both standard Google API and potential Composio variations
event_id = event.get("id", "") or event.get("eventId", "")
summary = event.get("summary", "") or event.get("title", "") or "No Title"
summary = (
event.get("summary", "") or event.get("title", "") or "No Title"
)
if not event_id:
documents_skipped += 1
continue
# Format to markdown
markdown_content = composio_connector.format_calendar_event_to_markdown(event)
markdown_content = composio_connector.format_calendar_event_to_markdown(
event
)
# Generate unique identifier
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"])
@ -1413,14 +1566,19 @@ async def _index_composio_google_calendar(
"start_time": start_time,
"document_type": "Google Calendar Event (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}"
if location:
summary_content += f"\nLocation: {location}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
@ -1441,8 +1599,8 @@ async def _index_composio_google_calendar(
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
# Batch commit every 10 documents
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Calendar events processed so far"
@ -1462,21 +1620,30 @@ async def _index_composio_google_calendar(
"start_time": start_time,
"document_type": "Google Calendar Event (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}"
summary_content = (
f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}"
)
if location:
summary_content += f"\nLocation: {location}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
document = Document(
search_space_id=search_space_id,
title=f"Calendar: {summary}",
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"]),
document_type=DocumentType(
TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"]
),
document_metadata={
"event_id": event_id,
"summary": summary,
@ -1497,7 +1664,7 @@ async def _index_composio_google_calendar(
session.add(document)
documents_indexed += 1
# Batch commit every 10 documents
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Calendar events processed so far"
@ -1535,5 +1702,7 @@ async def _index_composio_google_calendar(
return documents_indexed, None
except Exception as e:
logger.error(f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True)
logger.error(
f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True
)
return 0, f"Failed to index Google Calendar via Composio: {e!s}"

View file

@ -26,6 +26,7 @@ Available indexers:
# Calendar and scheduling
from .airtable_indexer import index_airtable_records
from .bookstack_indexer import index_bookstack_pages
# Note: composio_indexer is imported directly in connector_tasks.py to avoid circular imports
from .clickup_indexer import index_clickup_tasks
from .confluence_indexer import index_confluence_pages

View file

@ -128,7 +128,9 @@ async def index_github_repos(
if github_pat:
logger.info("Using GitHub PAT for authentication (private repos supported)")
else:
logger.info("No GitHub PAT provided - only public repositories can be indexed")
logger.info(
"No GitHub PAT provided - only public repositories can be indexed"
)
# 3. Initialize GitHub connector with gitingest backend
await task_logger.log_task_progress(
@ -308,9 +310,7 @@ async def _process_repository_digest(
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Repository {repo_full_name} unchanged. Skipping."
)
logger.info(f"Repository {repo_full_name} unchanged. Skipping.")
return 0
else:
logger.info(
@ -341,7 +341,7 @@ async def _process_repository_digest(
summary_content = (
f"# Repository: {repo_full_name}\n\n"
f"## File Structure\n\n{digest.tree}\n\n"
f"## File Contents (truncated)\n\n{digest.content[:MAX_DIGEST_CHARS - len(digest.tree) - 200]}..."
f"## File Contents (truncated)\n\n{digest.content[: MAX_DIGEST_CHARS - len(digest.tree) - 200]}..."
)
summary_text, summary_embedding = await generate_document_summary(
@ -362,9 +362,7 @@ async def _process_repository_digest(
# This preserves file-level granularity in search
chunks_data = await create_document_chunks(digest.content)
except Exception as chunk_err:
logger.error(
f"Failed to chunk repository {repo_full_name}: {chunk_err}"
)
logger.error(f"Failed to chunk repository {repo_full_name}: {chunk_err}")
# Fall back to a simpler chunking approach
chunks_data = await _simple_chunk_content(digest.content)

View file

@ -211,7 +211,9 @@ export const ComposioConfig: FC<ComposioConfigProps> = ({ connector, onConfigCha
);
}
if (selectedFiles.length > 0) {
parts.push(`${selectedFiles.length} file${selectedFiles.length > 1 ? "s" : ""}`);
parts.push(
`${selectedFiles.length} file${selectedFiles.length > 1 ? "s" : ""}`
);
}
return parts.length > 0 ? `(${parts.join(" ")})` : "";
})()}
@ -338,7 +340,9 @@ export const ComposioConfig: FC<ComposioConfigProps> = ({ connector, onConfigCha
<Switch
id="include-subfolders"
checked={indexingOptions.include_subfolders}
onCheckedChange={(checked) => handleIndexingOptionChange("include_subfolders", checked)}
onCheckedChange={(checked) =>
handleIndexingOptionChange("include_subfolders", checked)
}
/>
</div>
</div>

View file

@ -7,7 +7,9 @@ import { searchSourceConnectorTypeEnum } from "@/contracts/types/connector.types
export const connectorPopupQueryParamsSchema = z.object({
modal: z.enum(["connectors"]).optional(),
tab: z.enum(["all", "active"]).optional(),
view: z.enum(["configure", "edit", "connect", "youtube", "accounts", "mcp-list", "composio"]).optional(),
view: z
.enum(["configure", "edit", "connect", "youtube", "accounts", "mcp-list", "composio"])
.optional(),
connector: z.string().optional(),
connectorId: z.string().optional(),
connectorType: z.string().optional(),

View file

@ -26,7 +26,11 @@ import {
import { cacheKeys } from "@/lib/query-client/cache-keys";
import { queryClient } from "@/lib/query-client/client";
import type { IndexingConfigState } from "../constants/connector-constants";
import { COMPOSIO_CONNECTORS, OAUTH_CONNECTORS, OTHER_CONNECTORS } from "../constants/connector-constants";
import {
COMPOSIO_CONNECTORS,
OAUTH_CONNECTORS,
OTHER_CONNECTORS,
} from "../constants/connector-constants";
import {
dateRangeSchema,
frequencyMinutesSchema,
@ -83,7 +87,6 @@ export const useConnectorDialog = () => {
// MCP list view state (for managing multiple MCP connectors)
const [viewingMCPList, setViewingMCPList] = useState(false);
// Track if we came from accounts list when entering edit mode
const [cameFromAccountsList, setCameFromAccountsList] = useState<{
connectorType: string;
@ -164,16 +167,14 @@ export const useConnectorDialog = () => {
// Handle accounts view
if (params.view === "accounts" && params.connectorType) {
// Update state if not set, or if connectorType has changed
const needsUpdate = !viewingAccountsType ||
viewingAccountsType.connectorType !== params.connectorType;
const needsUpdate =
!viewingAccountsType || viewingAccountsType.connectorType !== params.connectorType;
if (needsUpdate) {
// Check both OAUTH_CONNECTORS and COMPOSIO_CONNECTORS
const oauthConnector = OAUTH_CONNECTORS.find(
(c) => c.connectorType === params.connectorType
) || COMPOSIO_CONNECTORS.find(
(c) => c.connectorType === params.connectorType
);
const oauthConnector =
OAUTH_CONNECTORS.find((c) => c.connectorType === params.connectorType) ||
COMPOSIO_CONNECTORS.find((c) => c.connectorType === params.connectorType);
if (oauthConnector) {
setViewingAccountsType({
connectorType: oauthConnector.connectorType,
@ -395,11 +396,8 @@ export const useConnectorDialog = () => {
// Check if authEndpoint already has query parameters
const separator = connector.authEndpoint.includes("?") ? "&" : "?";
const url = `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}${connector.authEndpoint}${separator}space_id=${searchSpaceId}`;
const response = await authenticatedFetch(
url,
{ method: "GET" }
);
const response = await authenticatedFetch(url, { method: "GET" });
if (!response.ok) {
throw new Error(`Failed to initiate ${connector.title} OAuth`);

View file

@ -4,7 +4,12 @@ import type { FC } from "react";
import { EnumConnectorName } from "@/contracts/enums/connector";
import type { SearchSourceConnector } from "@/contracts/types/connector.types";
import { ConnectorCard } from "../components/connector-card";
import { CRAWLERS, OAUTH_CONNECTORS, OTHER_CONNECTORS, COMPOSIO_CONNECTORS } from "../constants/connector-constants";
import {
CRAWLERS,
OAUTH_CONNECTORS,
OTHER_CONNECTORS,
COMPOSIO_CONNECTORS,
} from "../constants/connector-constants";
import { getDocumentCountForConnector } from "../utils/connector-document-mapping";
/**
@ -28,7 +33,9 @@ interface AllConnectorsTabProps {
allConnectors: SearchSourceConnector[] | undefined;
documentTypeCounts?: Record<string, number>;
indexingConnectorIds?: Set<number>;
onConnectOAuth: (connector: (typeof OAUTH_CONNECTORS)[number] | (typeof COMPOSIO_CONNECTORS)[number]) => void;
onConnectOAuth: (
connector: (typeof OAUTH_CONNECTORS)[number] | (typeof COMPOSIO_CONNECTORS)[number]
) => void;
onConnectNonOAuth?: (connectorType: string) => void;
onCreateWebcrawler?: () => void;
onCreateYouTubeCrawler?: () => void;
@ -82,7 +89,9 @@ export const AllConnectorsTab: FC<AllConnectorsTabProps> = ({
{filteredComposio.length > 0 && (
<section>
<div className="flex items-center gap-2 mb-4">
<h3 className="text-sm font-semibold text-muted-foreground">Managed OAuth (Composio)</h3>
<h3 className="text-sm font-semibold text-muted-foreground">
Managed OAuth (Composio)
</h3>
</div>
<div className="grid grid-cols-1 sm:grid-cols-2 gap-3">
{filteredComposio.map((connector) => {
@ -99,7 +108,6 @@ export const AllConnectorsTab: FC<AllConnectorsTabProps> = ({
const accountCount = typeConnectors.length;
const documentCount = getDocumentCountForConnector(
connector.connectorType,
documentTypeCounts
@ -154,7 +162,6 @@ export const AllConnectorsTab: FC<AllConnectorsTabProps> = ({
const accountCount = typeConnectors.length;
const documentCount = getDocumentCountForConnector(
connector.connectorType,
documentTypeCounts

View file

@ -362,4 +362,3 @@ export function ComposioDriveFolderTree({
</div>
);
}

View file

@ -26,4 +26,3 @@ export function useComposioDriveFolders({
retry: 2,
});
}