mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-25 19:15:18 +02:00
feat: enhance Composio connector handling and status management
- Updated the .env.example file to include a note about disabling "Mask Connected Account Secrets" for Google indexing. - Implemented a wait_for_connection method in ComposioService to ensure connected accounts reach ACTIVE status after authentication. - Added logic in composio_callback and composio_reauth_callback to wait for Composio to finish processing tokens, improving reliability in token handling. - Enhanced logging to provide clearer warnings when connection timeouts occur, ensuring better visibility into potential issues.
This commit is contained in:
parent
2390bd7d26
commit
83d9c49a50
3 changed files with 68 additions and 6 deletions
|
|
@ -104,7 +104,8 @@ TEAMS_CLIENT_ID=your_teams_client_id_here
|
|||
TEAMS_CLIENT_SECRET=your_teams_client_secret_here
|
||||
TEAMS_REDIRECT_URI=http://localhost:8000/api/v1/auth/teams/connector/callback
|
||||
|
||||
#Composio Coonnector
|
||||
# Composio Connector
|
||||
# NOTE: Disable "Mask Connected Account Secrets" in Composio dashboard (Settings → Project Settings) for Google indexing to work.
|
||||
COMPOSIO_API_KEY=your_api_key_here
|
||||
COMPOSIO_ENABLED=TRUE
|
||||
COMPOSIO_REDIRECT_URI=http://localhost:8000/api/v1/auth/composio/connector/callback
|
||||
|
|
|
|||
|
|
@ -263,6 +263,15 @@ async def composio_callback(
|
|||
logger.info(
|
||||
f"Successfully got connected_account_id: {final_connected_account_id}"
|
||||
)
|
||||
# Wait for Composio to finish exchanging the auth code for tokens.
|
||||
try:
|
||||
service.wait_for_connection(final_connected_account_id, timeout=30.0)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
f"wait_for_connection timed out for {final_connected_account_id}, "
|
||||
"proceeding anyway",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
# Build entity_id for Composio API calls (same format as used in initiate)
|
||||
entity_id = f"surfsense_{user_id}"
|
||||
|
|
@ -578,12 +587,25 @@ async def composio_reauth_callback(
|
|||
detail="Connector not found or access denied during re-auth callback",
|
||||
)
|
||||
|
||||
# Wait for Composio to finish processing new tokens before proceeding.
|
||||
# Without this, get_access_token() may return stale credentials.
|
||||
connected_account_id = connector.config.get("composio_connected_account_id")
|
||||
if connected_account_id:
|
||||
try:
|
||||
service = ComposioService()
|
||||
service.wait_for_connection(connected_account_id, timeout=30.0)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
f"wait_for_connection timed out for connector {reauth_connector_id}, "
|
||||
"proceeding anyway — tokens may not be ready yet",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
# Clear auth_expired flag
|
||||
if connector.config.get("auth_expired"):
|
||||
connector.config = {**connector.config, "auth_expired": False}
|
||||
flag_modified(connector, "config")
|
||||
await session.commit()
|
||||
await session.refresh(connector)
|
||||
connector.config = {**connector.config, "auth_expired": False}
|
||||
flag_modified(connector, "config")
|
||||
await session.commit()
|
||||
await session.refresh(connector)
|
||||
|
||||
logger.info(f"Composio re-auth completed for connector {reauth_connector_id}")
|
||||
|
||||
|
|
|
|||
|
|
@ -260,6 +260,39 @@ class ComposioService:
|
|||
"redirect_url": result.redirect_url,
|
||||
}
|
||||
|
||||
def wait_for_connection(
|
||||
self,
|
||||
connected_account_id: str,
|
||||
timeout: float = 30.0,
|
||||
) -> str:
|
||||
"""
|
||||
Poll Composio until the connected account reaches ACTIVE status.
|
||||
|
||||
Must be called after refresh() / initiate() to ensure Composio has
|
||||
finished exchanging the authorization code for valid tokens.
|
||||
|
||||
Returns:
|
||||
The final account status string (should be "ACTIVE").
|
||||
|
||||
Raises:
|
||||
TimeoutError: If the account does not become ACTIVE within *timeout*.
|
||||
"""
|
||||
try:
|
||||
account = self.client.connected_accounts.wait_for_connection(
|
||||
id=connected_account_id,
|
||||
timeout=timeout,
|
||||
)
|
||||
status = getattr(account, "status", "UNKNOWN")
|
||||
logger.info(
|
||||
f"Composio account {connected_account_id} is now {status}"
|
||||
)
|
||||
return status
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Timeout/error waiting for Composio account {connected_account_id}: {e!s}"
|
||||
)
|
||||
raise
|
||||
|
||||
def get_access_token(self, connected_account_id: str) -> str:
|
||||
"""Retrieve the raw OAuth access token for a Composio connected account."""
|
||||
account = self.client.connected_accounts.get(nanoid=connected_account_id)
|
||||
|
|
@ -271,6 +304,12 @@ class ComposioService:
|
|||
access_token = getattr(token, "access_token", None)
|
||||
if not access_token:
|
||||
raise ValueError(f"No access_token in state.val for {connected_account_id}")
|
||||
if len(access_token) < 20:
|
||||
raise ValueError(
|
||||
f"Composio returned a masked access_token ({len(access_token)} chars) "
|
||||
f"for account {connected_account_id}. Disable 'Mask Connected Account "
|
||||
f"Secrets' in Composio dashboard: Settings → Project Settings."
|
||||
)
|
||||
return access_token
|
||||
|
||||
async def execute_tool(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue