Enhance Composio OAuth flow management and improve tool handling

- Updated the activeFlows management to prevent concurrent OAuth flows for the same toolkit by using toolkitSlug as the key.
- Implemented cleanup logic for existing flows, ensuring proper resource management by aborting and closing servers as needed.
- Introduced a timeout mechanism for abandoned flows, enhancing reliability.
- Refactored the Composio tools repository to use an in-memory cache for improved performance and added methods for persisting changes to disk.
- Updated the detailed tools listing to use a consistent API call structure and improved input parameter handling.
- Made connectionData in the response optional for better flexibility in handling connected accounts.
This commit is contained in:
tusharmagar 2026-04-01 13:56:36 +05:30
parent 013f6bdf17
commit 8ea1500ab9
7 changed files with 126 additions and 67 deletions

View file

@ -12,11 +12,13 @@ import { triggerSync as triggerCalendarSync } from '@x/core/dist/knowledge/sync_
const REDIRECT_URI = 'http://localhost:8081/oauth/callback';
// Store active OAuth flows
// Store active OAuth flows (keyed by toolkitSlug to prevent concurrent flows for the same toolkit)
const activeFlows = new Map<string, {
toolkitSlug: string;
connectedAccountId: string;
authConfigId: string;
server: import('http').Server;
timeout: NodeJS.Timeout;
}>();
/**
@ -128,13 +130,14 @@ export async function initiateConnection(toolkitSlug: string): Promise<{
};
}
// Store flow state
const flowKey = `${toolkitSlug}-${Date.now()}`;
activeFlows.set(flowKey, {
toolkitSlug,
connectedAccountId,
authConfigId,
});
// Abort any existing flow for this toolkit before starting a new one
const existingFlow = activeFlows.get(toolkitSlug);
if (existingFlow) {
console.log(`[Composio] Aborting existing flow for ${toolkitSlug}`);
clearTimeout(existingFlow.timeout);
existingFlow.server.close();
activeFlows.delete(toolkitSlug);
}
// Save initial account state
const account: LocalConnectedAccount = {
@ -148,7 +151,7 @@ export async function initiateConnection(toolkitSlug: string): Promise<{
composioAccountsRepo.saveAccount(account);
// Set up callback server
let cleanupTimeout: NodeJS.Timeout;
const timeoutRef: { current: NodeJS.Timeout | null } = { current: null };
let callbackHandled = false;
const { server } = await createAuthServer(8081, async () => {
// Guard against duplicate callbacks (browser may send multiple requests)
@ -182,17 +185,17 @@ export async function initiateConnection(toolkitSlug: string): Promise<{
error: error instanceof Error ? error.message : 'Unknown error',
});
} finally {
activeFlows.delete(flowKey);
activeFlows.delete(toolkitSlug);
server.close();
clearTimeout(cleanupTimeout);
if (timeoutRef.current) clearTimeout(timeoutRef.current);
}
});
// Timeout for abandoned flows (5 minutes)
cleanupTimeout = setTimeout(() => {
if (activeFlows.has(flowKey)) {
const cleanupTimeout = setTimeout(() => {
if (activeFlows.has(toolkitSlug)) {
console.log(`[Composio] Cleaning up abandoned flow for ${toolkitSlug}`);
activeFlows.delete(flowKey);
activeFlows.delete(toolkitSlug);
server.close();
emitComposioEvent({
toolkitSlug,
@ -201,6 +204,16 @@ export async function initiateConnection(toolkitSlug: string): Promise<{
});
}
}, 5 * 60 * 1000);
timeoutRef.current = cleanupTimeout;
// Store flow state (keyed by toolkit to prevent concurrent flows)
activeFlows.set(toolkitSlug, {
toolkitSlug,
connectedAccountId,
authConfigId,
server,
timeout: cleanupTimeout,
});
// Open browser for OAuth
shell.openExternal(redirectUrl);

View file

@ -962,6 +962,15 @@ function ToolsLibrarySettings({ dialogOpen }: { dialogOpen: boolean }) {
}
}, [expandedToolkit])
// Clean up pending search timer on unmount
useEffect(() => {
return () => {
if (toolSearchTimerRef.current) {
clearTimeout(toolSearchTimerRef.current)
}
}
}, [])
// Toggle toolkit expansion
const handleToggleToolkit = (toolkitSlug: string) => {
if (expandedToolkit === toolkitSlug) {

View file

@ -299,12 +299,29 @@ This renders as an interactive card in the UI that the user can click to open th
Never output raw file paths in plain text when they could be wrapped in a filepath block unless the file does not exist yet.`;
/**
* Cached Composio instructions. Invalidated by calling invalidateCopilotInstructionsCache().
*/
let cachedInstructions: string | null = null;
/**
* Invalidate the cached instructions so the next buildCopilotInstructions() call
* regenerates the Composio tools section. Call this after enabling/disabling tools
* or connecting/disconnecting a toolkit.
*/
export function invalidateCopilotInstructionsCache(): void {
cachedInstructions = null;
}
/**
* Build full copilot instructions with dynamic Composio tools section.
* Called each time the agent is loaded to reflect currently enabled tools.
* Results are cached and reused until invalidated via invalidateCopilotInstructionsCache().
*/
export async function buildCopilotInstructions(): Promise<string> {
if (cachedInstructions !== null) return cachedInstructions;
const composioPrompt = await getComposioToolsPrompt();
if (!composioPrompt) return CopilotInstructions;
return CopilotInstructions + '\n' + composioPrompt;
cachedInstructions = composioPrompt
? CopilotInstructions + '\n' + composioPrompt
: CopilotInstructions;
return cachedInstructions;
}

View file

@ -15,6 +15,7 @@ import { WorkDir } from "../../config/config.js";
import { composioAccountsRepo } from "../../composio/repo.js";
import { composioEnabledToolsRepo } from "../../composio/enabled-tools-repo.js";
import { executeAction as executeComposioAction, isConfigured as isComposioConfigured } from "../../composio/client.js";
import { invalidateCopilotInstructionsCache } from "../assistant/instructions.js";
import type { ToolContext } from "./exec-tool.js";
import { generateText, jsonSchema } from "ai";
import { createProvider } from "../../models/models.js";
@ -1256,10 +1257,12 @@ function registerComposioTools(): void {
/**
* Refresh dynamic Composio tools by unregistering all and re-registering from the repo.
* Called after enabling/disabling tools or disconnecting a toolkit.
* Also invalidates the cached agent instructions so they reflect the new tool set.
*/
export function refreshComposioTools(): void {
unregisterComposioTools();
registerComposioTools();
invalidateCopilotInstructionsCache();
}
// Register on module load

View file

@ -301,49 +301,50 @@ export async function listToolkitTools(
}
/**
* List available tools for a toolkit with full details including input_parameters
* Schema for the detailed tools response (preserves input_parameters).
* Uses passthrough so extra API fields don't cause validation failures.
*/
const ZDetailedTool = z.object({
slug: z.string(),
name: z.string(),
description: z.string(),
input_parameters: z.object({
type: z.literal('object').optional().default('object'),
properties: z.record(z.string(), z.unknown()).optional().default({}),
required: z.array(z.string()).optional(),
}).optional().default({ type: 'object', properties: {} }),
}).passthrough();
/**
* List available tools for a toolkit with full details including input_parameters.
* Uses composioApiCall for consistent error handling, logging, and validation.
*/
export async function listToolkitToolsDetailed(
toolkitSlug: string,
searchQuery: string | null = null,
): Promise<{ items: Array<{ slug: string; name: string; description: string; toolkitSlug: string; inputParameters: { type: 'object'; properties: Record<string, unknown>; required?: string[] } }> }> {
const authHeaders = await getAuthHeaders();
const baseURL = await getBaseUrl();
const url = new URL(`${baseURL}/tools`);
url.searchParams.set('toolkit_slug', toolkitSlug);
url.searchParams.set('limit', '200');
const params: Record<string, string> = {
toolkit_slug: toolkitSlug,
limit: '200',
};
if (searchQuery) {
url.searchParams.set('query', searchQuery);
params.search = searchQuery;
}
console.log(`[Composio] Listing tools (detailed) for toolkit: ${toolkitSlug}`);
const response = await fetch(url.toString(), {
headers: authHeaders,
});
if (!response.ok) {
throw new Error(`Failed to list tools: ${response.status} ${response.statusText}`);
}
const data = await response.json() as { items?: Array<Record<string, unknown>> };
const result = await composioApiCall(ZListResponse(ZDetailedTool), "/tools", params);
return {
items: (data.items || []).map((item) => {
const inputParams = item.input_parameters as Record<string, unknown> | undefined;
return {
slug: String(item.slug ?? ''),
name: String(item.name ?? ''),
description: String(item.description ?? ''),
toolkitSlug,
inputParameters: {
type: 'object' as const,
properties: (inputParams?.properties as Record<string, unknown>) ?? {},
required: Array.isArray(inputParams?.required) ? inputParams.required as string[] : undefined,
},
};
}),
items: result.items.map((item) => ({
slug: item.slug,
name: item.name,
description: item.description,
toolkitSlug,
inputParameters: {
type: 'object' as const,
properties: item.input_parameters?.properties ?? {},
required: item.input_parameters?.required,
},
})),
};
}

View file

@ -52,7 +52,7 @@ function ensureStorageDir(): void {
}
}
function loadStorage(): EnabledToolsStorage {
function loadStorageFromDisk(): EnabledToolsStorage {
try {
if (fs.existsSync(ENABLED_TOOLS_FILE)) {
const data = fs.readFileSync(ENABLED_TOOLS_FILE, 'utf-8');
@ -64,64 +64,80 @@ function loadStorage(): EnabledToolsStorage {
return { tools: {} };
}
function saveStorage(storage: EnabledToolsStorage): void {
function saveStorageToDisk(storage: EnabledToolsStorage): void {
ensureStorageDir();
fs.writeFileSync(ENABLED_TOOLS_FILE, JSON.stringify(storage, null, 2));
}
/**
* Repository for managing enabled Composio tools
* Repository for managing enabled Composio tools.
* Uses an in-memory cache loaded once from disk. Mutations write through to disk.
*/
export class ComposioEnabledToolsRepo implements IComposioEnabledToolsRepo {
private cache: EnabledToolsStorage | null = null;
private getStorage(): EnabledToolsStorage {
if (!this.cache) {
this.cache = loadStorageFromDisk();
}
return this.cache;
}
private persist(): void {
if (this.cache) {
saveStorageToDisk(this.cache);
}
}
getAll(): Record<string, EnabledTool> {
return loadStorage().tools;
return this.getStorage().tools;
}
getByToolkit(toolkitSlug: string): EnabledTool[] {
const storage = loadStorage();
const storage = this.getStorage();
return Object.values(storage.tools).filter(t => t.toolkitSlug === toolkitSlug);
}
enable(tool: EnabledTool): void {
const storage = loadStorage();
const storage = this.getStorage();
storage.tools[tool.slug] = tool;
saveStorage(storage);
this.persist();
}
enableBatch(tools: EnabledTool[]): void {
const storage = loadStorage();
const storage = this.getStorage();
for (const tool of tools) {
storage.tools[tool.slug] = tool;
}
saveStorage(storage);
this.persist();
}
disable(toolSlug: string): void {
const storage = loadStorage();
const storage = this.getStorage();
delete storage.tools[toolSlug];
saveStorage(storage);
this.persist();
}
disableBatch(toolSlugs: string[]): void {
const storage = loadStorage();
const storage = this.getStorage();
for (const slug of toolSlugs) {
delete storage.tools[slug];
}
saveStorage(storage);
this.persist();
}
disableAllForToolkit(toolkitSlug: string): void {
const storage = loadStorage();
const storage = this.getStorage();
for (const [slug, tool] of Object.entries(storage.tools)) {
if (tool.toolkitSlug === toolkitSlug) {
delete storage.tools[slug];
}
}
saveStorage(storage);
this.persist();
}
isEnabled(toolSlug: string): boolean {
const storage = loadStorage();
const storage = this.getStorage();
return toolSlug in storage.tools;
}
}

View file

@ -149,7 +149,7 @@ export const ZCreateConnectedAccountRequest = z.object({
*/
export const ZCreateConnectedAccountResponse = z.object({
id: z.string(),
connectionData: ZConnectionData,
connectionData: ZConnectionData.optional(),
});
/**