fix: resolve Composio integration issues

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-05-02 21:16:03 -07:00
parent 47b2994ec7
commit cea8618aed
25 changed files with 1756 additions and 461 deletions

View file

@ -96,6 +96,46 @@ def _compute_turn_cancelling_retry_delay(attempt: int) -> int:
return min(delay, TURN_CANCELLING_MAX_DELAY_MS)
def _first_interrupt_value(state: Any) -> dict[str, Any] | None:
"""Return the first LangGraph interrupt payload across all snapshot tasks."""
def _extract_interrupt_value(candidate: Any) -> dict[str, Any] | None:
if isinstance(candidate, dict):
value = candidate.get("value", candidate)
return value if isinstance(value, dict) else None
value = getattr(candidate, "value", None)
if isinstance(value, dict):
return value
if isinstance(candidate, (list, tuple)):
for item in candidate:
extracted = _extract_interrupt_value(item)
if extracted is not None:
return extracted
return None
for task in getattr(state, "tasks", ()) or ():
try:
interrupts = getattr(task, "interrupts", ()) or ()
except (AttributeError, IndexError, TypeError):
interrupts = ()
if not interrupts:
extracted = _extract_interrupt_value(task)
if extracted is not None:
return extracted
continue
for interrupt_item in interrupts:
extracted = _extract_interrupt_value(interrupt_item)
if extracted is not None:
return extracted
try:
state_interrupts = getattr(state, "interrupts", ()) or ()
except (AttributeError, IndexError, TypeError):
state_interrupts = ()
extracted = _extract_interrupt_value(state_interrupts)
if extracted is not None:
return extracted
return None
def _extract_chunk_parts(chunk: Any) -> dict[str, Any]:
"""Decompose an ``AIMessageChunk`` into typed text/reasoning/tool-call parts.
@ -2178,10 +2218,10 @@ async def _stream_agent_events(
result.agent_called_update_memory = called_update_memory
_log_file_contract("turn_outcome", result)
is_interrupted = state.tasks and any(task.interrupts for task in state.tasks)
if is_interrupted:
interrupt_value = _first_interrupt_value(state)
if interrupt_value is not None:
result.is_interrupted = True
result.interrupt_value = state.tasks[0].interrupts[0].value
result.interrupt_value = interrupt_value
yield streaming_service.format_interrupt_request(result.interrupt_value)

View file

@ -20,12 +20,10 @@ from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.composio_service import ComposioService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
)
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
from .base import (
check_duplicate_document_by_hash,
@ -44,6 +42,10 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]]
HEARTBEAT_INTERVAL_SECONDS = 30
def _format_calendar_event_to_markdown(event: dict) -> str:
    """Render *event* as markdown via GoogleCalendarConnector's formatter.

    The formatter is invoked unbound with ``None`` in place of ``self`` so no
    connector instance (and thus no credentials) is needed.
    NOTE(review): this assumes ``format_event_to_markdown`` never reads
    instance state — confirm, and consider making it a ``@staticmethod``.
    """
    return GoogleCalendarConnector.format_event_to_markdown(None, event)
def _build_connector_doc(
event: dict,
event_markdown: str,
@ -150,7 +152,14 @@ async def index_google_calendar_events(
)
return 0, 0, f"Connector with ID {connector_id} not found"
# ── Credential building ───────────────────────────────────────
is_composio_connector = (
connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES
)
calendar_client = None
composio_service = None
connected_account_id = None
# ── Credential/client building ────────────────────────────────
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
@ -161,7 +170,7 @@ async def index_google_calendar_events(
{"error_type": "MissingComposioAccount"},
)
return 0, 0, "Composio connected_account_id not found"
credentials = build_composio_credentials(connected_account_id)
composio_service = ComposioService()
else:
config_data = connector.config
@ -229,12 +238,13 @@ async def index_google_calendar_events(
{"stage": "client_initialization"},
)
calendar_client = GoogleCalendarConnector(
credentials=credentials,
session=session,
user_id=user_id,
connector_id=connector_id,
)
if not is_composio_connector:
calendar_client = GoogleCalendarConnector(
credentials=credentials,
session=session,
user_id=user_id,
connector_id=connector_id,
)
# Handle 'undefined' string from frontend (treat as None)
if start_date == "undefined" or start_date == "":
@ -300,9 +310,26 @@ async def index_google_calendar_events(
)
try:
events, error = await calendar_client.get_all_primary_calendar_events(
start_date=start_date_str, end_date=end_date_str
)
if is_composio_connector:
start_dt = parse_date_flexible(start_date_str).replace(
hour=0, minute=0, second=0, microsecond=0
)
end_dt = parse_date_flexible(end_date_str).replace(
hour=23, minute=59, second=59, microsecond=0
)
events, error = await composio_service.get_calendar_events(
connected_account_id=connected_account_id,
entity_id=f"surfsense_{user_id}",
time_min=start_dt.isoformat(),
time_max=end_dt.isoformat(),
max_results=250,
)
if not events and not error:
error = "No events found in the specified date range."
else:
events, error = await calendar_client.get_all_primary_calendar_events(
start_date=start_date_str, end_date=end_date_str
)
if error:
if "No events found" in error:
@ -381,7 +408,7 @@ async def index_google_calendar_events(
documents_skipped += 1
continue
event_markdown = calendar_client.format_event_to_markdown(event)
event_markdown = _format_calendar_event_to_markdown(event)
if not event_markdown.strip():
logger.warning(f"Skipping event with no content: {event_summary}")
documents_skipped += 1

View file

@ -9,6 +9,8 @@ import asyncio
import logging
import time
from collections.abc import Awaitable, Callable
from pathlib import Path
from typing import Any
from sqlalchemy import String, cast, select
from sqlalchemy.exc import SQLAlchemyError
@ -37,6 +39,7 @@ from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.composio_service import ComposioService
from app.services.llm_service import get_user_long_context_llm
from app.services.page_limit_service import PageLimitService
from app.services.task_logging_service import TaskLoggingService
@ -45,10 +48,7 @@ from app.tasks.connector_indexers.base import (
get_connector_by_id,
update_connector_last_indexed,
)
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
)
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
ACCEPTED_DRIVE_CONNECTOR_TYPES = {
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
@ -61,6 +61,209 @@ HEARTBEAT_INTERVAL_SECONDS = 30
logger = logging.getLogger(__name__)
class ComposioDriveClient:
    """Google Drive client facade backed by Composio tool execution.

    Composio-managed OAuth connections can execute tools without exposing raw
    OAuth tokens through connected account state.

    The public methods mirror the subset of the ``GoogleDriveClient`` surface
    used by the Drive indexers (list, metadata, download, export) so the two
    client types are interchangeable at the call sites.
    """

    def __init__(
        self,
        session: AsyncSession,
        connector_id: int,
        connected_account_id: str,
        entity_id: str,
    ):
        # ``session``/``connector_id`` are stored for interface parity with
        # GoogleDriveClient; actual Drive calls go through ComposioService.
        self.session = session
        self.connector_id = connector_id
        self.connected_account_id = connected_account_id
        self.entity_id = entity_id
        self.composio = ComposioService()

    async def list_files(
        self,
        query: str = "",
        fields: str = "nextPageToken, files(id, name, mimeType, modifiedTime, md5Checksum, size, webViewLink, parents, owners, createdTime, description)",
        page_size: int = 100,
        page_token: str | None = None,
    ) -> tuple[list[dict[str, Any]], str | None, str | None]:
        """List Drive files via the GOOGLEDRIVE_LIST_FILES tool.

        Returns ``(files, next_page_token, error)``; on failure the file list
        is empty and ``error`` is set.
        """
        params: dict[str, Any] = {
            # Composio caps page size; clamp to 100 to stay within it.
            "page_size": min(page_size, 100),
            "fields": fields,
        }
        if query:
            params["q"] = query
        if page_token:
            params["page_token"] = page_token

        result = await self.composio.execute_tool(
            connected_account_id=self.connected_account_id,
            tool_name="GOOGLEDRIVE_LIST_FILES",
            params=params,
            entity_id=self.entity_id,
        )
        if not result.get("success"):
            return [], None, result.get("error", "Unknown error")

        data = result.get("data", {})
        files = []
        next_token = None
        if isinstance(data, dict):
            # Composio responses sometimes nest the payload one level deeper
            # under another "data" key; unwrap before reading fields.
            inner_data = data.get("data", data)
            if isinstance(inner_data, dict):
                files = inner_data.get("files", [])
                # The page token key casing varies across tool versions.
                next_token = inner_data.get("nextPageToken") or inner_data.get(
                    "next_page_token"
                )
        elif isinstance(data, list):
            files = data
        return files, next_token, None

    async def get_file_metadata(
        self, file_id: str, fields: str = "*"
    ) -> tuple[dict[str, Any] | None, str | None]:
        """Fetch metadata for one file; returns ``(metadata, error)``."""
        result = await self.composio.execute_tool(
            connected_account_id=self.connected_account_id,
            tool_name="GOOGLEDRIVE_GET_FILE_METADATA",
            params={"file_id": file_id, "fields": fields},
            entity_id=self.entity_id,
        )
        if not result.get("success"):
            return None, result.get("error", "Unknown error")

        data = result.get("data", {})
        if isinstance(data, dict):
            # Same double-nesting convention as list_files.
            inner_data = data.get("data", data)
            if isinstance(inner_data, dict):
                return inner_data, None
        return None, "Could not extract metadata from Composio response"

    async def download_file(self, file_id: str) -> tuple[bytes | None, str | None]:
        """Download a binary file; returns ``(content, error)``."""
        return await self._download_file_content(file_id)

    async def download_file_to_disk(
        self,
        file_id: str,
        dest_path: str,
        chunksize: int = 5 * 1024 * 1024,
    ) -> str | None:
        """Download a file to *dest_path*; returns an error string or ``None``.

        ``chunksize`` is accepted only for GoogleDriveClient interface parity —
        Composio returns the whole file at once, so it is ignored.
        """
        del chunksize
        content, error = await self.download_file(file_id)
        if error:
            return error
        if content is None:
            return "No content returned from Composio"
        Path(dest_path).write_bytes(content)
        return None

    async def export_google_file(
        self, file_id: str, mime_type: str
    ) -> tuple[bytes | None, str | None]:
        """Export a Google-native doc to *mime_type*; returns ``(content, error)``."""
        return await self._download_file_content(file_id, mime_type=mime_type)

    async def _download_file_content(
        self, file_id: str, mime_type: str | None = None
    ) -> tuple[bytes | None, str | None]:
        """Run GOOGLEDRIVE_DOWNLOAD_FILE and decode whatever shape comes back."""
        params: dict[str, Any] = {"file_id": file_id}
        if mime_type:
            # Presence of mime_type turns the download into an export.
            params["mime_type"] = mime_type

        result = await self.composio.execute_tool(
            connected_account_id=self.connected_account_id,
            tool_name="GOOGLEDRIVE_DOWNLOAD_FILE",
            params=params,
            entity_id=self.entity_id,
        )
        if not result.get("success"):
            return None, result.get("error", "Unknown error")
        return self._read_download_result(result.get("data"))

    def _read_download_result(self, data: Any) -> tuple[bytes | None, str | None]:
        """Normalize a Composio download result into raw bytes.

        Depending on the tool version, ``data`` may be raw bytes, a path/URL
        string, or a (possibly doubly nested) dict pointing at a local file,
        an HTTP(S) URL, or inline base64 content. Returns ``(bytes, error)``.
        """
        if isinstance(data, bytes):
            return data, None

        file_path: str | None = None
        if isinstance(data, str):
            file_path = data
        elif isinstance(data, dict):
            inner_data = data.get("data", data)
            if isinstance(inner_data, dict):
                # Probe known keys in priority order; a value may itself be a
                # dict holding the real location one level deeper.
                for key in ("file_path", "downloaded_file_content", "path", "uri"):
                    value = inner_data.get(key)
                    if isinstance(value, str):
                        file_path = value
                        break
                    if isinstance(value, dict):
                        nested = (
                            value.get("file_path")
                            or value.get("downloaded_file_content")
                            or value.get("path")
                            or value.get("uri")
                            or value.get("s3url")
                        )
                        if isinstance(nested, str):
                            file_path = nested
                            break

        if not file_path:
            return None, "No file path/content returned from Composio"

        # Remote URL: fetch it directly.
        if file_path.startswith(("http://", "https://")):
            try:
                import urllib.request

                with urllib.request.urlopen(file_path, timeout=60) as response:
                    return response.read(), None
            except Exception as e:
                return None, f"Failed to download Composio file URL: {e!s}"

        # Local path written by the Composio runtime (absolute, or inside
        # its ``.composio`` working directory).
        path_obj = Path(file_path)
        if path_obj.is_absolute() or ".composio" in str(path_obj):
            if not path_obj.exists():
                return None, f"File not found at path: {file_path}"
            return path_obj.read_bytes(), None

        # Otherwise assume inline content: try base64 first, fall back to
        # treating the string itself as the file body.
        try:
            import base64

            return base64.b64decode(file_path), None
        except Exception:
            return file_path.encode("utf-8"), None
def _build_drive_client_for_connector(
    session: AsyncSession,
    connector_id: int,
    connector: object,
    user_id: str,
) -> tuple[GoogleDriveClient | ComposioDriveClient | None, str | None]:
    """Construct the Drive client matching *connector*'s auth flavor.

    Returns a ``(client, error)`` pair — exactly one side is ``None``.
    Composio-backed connectors get a ComposioDriveClient; everything else
    gets the direct-OAuth GoogleDriveClient.
    """
    is_composio = connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES

    if not is_composio:
        # Direct OAuth path: refuse to proceed when stored credentials are
        # marked encrypted but no decryption key is configured.
        encrypted = connector.config.get("_token_encrypted", False)
        if encrypted and not config.SECRET_KEY:
            return None, "SECRET_KEY not configured but credentials are marked as encrypted"
        return GoogleDriveClient(session, connector_id), None

    account_id = connector.config.get("composio_connected_account_id")
    if not account_id:
        return None, (
            f"Composio connected_account_id not found for connector {connector_id}"
        )
    client = ComposioDriveClient(
        session,
        connector_id,
        account_id,
        entity_id=f"surfsense_{user_id}",
    )
    return client, None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@ -927,34 +1130,17 @@ async def index_google_drive_files(
{"stage": "client_initialization"},
)
pre_built_credentials = None
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
error_msg = f"Composio connected_account_id not found for connector {connector_id}"
await task_logger.log_task_failure(
log_entry,
error_msg,
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, 0, error_msg, 0
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
"SECRET_KEY not configured but credentials are encrypted",
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return (
0,
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
0,
)
drive_client, client_error = _build_drive_client_for_connector(
session, connector_id, connector, user_id
)
if client_error or not drive_client:
await task_logger.log_task_failure(
log_entry,
client_error or "Failed to initialize Google Drive client",
"Missing connector credentials",
{"error_type": "ClientInitializationError"},
)
return 0, 0, client_error, 0
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
@ -963,10 +1149,6 @@ async def index_google_drive_files(
from app.services.llm_service import get_vision_llm
vision_llm = await get_vision_llm(session, search_space_id)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
if not folder_id:
error_msg = "folder_id is required for Google Drive indexing"
await task_logger.log_task_failure(
@ -979,8 +1161,14 @@ async def index_google_drive_files(
folder_tokens = connector.config.get("folder_tokens", {})
start_page_token = folder_tokens.get(target_folder_id)
is_composio_connector = (
connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES
)
can_use_delta = (
use_delta_sync and start_page_token and connector.last_indexed_at
not is_composio_connector
and use_delta_sync
and start_page_token
and connector.last_indexed_at
)
documents_unsupported = 0
@ -1051,7 +1239,16 @@ async def index_google_drive_files(
)
if documents_indexed > 0 or can_use_delta:
new_token, token_error = await get_start_page_token(drive_client)
if isinstance(drive_client, ComposioDriveClient):
(
new_token,
token_error,
) = await drive_client.composio.get_drive_start_page_token(
drive_client.connected_account_id,
drive_client.entity_id,
)
else:
new_token, token_error = await get_start_page_token(drive_client)
if new_token and not token_error:
await session.refresh(connector)
if "folder_tokens" not in connector.config:
@ -1137,32 +1334,17 @@ async def index_google_drive_single_file(
)
return 0, error_msg
pre_built_credentials = None
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
error_msg = f"Composio connected_account_id not found for connector {connector_id}"
await task_logger.log_task_failure(
log_entry,
error_msg,
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, error_msg
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
"SECRET_KEY not configured but credentials are encrypted",
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
drive_client, client_error = _build_drive_client_for_connector(
session, connector_id, connector, user_id
)
if client_error or not drive_client:
await task_logger.log_task_failure(
log_entry,
client_error or "Failed to initialize Google Drive client",
"Missing connector credentials",
{"error_type": "ClientInitializationError"},
)
return 0, client_error
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
@ -1171,10 +1353,6 @@ async def index_google_drive_single_file(
from app.services.llm_service import get_vision_llm
vision_llm = await get_vision_llm(session, search_space_id)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
file, error = await get_file_by_id(drive_client, file_id)
if error or not file:
error_msg = f"Failed to fetch file {file_id}: {error or 'File not found'}"
@ -1276,32 +1454,18 @@ async def index_google_drive_selected_files(
)
return 0, 0, [error_msg]
pre_built_credentials = None
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
error_msg = f"Composio connected_account_id not found for connector {connector_id}"
await task_logger.log_task_failure(
log_entry,
error_msg,
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, 0, [error_msg]
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
error_msg = (
"SECRET_KEY not configured but credentials are marked as encrypted"
)
await task_logger.log_task_failure(
log_entry,
error_msg,
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return 0, 0, [error_msg]
drive_client, client_error = _build_drive_client_for_connector(
session, connector_id, connector, user_id
)
if client_error or not drive_client:
error_msg = client_error or "Failed to initialize Google Drive client"
await task_logger.log_task_failure(
log_entry,
error_msg,
"Missing connector credentials",
{"error_type": "ClientInitializationError"},
)
return 0, 0, [error_msg]
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
@ -1310,10 +1474,6 @@ async def index_google_drive_selected_files(
from app.services.llm_service import get_vision_llm
vision_llm = await get_vision_llm(session, search_space_id)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
indexed, skipped, unsupported, errors = await _index_selected_files(
drive_client,
session,

View file

@ -20,12 +20,10 @@ from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.composio_service import ComposioService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
)
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
from .base import (
calculate_date_range,
@ -44,6 +42,62 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]]
HEARTBEAT_INTERVAL_SECONDS = 30
def _normalize_composio_gmail_message(message: dict) -> dict:
if message.get("payload"):
return message
headers = []
header_values = {
"Subject": message.get("subject"),
"From": message.get("from") or message.get("sender"),
"To": message.get("to") or message.get("recipient"),
"Date": message.get("date"),
}
for name, value in header_values.items():
if value:
headers.append({"name": name, "value": value})
return {
**message,
"id": message.get("id")
or message.get("message_id")
or message.get("messageId"),
"threadId": message.get("threadId") or message.get("thread_id"),
"payload": {"headers": headers},
"snippet": message.get("snippet", ""),
"messageText": message.get("messageText") or message.get("body") or "",
}
def _format_gmail_message_to_markdown(message: dict) -> str:
headers = {
header.get("name", "").lower(): header.get("value", "")
for header in message.get("payload", {}).get("headers", [])
if isinstance(header, dict)
}
subject = headers.get("subject", "No Subject")
from_email = headers.get("from", "Unknown Sender")
to_email = headers.get("to", "Unknown Recipient")
date_str = headers.get("date", "Unknown Date")
message_text = (
message.get("messageText")
or message.get("body")
or message.get("text")
or message.get("snippet", "")
)
return (
f"# {subject}\n\n"
f"**From:** {from_email}\n"
f"**To:** {to_email}\n"
f"**Date:** {date_str}\n\n"
f"## Message Content\n\n{message_text}\n\n"
f"## Message Details\n\n"
f"- **Message ID:** {message.get('id', 'Unknown')}\n"
f"- **Thread ID:** {message.get('threadId', 'Unknown')}\n"
)
def _build_connector_doc(
message: dict,
markdown_content: str,
@ -162,7 +216,14 @@ async def index_google_gmail_messages(
)
return 0, 0, error_msg
# ── Credential building ───────────────────────────────────────
is_composio_connector = (
connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES
)
gmail_connector = None
composio_service = None
connected_account_id = None
# ── Credential/client building ────────────────────────────────
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
@ -173,7 +234,7 @@ async def index_google_gmail_messages(
{"error_type": "MissingComposioAccount"},
)
return 0, 0, "Composio connected_account_id not found"
credentials = build_composio_credentials(connected_account_id)
composio_service = ComposioService()
else:
config_data = connector.config
@ -241,9 +302,10 @@ async def index_google_gmail_messages(
{"stage": "client_initialization"},
)
gmail_connector = GoogleGmailConnector(
credentials, session, user_id, connector_id
)
if not is_composio_connector:
gmail_connector = GoogleGmailConnector(
credentials, session, user_id, connector_id
)
calculated_start_date, calculated_end_date = calculate_date_range(
connector, start_date, end_date, default_days_back=365
@ -254,11 +316,60 @@ async def index_google_gmail_messages(
f"Fetching emails for connector {connector_id} "
f"from {calculated_start_date} to {calculated_end_date}"
)
messages, error = await gmail_connector.get_recent_messages(
max_results=max_messages,
start_date=calculated_start_date,
end_date=calculated_end_date,
)
if is_composio_connector:
query_parts = []
if calculated_start_date:
query_parts.append(f"after:{calculated_start_date.replace('-', '/')}")
if calculated_end_date:
query_parts.append(f"before:{calculated_end_date.replace('-', '/')}")
query = " ".join(query_parts)
messages = []
page_token = None
error = None
while len(messages) < max_messages:
page_size = min(50, max_messages - len(messages))
(
page_messages,
page_token,
_estimate,
page_error,
) = await composio_service.get_gmail_messages(
connected_account_id=connected_account_id,
entity_id=f"surfsense_{user_id}",
query=query,
max_results=page_size,
page_token=page_token,
)
if page_error:
error = page_error
break
for page_message in page_messages:
message_id = (
page_message.get("id")
or page_message.get("message_id")
or page_message.get("messageId")
)
if message_id:
(
detail,
detail_error,
) = await composio_service.get_gmail_message_detail(
connected_account_id=connected_account_id,
entity_id=f"surfsense_{user_id}",
message_id=message_id,
)
if not detail_error and isinstance(detail, dict):
page_message = detail
messages.append(_normalize_composio_gmail_message(page_message))
if not page_token:
break
else:
messages, error = await gmail_connector.get_recent_messages(
max_results=max_messages,
start_date=calculated_start_date,
end_date=calculated_end_date,
)
if error:
error_message = error
@ -326,7 +437,12 @@ async def index_google_gmail_messages(
documents_skipped += 1
continue
markdown_content = gmail_connector.format_message_to_markdown(message)
if is_composio_connector:
markdown_content = _format_gmail_message_to_markdown(message)
else:
markdown_content = gmail_connector.format_message_to_markdown(
message
)
if not markdown_content.strip():
logger.warning(f"Skipping message with no content: {message_id}")
documents_skipped += 1