diff --git a/apps/x/apps/main/src/composio-handler.ts b/apps/x/apps/main/src/composio-handler.ts index dad0acbe..aac7c453 100644 --- a/apps/x/apps/main/src/composio-handler.ts +++ b/apps/x/apps/main/src/composio-handler.ts @@ -12,11 +12,13 @@ import { triggerSync as triggerCalendarSync } from '@x/core/dist/knowledge/sync_ const REDIRECT_URI = 'http://localhost:8081/oauth/callback'; -// Store active OAuth flows +// Store active OAuth flows (keyed by toolkitSlug to prevent concurrent flows for the same toolkit) const activeFlows = new Map(); /** @@ -128,13 +130,14 @@ export async function initiateConnection(toolkitSlug: string): Promise<{ }; } - // Store flow state - const flowKey = `${toolkitSlug}-${Date.now()}`; - activeFlows.set(flowKey, { - toolkitSlug, - connectedAccountId, - authConfigId, - }); + // Abort any existing flow for this toolkit before starting a new one + const existingFlow = activeFlows.get(toolkitSlug); + if (existingFlow) { + console.log(`[Composio] Aborting existing flow for ${toolkitSlug}`); + clearTimeout(existingFlow.timeout); + existingFlow.server.close(); + activeFlows.delete(toolkitSlug); + } // Save initial account state const account: LocalConnectedAccount = { @@ -148,7 +151,7 @@ export async function initiateConnection(toolkitSlug: string): Promise<{ composioAccountsRepo.saveAccount(account); // Set up callback server - let cleanupTimeout: NodeJS.Timeout; + const timeoutRef: { current: NodeJS.Timeout | null } = { current: null }; let callbackHandled = false; const { server } = await createAuthServer(8081, async () => { // Guard against duplicate callbacks (browser may send multiple requests) @@ -182,17 +185,17 @@ export async function initiateConnection(toolkitSlug: string): Promise<{ error: error instanceof Error ? error.message : 'Unknown error', }); } finally { - activeFlows.delete(flowKey); + activeFlows.delete(toolkitSlug); server.close(); - clearTimeout(cleanupTimeout); + if (timeoutRef.current) clearTimeout(timeoutRef.current); } }); // Timeout for abandoned flows (5 minutes) - cleanupTimeout = setTimeout(() => { - if (activeFlows.has(flowKey)) { + const cleanupTimeout = setTimeout(() => { + if (activeFlows.has(toolkitSlug)) { console.log(`[Composio] Cleaning up abandoned flow for ${toolkitSlug}`); - activeFlows.delete(flowKey); + activeFlows.delete(toolkitSlug); server.close(); emitComposioEvent({ toolkitSlug, @@ -201,6 +204,16 @@ export async function initiateConnection(toolkitSlug: string): Promise<{ }); } }, 5 * 60 * 1000); + timeoutRef.current = cleanupTimeout; + + // Store flow state (keyed by toolkit to prevent concurrent flows) + activeFlows.set(toolkitSlug, { + toolkitSlug, + connectedAccountId, + authConfigId, + server, + timeout: cleanupTimeout, + }); // Open browser for OAuth shell.openExternal(redirectUrl); diff --git a/apps/x/apps/renderer/src/components/settings-dialog.tsx b/apps/x/apps/renderer/src/components/settings-dialog.tsx index 72df13a2..255e6d48 100644 --- a/apps/x/apps/renderer/src/components/settings-dialog.tsx +++ b/apps/x/apps/renderer/src/components/settings-dialog.tsx @@ -962,6 +962,15 @@ function ToolsLibrarySettings({ dialogOpen }: { dialogOpen: boolean }) { } }, [expandedToolkit]) + // Clean up pending search timer on unmount + useEffect(() => { + return () => { + if (toolSearchTimerRef.current) { + clearTimeout(toolSearchTimerRef.current) + } + } + }, []) + // Toggle toolkit expansion const handleToggleToolkit = (toolkitSlug: string) => { if (expandedToolkit === toolkitSlug) { diff --git a/apps/x/packages/core/src/application/assistant/instructions.ts b/apps/x/packages/core/src/application/assistant/instructions.ts index 6db0deab..6bd29963 100644 --- a/apps/x/packages/core/src/application/assistant/instructions.ts +++ b/apps/x/packages/core/src/application/assistant/instructions.ts @@ -299,12 +299,29 @@ This renders as an interactive card in the UI that the user can click to open th Never output raw file paths in plain text when they could be wrapped in a filepath block — unless the file does not exist yet.`; +/** + * Cached Composio instructions. Invalidated by calling invalidateCopilotInstructionsCache(). + */ +let cachedInstructions: string | null = null; + +/** + * Invalidate the cached instructions so the next buildCopilotInstructions() call + * regenerates the Composio tools section. Call this after enabling/disabling tools + * or connecting/disconnecting a toolkit. + */ +export function invalidateCopilotInstructionsCache(): void { + cachedInstructions = null; +} + /** * Build full copilot instructions with dynamic Composio tools section. - * Called each time the agent is loaded to reflect currently enabled tools. + * Results are cached and reused until invalidated via invalidateCopilotInstructionsCache(). */ export async function buildCopilotInstructions(): Promise { + if (cachedInstructions !== null) return cachedInstructions; const composioPrompt = await getComposioToolsPrompt(); - if (!composioPrompt) return CopilotInstructions; - return CopilotInstructions + '\n' + composioPrompt; + cachedInstructions = composioPrompt + ? CopilotInstructions + '\n' + composioPrompt + : CopilotInstructions; + return cachedInstructions; } diff --git a/apps/x/packages/core/src/application/lib/builtin-tools.ts b/apps/x/packages/core/src/application/lib/builtin-tools.ts index aac3f3f2..ee6872a6 100644 --- a/apps/x/packages/core/src/application/lib/builtin-tools.ts +++ b/apps/x/packages/core/src/application/lib/builtin-tools.ts @@ -15,6 +15,7 @@ import { WorkDir } from "../../config/config.js"; import { composioAccountsRepo } from "../../composio/repo.js"; import { composioEnabledToolsRepo } from "../../composio/enabled-tools-repo.js"; import { executeAction as executeComposioAction, isConfigured as isComposioConfigured } from "../../composio/client.js"; +import { invalidateCopilotInstructionsCache } from "../assistant/instructions.js"; import type { ToolContext } from "./exec-tool.js"; import { generateText, jsonSchema } from "ai"; import { createProvider } from "../../models/models.js"; @@ -1256,10 +1257,12 @@ function registerComposioTools(): void { /** * Refresh dynamic Composio tools by unregistering all and re-registering from the repo. * Called after enabling/disabling tools or disconnecting a toolkit. + * Also invalidates the cached agent instructions so they reflect the new tool set. */ export function refreshComposioTools(): void { unregisterComposioTools(); registerComposioTools(); + invalidateCopilotInstructionsCache(); } // Register on module load diff --git a/apps/x/packages/core/src/composio/client.ts b/apps/x/packages/core/src/composio/client.ts index eb367e7c..1efa0489 100644 --- a/apps/x/packages/core/src/composio/client.ts +++ b/apps/x/packages/core/src/composio/client.ts @@ -301,49 +301,50 @@ export async function listToolkitTools( } /** - * List available tools for a toolkit with full details including input_parameters + * Schema for the detailed tools response (preserves input_parameters). + * Uses passthrough so extra API fields don't cause validation failures. + */ +const ZDetailedTool = z.object({ + slug: z.string(), + name: z.string(), + description: z.string(), + input_parameters: z.object({ + type: z.literal('object').optional().default('object'), + properties: z.record(z.string(), z.unknown()).optional().default({}), + required: z.array(z.string()).optional(), + }).optional().default({ type: 'object', properties: {} }), +}).passthrough(); + +/** + * List available tools for a toolkit with full details including input_parameters. + * Uses composioApiCall for consistent error handling, logging, and validation. */ export async function listToolkitToolsDetailed( toolkitSlug: string, searchQuery: string | null = null, ): Promise<{ items: Array<{ slug: string; name: string; description: string; toolkitSlug: string; inputParameters: { type: 'object'; properties: Record; required?: string[] } }> }> { - const authHeaders = await getAuthHeaders(); - const baseURL = await getBaseUrl(); - - const url = new URL(`${baseURL}/tools`); - url.searchParams.set('toolkit_slug', toolkitSlug); - url.searchParams.set('limit', '200'); + const params: Record = { + toolkit_slug: toolkitSlug, + limit: '200', + }; if (searchQuery) { - url.searchParams.set('query', searchQuery); + params.search = searchQuery; } - console.log(`[Composio] Listing tools (detailed) for toolkit: ${toolkitSlug}`); - - const response = await fetch(url.toString(), { - headers: authHeaders, - }); - - if (!response.ok) { - throw new Error(`Failed to list tools: ${response.status} ${response.statusText}`); - } - - const data = await response.json() as { items?: Array> }; + const result = await composioApiCall(ZListResponse(ZDetailedTool), "/tools", params); return { - items: (data.items || []).map((item) => { - const inputParams = item.input_parameters as Record | undefined; - return { - slug: String(item.slug ?? ''), - name: String(item.name ?? ''), - description: String(item.description ?? ''), - toolkitSlug, - inputParameters: { - type: 'object' as const, - properties: (inputParams?.properties as Record) ?? {}, - required: Array.isArray(inputParams?.required) ? inputParams.required as string[] : undefined, - }, - }; - }), + items: result.items.map((item) => ({ + slug: item.slug, + name: item.name, + description: item.description, + toolkitSlug, + inputParameters: { + type: 'object' as const, + properties: item.input_parameters?.properties ?? {}, + required: item.input_parameters?.required, + }, + })), }; } diff --git a/apps/x/packages/core/src/composio/enabled-tools-repo.ts b/apps/x/packages/core/src/composio/enabled-tools-repo.ts index 4f18b391..7a2d08e2 100644 --- a/apps/x/packages/core/src/composio/enabled-tools-repo.ts +++ b/apps/x/packages/core/src/composio/enabled-tools-repo.ts @@ -52,7 +52,7 @@ function ensureStorageDir(): void { } } -function loadStorage(): EnabledToolsStorage { +function loadStorageFromDisk(): EnabledToolsStorage { try { if (fs.existsSync(ENABLED_TOOLS_FILE)) { const data = fs.readFileSync(ENABLED_TOOLS_FILE, 'utf-8'); @@ -64,64 +64,80 @@ function loadStorage(): EnabledToolsStorage { return { tools: {} }; } -function saveStorage(storage: EnabledToolsStorage): void { +function saveStorageToDisk(storage: EnabledToolsStorage): void { ensureStorageDir(); fs.writeFileSync(ENABLED_TOOLS_FILE, JSON.stringify(storage, null, 2)); } /** - * Repository for managing enabled Composio tools + * Repository for managing enabled Composio tools. + * Uses an in-memory cache loaded once from disk. Mutations write through to disk. */ export class ComposioEnabledToolsRepo implements IComposioEnabledToolsRepo { + private cache: EnabledToolsStorage | null = null; + + private getStorage(): EnabledToolsStorage { + if (!this.cache) { + this.cache = loadStorageFromDisk(); + } + return this.cache; + } + + private persist(): void { + if (this.cache) { + saveStorageToDisk(this.cache); + } + } + getAll(): Record { - return loadStorage().tools; + return this.getStorage().tools; } getByToolkit(toolkitSlug: string): EnabledTool[] { - const storage = loadStorage(); + const storage = this.getStorage(); return Object.values(storage.tools).filter(t => t.toolkitSlug === toolkitSlug); } enable(tool: EnabledTool): void { - const storage = loadStorage(); + const storage = this.getStorage(); storage.tools[tool.slug] = tool; - saveStorage(storage); + this.persist(); } enableBatch(tools: EnabledTool[]): void { - const storage = loadStorage(); + const storage = this.getStorage(); for (const tool of tools) { storage.tools[tool.slug] = tool; } - saveStorage(storage); + this.persist(); } disable(toolSlug: string): void { - const storage = loadStorage(); + const storage = this.getStorage(); delete storage.tools[toolSlug]; - saveStorage(storage); + this.persist(); } disableBatch(toolSlugs: string[]): void { - const storage = loadStorage(); + const storage = this.getStorage(); for (const slug of toolSlugs) { delete storage.tools[slug]; } - saveStorage(storage); + this.persist(); } disableAllForToolkit(toolkitSlug: string): void { - const storage = loadStorage(); + const storage = this.getStorage(); for (const [slug, tool] of Object.entries(storage.tools)) { if (tool.toolkitSlug === toolkitSlug) { delete storage.tools[slug]; } } - saveStorage(storage); + this.persist(); } isEnabled(toolSlug: string): boolean { - const storage = loadStorage(); + const storage = this.getStorage(); return toolSlug in storage.tools; } } diff --git a/apps/x/packages/core/src/composio/types.ts b/apps/x/packages/core/src/composio/types.ts index 4e150a6c..8bd2418a 100644 --- a/apps/x/packages/core/src/composio/types.ts +++ b/apps/x/packages/core/src/composio/types.ts @@ -149,7 +149,7 @@ export const ZCreateConnectedAccountRequest = z.object({ */ export const ZCreateConnectedAccountResponse = z.object({ id: z.string(), - connectionData: ZConnectionData, + connectionData: ZConnectionData.optional(), }); /**