fix: composio tool calls in composio connectors

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-05-05 18:57:10 -07:00
parent 4e174f17f2
commit 5e87a7a251
13 changed files with 1347 additions and 550 deletions

View file

@ -168,20 +168,46 @@ def create_create_calendar_event_tool(
f"Creating calendar event: summary='{final_summary}', connector={actual_connector_id}"
)
tz = context.get("timezone", "UTC")
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
creds = build_composio_credentials(cca_id)
else:
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this connector.",
}
from app.services.composio_service import ComposioService
(
event_id,
html_link,
error,
) = await ComposioService().create_calendar_event(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
summary=final_summary,
start_datetime=final_start_datetime,
end_datetime=final_end_datetime,
timezone=tz,
description=final_description,
location=final_location,
attendees=final_attendees,
)
if error:
return {"status": "error", "message": error}
created = {
"id": event_id,
"summary": final_summary,
"htmlLink": html_link,
}
logger.info(
f"Calendar event created via Composio: id={event_id}, summary={final_summary}"
)
else:
config_data = dict(connector.config)
@ -211,70 +237,69 @@ def create_create_calendar_event_tool(
expiry=datetime.fromisoformat(exp) if exp else None,
)
service = await asyncio.get_event_loop().run_in_executor(
None, lambda: build("calendar", "v3", credentials=creds)
)
tz = context.get("timezone", "UTC")
event_body: dict[str, Any] = {
"summary": final_summary,
"start": {"dateTime": final_start_datetime, "timeZone": tz},
"end": {"dateTime": final_end_datetime, "timeZone": tz},
}
if final_description:
event_body["description"] = final_description
if final_location:
event_body["location"] = final_location
if final_attendees:
event_body["attendees"] = [
{"email": e.strip()} for e in final_attendees if e.strip()
]
try:
created = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
service.events()
.insert(calendarId="primary", body=event_body)
.execute()
),
service = await asyncio.get_event_loop().run_in_executor(
None, lambda: build("calendar", "v3", credentials=creds)
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
event_body: dict[str, Any] = {
"summary": final_summary,
"start": {"dateTime": final_start_datetime, "timeZone": tz},
"end": {"dateTime": final_end_datetime, "timeZone": tz},
}
if final_description:
event_body["description"] = final_description
if final_location:
event_body["location"] = final_location
if final_attendees:
event_body["attendees"] = [
{"email": e.strip()} for e in final_attendees if e.strip()
]
try:
created = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
service.events()
.insert(calendarId="primary", body=event_body)
.execute()
),
)
try:
from sqlalchemy.orm.attributes import flag_modified
except Exception as api_err:
from googleapiclient.errors import HttpError
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
try:
from sqlalchemy.orm.attributes import flag_modified
logger.info(
f"Calendar event created: id={created.get('id')}, summary={created.get('summary')}"
)
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(
f"Calendar event created via Google API: id={created.get('id')}, summary={created.get('summary')}"
)
kb_message_suffix = ""
try:

View file

@ -163,16 +163,22 @@ def create_delete_calendar_event_tool(
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
creds = build_composio_credentials(cca_id)
else:
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this connector.",
}
from app.services.composio_service import ComposioService
error = await ComposioService().delete_calendar_event(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
event_id=final_event_id,
)
if error:
return {"status": "error", "message": error}
else:
config_data = dict(connector.config)
@ -202,51 +208,51 @@ def create_delete_calendar_event_tool(
expiry=datetime.fromisoformat(exp) if exp else None,
)
service = await asyncio.get_event_loop().run_in_executor(
None, lambda: build("calendar", "v3", credentials=creds)
)
try:
await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
service.events()
.delete(calendarId="primary", eventId=final_event_id)
.execute()
),
service = await asyncio.get_event_loop().run_in_executor(
None, lambda: build("calendar", "v3", credentials=creds)
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
try:
await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
service.events()
.delete(calendarId="primary", eventId=final_event_id)
.execute()
),
)
try:
from sqlalchemy.orm.attributes import flag_modified
except Exception as api_err:
from googleapiclient.errors import HttpError
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
try:
from sqlalchemy.orm.attributes import flag_modified
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(f"Calendar event deleted: event_id={final_event_id}")

View file

@ -16,6 +16,14 @@ _CALENDAR_TYPES = [
]
def _to_calendar_boundary(value: str, *, is_end: bool) -> str:
"""Promote a bare YYYY-MM-DD to RFC3339 with a day-edge time, leave full datetimes alone."""
if "T" in value:
return value
time = "23:59:59" if is_end else "00:00:00"
return f"{value}T{time}Z"
def create_search_calendar_events_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
@ -61,22 +69,47 @@ def create_search_calendar_events_tool(
"message": "No Google Calendar connector found. Please connect Google Calendar in your workspace settings.",
}
creds = _build_credentials(connector)
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
):
cca_id = connector.config.get("composio_connected_account_id")
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this connector.",
}
from app.connectors.google_calendar_connector import GoogleCalendarConnector
from app.services.composio_service import ComposioService
cal = GoogleCalendarConnector(
credentials=creds,
session=db_session,
user_id=user_id,
connector_id=connector.id,
)
events_raw, error = await ComposioService().get_calendar_events(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
time_min=_to_calendar_boundary(start_date, is_end=False),
time_max=_to_calendar_boundary(end_date, is_end=True),
max_results=max_results,
)
if not events_raw and not error:
error = "No events found in the specified date range."
else:
creds = _build_credentials(connector)
events_raw, error = await cal.get_all_primary_calendar_events(
start_date=start_date,
end_date=end_date,
max_results=max_results,
)
from app.connectors.google_calendar_connector import (
GoogleCalendarConnector,
)
cal = GoogleCalendarConnector(
credentials=creds,
session=db_session,
user_id=user_id,
connector_id=connector.id,
)
events_raw, error = await cal.get_all_primary_calendar_events(
start_date=start_date,
end_date=end_date,
max_results=max_results,
)
if error:
if (

View file

@ -192,20 +192,62 @@ def create_update_calendar_event_tool(
f"Updating calendar event: event_id='{final_event_id}', connector={actual_connector_id}"
)
has_changes = any(
v is not None
for v in (
final_new_summary,
final_new_start_datetime,
final_new_end_datetime,
final_new_description,
final_new_location,
final_new_attendees,
)
)
if not has_changes:
return {
"status": "error",
"message": "No changes specified. Please provide at least one field to update.",
}
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
creds = build_composio_credentials(cca_id)
else:
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this connector.",
}
from app.services.composio_service import ComposioService
tz_for_composio: str | None = None
if final_new_start_datetime is not None and not _is_date_only(
final_new_start_datetime
):
tz_for_composio = (
context.get("timezone") if isinstance(context, dict) else None
)
_, html_link, error = await ComposioService().update_calendar_event(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
event_id=final_event_id,
summary=final_new_summary,
start_time=final_new_start_datetime,
end_time=final_new_end_datetime,
timezone=tz_for_composio,
description=final_new_description,
location=final_new_location,
attendees=final_new_attendees,
)
if error:
return {"status": "error", "message": error}
updated = {"htmlLink": html_link}
logger.info(
f"Calendar event updated via Composio: event_id={final_event_id}"
)
else:
config_data = dict(connector.config)
@ -235,81 +277,79 @@ def create_update_calendar_event_tool(
expiry=datetime.fromisoformat(exp) if exp else None,
)
service = await asyncio.get_event_loop().run_in_executor(
None, lambda: build("calendar", "v3", credentials=creds)
)
update_body: dict[str, Any] = {}
if final_new_summary is not None:
update_body["summary"] = final_new_summary
if final_new_start_datetime is not None:
update_body["start"] = _build_time_body(
final_new_start_datetime, context
service = await asyncio.get_event_loop().run_in_executor(
None, lambda: build("calendar", "v3", credentials=creds)
)
if final_new_end_datetime is not None:
update_body["end"] = _build_time_body(final_new_end_datetime, context)
if final_new_description is not None:
update_body["description"] = final_new_description
if final_new_location is not None:
update_body["location"] = final_new_location
if final_new_attendees is not None:
update_body["attendees"] = [
{"email": e.strip()} for e in final_new_attendees if e.strip()
]
if not update_body:
return {
"status": "error",
"message": "No changes specified. Please provide at least one field to update.",
}
try:
updated = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
service.events()
.patch(
calendarId="primary",
eventId=final_event_id,
body=update_body,
)
.execute()
),
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
update_body: dict[str, Any] = {}
if final_new_summary is not None:
update_body["summary"] = final_new_summary
if final_new_start_datetime is not None:
update_body["start"] = _build_time_body(
final_new_start_datetime, context
)
try:
from sqlalchemy.orm.attributes import flag_modified
if final_new_end_datetime is not None:
update_body["end"] = _build_time_body(
final_new_end_datetime, context
)
if final_new_description is not None:
update_body["description"] = final_new_description
if final_new_location is not None:
update_body["location"] = final_new_location
if final_new_attendees is not None:
update_body["attendees"] = [
{"email": e.strip()} for e in final_new_attendees if e.strip()
]
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
try:
updated = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
service.events()
.patch(
calendarId="primary",
eventId=final_event_id,
body=update_body,
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
.execute()
),
)
except Exception as api_err:
from googleapiclient.errors import HttpError
logger.info(f"Calendar event updated: event_id={final_event_id}")
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
)
try:
from sqlalchemy.orm.attributes import flag_modified
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Calendar account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(
f"Calendar event updated via Google API: event_id={final_event_id}"
)
kb_message_suffix = ""
if document_id is not None:

View file

@ -161,16 +161,39 @@ def create_create_gmail_draft_tool(
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
creds = build_composio_credentials(cca_id)
else:
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this Gmail connector.",
}
from app.services.composio_service import ComposioService
(
draft_id,
draft_message_id,
draft_thread_id,
error,
) = await ComposioService().create_gmail_draft(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
to=final_to,
subject=final_subject,
body=final_body,
cc=final_cc,
bcc=final_bcc,
)
if error:
return {"status": "error", "message": error}
created = {
"id": draft_id,
"message": {
"id": draft_message_id,
"threadId": draft_thread_id,
},
}
logger.info(f"Gmail draft created via Composio: id={draft_id}")
else:
from google.oauth2.credentials import Credentials
@ -208,63 +231,65 @@ def create_create_gmail_draft_tool(
expiry=datetime.fromisoformat(exp) if exp else None,
)
from googleapiclient.discovery import build
from googleapiclient.discovery import build
gmail_service = build("gmail", "v1", credentials=creds)
gmail_service = build("gmail", "v1", credentials=creds)
message = MIMEText(final_body)
message["to"] = final_to
message["subject"] = final_subject
if final_cc:
message["cc"] = final_cc
if final_bcc:
message["bcc"] = final_bcc
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
message = MIMEText(final_body)
message["to"] = final_to
message["subject"] = final_subject
if final_cc:
message["cc"] = final_cc
if final_bcc:
message["bcc"] = final_bcc
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
try:
created = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
gmail_service.users()
.drafts()
.create(userId="me", body={"message": {"raw": raw}})
.execute()
),
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
try:
created = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
gmail_service.users()
.drafts()
.create(userId="me", body={"message": {"raw": raw}})
.execute()
),
)
try:
from sqlalchemy.orm.attributes import flag_modified
except Exception as api_err:
from googleapiclient.errors import HttpError
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
try:
from sqlalchemy.orm.attributes import flag_modified
logger.info(f"Gmail draft created: id={created.get('id')}")
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(
f"Gmail draft created via Google API: id={created.get('id')}"
)
kb_message_suffix = ""
try:

View file

@ -50,7 +50,56 @@ def create_read_gmail_email_tool(
"message": "No Gmail connector found. Please connect Gmail in your workspace settings.",
}
from app.agents.new_chat.tools.gmail.search_emails import _build_credentials
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
):
cca_id = connector.config.get("composio_connected_account_id")
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this Gmail connector.",
}
from app.agents.new_chat.tools.gmail.search_emails import (
_format_gmail_summary,
)
from app.services.composio_service import ComposioService
detail, error = await ComposioService().get_gmail_message_detail(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
message_id=message_id,
)
if error:
return {"status": "error", "message": error}
if not detail:
return {
"status": "not_found",
"message": f"Email with ID '{message_id}' not found.",
}
summary = _format_gmail_summary(detail)
content = (
f"# {summary['subject']}\n\n"
f"**From:** {summary['from']}\n"
f"**To:** {summary['to']}\n"
f"**Date:** {summary['date']}\n\n"
f"## Message Content\n\n"
f"{detail.get('messageText') or detail.get('snippet') or ''}\n\n"
f"## Message Details\n\n"
f"- **Message ID:** {summary['message_id']}\n"
f"- **Thread ID:** {summary['thread_id']}\n"
)
return {
"status": "success",
"message_id": summary["message_id"] or message_id,
"content": content,
}
from app.agents.new_chat.tools.gmail.search_emails import (
_build_credentials,
)
creds = _build_credentials(connector)

View file

@ -1,5 +1,4 @@
import logging
from datetime import datetime
from typing import Any
from langchain_core.tools import tool
@ -15,57 +14,6 @@ _GMAIL_TYPES = [
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
]
_token_encryption_cache: object | None = None
def _get_token_encryption():
global _token_encryption_cache
if _token_encryption_cache is None:
from app.config import config
from app.utils.oauth_security import TokenEncryption
if not config.SECRET_KEY:
raise RuntimeError("SECRET_KEY not configured for token decryption.")
_token_encryption_cache = TokenEncryption(config.SECRET_KEY)
return _token_encryption_cache
def _build_credentials(connector: SearchSourceConnector):
"""Build Google OAuth Credentials from a connector's stored config.
Handles both native OAuth connectors (with encrypted tokens) and
Composio-backed connectors. Shared by Gmail and Calendar tools.
"""
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if not cca_id:
raise ValueError("Composio connected account ID not found.")
return build_composio_credentials(cca_id)
from google.oauth2.credentials import Credentials
cfg = dict(connector.config)
if cfg.get("_token_encrypted"):
enc = _get_token_encryption()
for key in ("token", "refresh_token", "client_secret"):
if cfg.get(key):
cfg[key] = enc.decrypt_token(cfg[key])
exp = (cfg.get("expiry") or "").replace("Z", "")
return Credentials(
token=cfg.get("token"),
refresh_token=cfg.get("refresh_token"),
token_uri=cfg.get("token_uri"),
client_id=cfg.get("client_id"),
client_secret=cfg.get("client_secret"),
scopes=cfg.get("scopes", []),
expiry=datetime.fromisoformat(exp) if exp else None,
)
def create_search_gmail_tool(
db_session: AsyncSession | None = None,
@ -110,6 +58,50 @@ def create_search_gmail_tool(
"message": "No Gmail connector found. Please connect Gmail in your workspace settings.",
}
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
):
cca_id = connector.config.get("composio_connected_account_id")
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this Gmail connector.",
}
from app.agents.new_chat.tools.gmail.search_emails import (
_format_gmail_summary,
)
from app.services.composio_service import ComposioService
(
messages,
_next,
_estimate,
error,
) = await ComposioService().get_gmail_messages(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
query=query,
max_results=max_results,
)
if error:
return {"status": "error", "message": error}
emails = [_format_gmail_summary(m) for m in messages]
if not emails:
return {
"status": "success",
"emails": [],
"total": 0,
"message": "No emails found.",
}
return {"status": "success", "emails": emails, "total": len(emails)}
from app.agents.new_chat.tools.gmail.search_emails import (
_build_credentials,
)
creds = _build_credentials(connector)
from app.connectors.google_gmail_connector import GoogleGmailConnector

View file

@ -162,16 +162,31 @@ def create_send_gmail_email_tool(
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
creds = build_composio_credentials(cca_id)
else:
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this Gmail connector.",
}
from app.services.composio_service import ComposioService
(
sent_message_id,
sent_thread_id,
error,
) = await ComposioService().send_gmail_email(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
to=final_to,
subject=final_subject,
body=final_body,
cc=final_cc,
bcc=final_bcc,
)
if error:
return {"status": "error", "message": error}
sent = {"id": sent_message_id, "threadId": sent_thread_id}
else:
from google.oauth2.credentials import Credentials
@ -209,61 +224,61 @@ def create_send_gmail_email_tool(
expiry=datetime.fromisoformat(exp) if exp else None,
)
from googleapiclient.discovery import build
from googleapiclient.discovery import build
gmail_service = build("gmail", "v1", credentials=creds)
gmail_service = build("gmail", "v1", credentials=creds)
message = MIMEText(final_body)
message["to"] = final_to
message["subject"] = final_subject
if final_cc:
message["cc"] = final_cc
if final_bcc:
message["bcc"] = final_bcc
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
message = MIMEText(final_body)
message["to"] = final_to
message["subject"] = final_subject
if final_cc:
message["cc"] = final_cc
if final_bcc:
message["bcc"] = final_bcc
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
try:
sent = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
gmail_service.users()
.messages()
.send(userId="me", body={"raw": raw})
.execute()
),
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
try:
sent = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
gmail_service.users()
.messages()
.send(userId="me", body={"raw": raw})
.execute()
),
)
try:
from sqlalchemy.orm.attributes import flag_modified
except Exception as api_err:
from googleapiclient.errors import HttpError
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
f"Insufficient permissions for connector {actual_connector_id}: {api_err}"
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
try:
from sqlalchemy.orm.attributes import flag_modified
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(
f"Gmail email sent: id={sent.get('id')}, threadId={sent.get('threadId')}"

View file

@ -162,16 +162,22 @@ def create_trash_gmail_email_tool(
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
creds = build_composio_credentials(cca_id)
else:
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this Gmail connector.",
}
from app.services.composio_service import ComposioService
error = await ComposioService().trash_gmail_message(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
message_id=final_message_id,
)
if error:
return {"status": "error", "message": error}
else:
from google.oauth2.credentials import Credentials
@ -209,49 +215,49 @@ def create_trash_gmail_email_tool(
expiry=datetime.fromisoformat(exp) if exp else None,
)
from googleapiclient.discovery import build
from googleapiclient.discovery import build
gmail_service = build("gmail", "v1", credentials=creds)
gmail_service = build("gmail", "v1", credentials=creds)
try:
await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
gmail_service.users()
.messages()
.trash(userId="me", id=final_message_id)
.execute()
),
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {connector.id}: {api_err}"
try:
await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
gmail_service.users()
.messages()
.trash(userId="me", id=final_message_id)
.execute()
),
)
try:
from sqlalchemy.orm.attributes import flag_modified
except Exception as api_err:
from googleapiclient.errors import HttpError
if not connector.config.get("auth_expired"):
connector.config = {
**connector.config,
"auth_expired": True,
}
flag_modified(connector, "config")
await db_session.commit()
except Exception:
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
"Failed to persist auth_expired for connector %s",
connector.id,
exc_info=True,
f"Insufficient permissions for connector {connector.id}: {api_err}"
)
return {
"status": "insufficient_permissions",
"connector_id": connector.id,
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
try:
from sqlalchemy.orm.attributes import flag_modified
if not connector.config.get("auth_expired"):
connector.config = {
**connector.config,
"auth_expired": True,
}
flag_modified(connector, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
connector.id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": connector.id,
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(f"Gmail email trashed: message_id={final_message_id}")

View file

@ -192,16 +192,51 @@ def create_update_gmail_draft_tool(
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
creds = build_composio_credentials(cca_id)
else:
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this Gmail connector.",
}
if not final_draft_id:
return {
"status": "error",
"message": (
"Could not find this draft in Gmail. "
"It may have already been sent or deleted."
),
}
from app.services.composio_service import ComposioService
(
new_draft_id,
new_message_id,
error,
) = await ComposioService().update_gmail_draft(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
draft_id=final_draft_id,
to=final_to or None,
subject=final_subject,
body=final_body,
cc=final_cc,
bcc=final_bcc,
)
if error:
if "not found" in error.lower() or "no longer" in error.lower():
return {
"status": "error",
"message": "Draft no longer exists in Gmail. It may have been sent or deleted.",
}
return {"status": "error", "message": error}
updated = {
"id": new_draft_id or final_draft_id,
"message": {"id": new_message_id} if new_message_id else {},
}
logger.info(f"Gmail draft updated via Composio: id={updated.get('id')}")
else:
from google.oauth2.credentials import Credentials
@ -239,88 +274,90 @@ def create_update_gmail_draft_tool(
expiry=datetime.fromisoformat(exp) if exp else None,
)
from googleapiclient.discovery import build
from googleapiclient.discovery import build
gmail_service = build("gmail", "v1", credentials=creds)
gmail_service = build("gmail", "v1", credentials=creds)
# Resolve draft_id if not already available
if not final_draft_id:
logger.info(
f"draft_id not in metadata, looking up via drafts.list for message_id={message_id}"
)
final_draft_id = await _find_draft_id_by_message(
gmail_service, message_id
)
if not final_draft_id:
return {
"status": "error",
"message": (
"Could not find this draft in Gmail. "
"It may have already been sent or deleted."
),
}
message = MIMEText(final_body)
if final_to:
message["to"] = final_to
message["subject"] = final_subject
if final_cc:
message["cc"] = final_cc
if final_bcc:
message["bcc"] = final_bcc
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
try:
updated = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
gmail_service.users()
.drafts()
.update(
userId="me",
id=final_draft_id,
body={"message": {"raw": raw}},
)
.execute()
),
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {connector.id}: {api_err}"
# Resolve draft_id if not already available
if not final_draft_id:
logger.info(
f"draft_id not in metadata, looking up via drafts.list for message_id={message_id}"
)
final_draft_id = await _find_draft_id_by_message(
gmail_service, message_id
)
try:
from sqlalchemy.orm.attributes import flag_modified
if not connector.config.get("auth_expired"):
connector.config = {
**connector.config,
"auth_expired": True,
}
flag_modified(connector, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
connector.id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": connector.id,
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
}
if isinstance(api_err, HttpError) and api_err.resp.status == 404:
if not final_draft_id:
return {
"status": "error",
"message": "Draft no longer exists in Gmail. It may have been sent or deleted.",
"message": (
"Could not find this draft in Gmail. "
"It may have already been sent or deleted."
),
}
raise
logger.info(f"Gmail draft updated: id={updated.get('id')}")
message = MIMEText(final_body)
if final_to:
message["to"] = final_to
message["subject"] = final_subject
if final_cc:
message["cc"] = final_cc
if final_bcc:
message["bcc"] = final_bcc
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
try:
updated = await asyncio.get_event_loop().run_in_executor(
None,
lambda: (
gmail_service.users()
.drafts()
.update(
userId="me",
id=final_draft_id,
body={"message": {"raw": raw}},
)
.execute()
),
)
except Exception as api_err:
from googleapiclient.errors import HttpError
if isinstance(api_err, HttpError) and api_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {connector.id}: {api_err}"
)
try:
from sqlalchemy.orm.attributes import flag_modified
if not connector.config.get("auth_expired"):
connector.config = {
**connector.config,
"auth_expired": True,
}
flag_modified(connector, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
connector.id,
exc_info=True,
)
return {
"status": "insufficient_permissions",
"connector_id": connector.id,
"message": "This Gmail account needs additional permissions. Please re-authenticate in connector settings.",
}
if isinstance(api_err, HttpError) and api_err.resp.status == 404:
return {
"status": "error",
"message": "Draft no longer exists in Gmail. It may have been sent or deleted.",
}
raise
logger.info(
f"Gmail draft updated via Google API: id={updated.get('id')}"
)
kb_message_suffix = ""
if document_id:

View file

@ -179,59 +179,96 @@ def create_create_google_drive_file_tool(
f"Creating Google Drive file: name='{final_name}', type='{final_file_type}', connector={actual_connector_id}"
)
pre_built_creds = None
async def _flag_auth_expired() -> None:
try:
from sqlalchemy.orm.attributes import flag_modified
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
)
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
pre_built_creds = build_composio_credentials(cca_id)
if not cca_id:
return {
"status": "error",
"message": "Composio connected account ID not found for this Google Drive connector.",
}
client = GoogleDriveClient(
session=db_session,
connector_id=actual_connector_id,
credentials=pre_built_creds,
)
try:
created = await client.create_file(
from app.services.composio_service import ComposioService
created, error = await ComposioService().create_drive_file_from_text(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
name=final_name,
mime_type=mime_type,
parent_folder_id=final_parent_folder_id,
content=final_content,
parent_id=final_parent_folder_id,
)
except HttpError as http_err:
if http_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {http_err}"
)
try:
from sqlalchemy.orm.attributes import flag_modified
_res = await db_session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == actual_connector_id
)
)
_conn = _res.scalar_one_or_none()
if _conn and not _conn.config.get("auth_expired"):
_conn.config = {**_conn.config, "auth_expired": True}
flag_modified(_conn, "config")
await db_session.commit()
except Exception:
if error or not created:
err_lower = (error or "").lower()
if (
"insufficient" in err_lower
or "permission" in err_lower
or "403" in err_lower
):
logger.warning(
"Failed to persist auth_expired for connector %s",
actual_connector_id,
exc_info=True,
f"Insufficient permissions for Composio Drive connector {actual_connector_id}: {error}"
)
await _flag_auth_expired()
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Drive account needs additional permissions. Please re-authenticate in connector settings.",
}
logger.error(
f"Composio Drive create_file failed for connector {actual_connector_id}: {error}"
)
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Drive account needs additional permissions. Please re-authenticate in connector settings.",
"status": "error",
"message": "Something went wrong while creating the file. Please try again.",
}
raise
else:
client = GoogleDriveClient(
session=db_session,
connector_id=actual_connector_id,
)
try:
created = await client.create_file(
name=final_name,
mime_type=mime_type,
parent_folder_id=final_parent_folder_id,
content=final_content,
)
except HttpError as http_err:
if http_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {actual_connector_id}: {http_err}"
)
await _flag_auth_expired()
return {
"status": "insufficient_permissions",
"connector_id": actual_connector_id,
"message": "This Google Drive account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(
f"Google Drive file created: id={created.get('id')}, name={created.get('name')}"

View file

@ -158,51 +158,84 @@ def create_delete_google_drive_file_tool(
f"Deleting Google Drive file: file_id='{final_file_id}', connector={final_connector_id}"
)
pre_built_creds = None
async def _flag_auth_expired() -> None:
try:
from sqlalchemy.orm.attributes import flag_modified
if not connector.config.get("auth_expired"):
connector.config = {
**connector.config,
"auth_expired": True,
}
flag_modified(connector, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
connector.id,
exc_info=True,
)
if (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR
):
from app.utils.google_credentials import build_composio_credentials
cca_id = connector.config.get("composio_connected_account_id")
if cca_id:
pre_built_creds = build_composio_credentials(cca_id)
client = GoogleDriveClient(
session=db_session,
connector_id=connector.id,
credentials=pre_built_creds,
)
try:
await client.trash_file(file_id=final_file_id)
except HttpError as http_err:
if http_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {connector.id}: {http_err}"
)
try:
from sqlalchemy.orm.attributes import flag_modified
if not connector.config.get("auth_expired"):
connector.config = {
**connector.config,
"auth_expired": True,
}
flag_modified(connector, "config")
await db_session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
connector.id,
exc_info=True,
)
if not cca_id:
return {
"status": "insufficient_permissions",
"connector_id": connector.id,
"message": "This Google Drive account needs additional permissions. Please re-authenticate in connector settings.",
"status": "error",
"message": "Composio connected account ID not found for this Google Drive connector.",
}
raise
from app.services.composio_service import ComposioService
error = await ComposioService().trash_drive_file(
connected_account_id=cca_id,
entity_id=f"surfsense_{user_id}",
file_id=final_file_id,
)
if error:
err_lower = error.lower()
if (
"insufficient" in err_lower
or "permission" in err_lower
or "403" in err_lower
):
logger.warning(
f"Insufficient permissions for Composio Drive connector {connector.id}: {error}"
)
await _flag_auth_expired()
return {
"status": "insufficient_permissions",
"connector_id": connector.id,
"message": "This Google Drive account needs additional permissions. Please re-authenticate in connector settings.",
}
logger.error(
f"Composio Drive trash_file failed for connector {connector.id}: {error}"
)
return {
"status": "error",
"message": "Something went wrong while trashing the file. Please try again.",
}
else:
client = GoogleDriveClient(
session=db_session,
connector_id=connector.id,
)
try:
await client.trash_file(file_id=final_file_id)
except HttpError as http_err:
if http_err.resp.status == 403:
logger.warning(
f"Insufficient permissions for connector {connector.id}: {http_err}"
)
await _flag_auth_expired()
return {
"status": "insufficient_permissions",
"connector_id": connector.id,
"message": "This Google Drive account needs additional permissions. Please re-authenticate in connector settings.",
}
raise
logger.info(
f"Google Drive file deleted (moved to trash): file_id={final_file_id}"