diff --git a/apps/x/apps/main/src/composio-handler.ts b/apps/x/apps/main/src/composio-handler.ts index 731492a0..b0537cd8 100644 --- a/apps/x/apps/main/src/composio-handler.ts +++ b/apps/x/apps/main/src/composio-handler.ts @@ -4,6 +4,7 @@ import * as composioClient from '@x/core/dist/composio/client.js'; import { composioAccountsRepo } from '@x/core/dist/composio/repo.js'; import type { LocalConnectedAccount, ZExecuteActionResponse } from '@x/core/dist/composio/types.js'; import { z } from 'zod'; +import { triggerSync as triggerGmailSync } from '@x/core/dist/knowledge/sync_gmail.js'; const REDIRECT_URI = 'http://localhost:8081/oauth/callback'; @@ -152,6 +153,9 @@ export async function initiateConnection(toolkitSlug: string): Promise<{ if (accountStatus.status === 'ACTIVE') { emitComposioEvent({ toolkitSlug, success: true }); + if (toolkitSlug === 'gmail') { + triggerGmailSync(); + } } else { emitComposioEvent({ toolkitSlug, @@ -266,6 +270,13 @@ export function listConnected(): { toolkits: string[] } { return { toolkits: composioAccountsRepo.getConnectedToolkits() }; } +/** + * Check if Composio should be used for Google services (Gmail, etc.) + */ +export function useComposioForGoogle(): { enabled: boolean } { + return { enabled: composioClient.useComposioForGoogle() }; +} + /** * Execute a Composio action */ diff --git a/apps/x/apps/main/src/ipc.ts b/apps/x/apps/main/src/ipc.ts index 84e2b3e4..35856e08 100644 --- a/apps/x/apps/main/src/ipc.ts +++ b/apps/x/apps/main/src/ipc.ts @@ -543,6 +543,9 @@ export function setupIpcHandlers() { 'composio:execute-action': async (_event, args) => { return composioHandler.executeAction(args.actionSlug, args.toolkitSlug, args.input); }, + 'composio:use-composio-for-google': async () => { + return composioHandler.useComposioForGoogle(); + }, // Agent schedule handlers 'agent-schedule:getConfig': async () => { const repo = container.resolve('agentScheduleRepo'); diff --git a/apps/x/apps/renderer/src/components/connectors-popover.tsx b/apps/x/apps/renderer/src/components/connectors-popover.tsx index fe1d58ae..268fcfe7 100644 --- a/apps/x/apps/renderer/src/components/connectors-popover.tsx +++ b/apps/x/apps/renderer/src/components/connectors-popover.tsx @@ -20,6 +20,7 @@ import { Separator } from "@/components/ui/separator" import { GoogleClientIdModal } from "@/components/google-client-id-modal" import { getGoogleClientId, setGoogleClientId, clearGoogleClientId } from "@/lib/google-client-id-store" import { toast } from "sonner" +import { ComposioApiKeyModal } from "@/components/composio-api-key-modal" interface ProviderState { isConnected: boolean @@ -54,6 +55,10 @@ export function ConnectorsPopover({ children, tooltip, open: openProp, onOpenCha const [granolaEnabled, setGranolaEnabled] = useState(false) const [granolaLoading, setGranolaLoading] = useState(true) + // Composio API key state + const [composioApiKeyOpen, setComposioApiKeyOpen] = useState(false) + const [composioApiKeyTarget, setComposioApiKeyTarget] = useState<'slack' | 'gmail'>('gmail') + // Slack state (agent-slack CLI) const [slackEnabled, setSlackEnabled] = useState(false) const [slackLoading, setSlackLoading] = useState(true) @@ -64,7 +69,13 @@ export function ConnectorsPopover({ children, tooltip, open: openProp, onOpenCha const [slackDiscovering, setSlackDiscovering] = useState(false) const [slackDiscoverError, setSlackDiscoverError] = useState(null) - // Load available providers on mount + // Composio/Gmail state + const [useComposioForGoogle, setUseComposioForGoogle] = useState(false) + const [gmailConnected, setGmailConnected] = useState(false) + const [gmailLoading, setGmailLoading] = useState(true) + const [gmailConnecting, setGmailConnecting] = useState(false) + + // Load available providers and composio-for-google flag on mount useEffect(() => { async function loadProviders() { try { @@ -78,7 +89,16 @@ export function ConnectorsPopover({ children, tooltip, open: openProp, onOpenCha setProvidersLoading(false) } } + async function loadComposioForGoogleFlag() { + try { + const result = await window.ipc.invoke('composio:use-composio-for-google', null) + setUseComposioForGoogle(result.enabled) + } catch (error) { + console.error('Failed to check composio-for-google flag:', error) + } + } loadProviders() + loadComposioForGoogleFlag() }, []) // Load Granola config @@ -150,6 +170,80 @@ export function ConnectorsPopover({ children, tooltip, open: openProp, onOpenCha } }, []) + // Load Gmail connection status + const refreshGmailStatus = useCallback(async () => { + try { + setGmailLoading(true) + const result = await window.ipc.invoke('composio:get-connection-status', { toolkitSlug: 'gmail' }) + setGmailConnected(result.isConnected) + } catch (error) { + console.error('Failed to load Gmail status:', error) + setGmailConnected(false) + } finally { + setGmailLoading(false) + } + }, []) + + // Connect to Gmail via Composio + const startGmailConnect = useCallback(async () => { + try { + setGmailConnecting(true) + const result = await window.ipc.invoke('composio:initiate-connection', { toolkitSlug: 'gmail' }) + if (!result.success) { + toast.error(result.error || 'Failed to connect to Gmail') + setGmailConnecting(false) + } + // Success will be handled by composio:didConnect event + } catch (error) { + console.error('Failed to connect to Gmail:', error) + toast.error('Failed to connect to Gmail') + setGmailConnecting(false) + } + }, []) + + // Handle Gmail connect button click + const handleConnectGmail = useCallback(async () => { + const configResult = await window.ipc.invoke('composio:is-configured', null) + if (!configResult.configured) { + setComposioApiKeyTarget('gmail') + setComposioApiKeyOpen(true) + return + } + await startGmailConnect() + }, [startGmailConnect]) + + // Disconnect from Gmail + const handleDisconnectGmail = useCallback(async () => { + try { + setGmailLoading(true) + const result = await window.ipc.invoke('composio:disconnect', { toolkitSlug: 'gmail' }) + if (result.success) { + setGmailConnected(false) + toast.success('Disconnected from Gmail') + } else { + toast.error('Failed to disconnect from Gmail') + } + } catch (error) { + console.error('Failed to disconnect from Gmail:', error) + toast.error('Failed to disconnect from Gmail') + } finally { + setGmailLoading(false) + } + }, []) + + // Handle Composio API key submission + const handleComposioApiKeySubmit = useCallback(async (apiKey: string) => { + try { + await window.ipc.invoke('composio:set-api-key', { apiKey }) + setComposioApiKeyOpen(false) + toast.success('Composio API key saved') + await startGmailConnect() + } catch (error) { + console.error('Failed to save Composio API key:', error) + toast.error('Failed to save API key') + } + }, [startGmailConnect]) + // Save selected Slack workspaces const handleSlackSaveWorkspaces = useCallback(async () => { const selected = slackAvailableWorkspaces.filter(w => slackSelectedUrls.has(w.url)) @@ -193,6 +287,11 @@ export function ConnectorsPopover({ children, tooltip, open: openProp, onOpenCha // Refresh Slack config refreshSlackConfig() + // Refresh Gmail Composio status if enabled + if (useComposioForGoogle) { + refreshGmailStatus() + } + // Refresh OAuth providers if (providers.length === 0) return @@ -229,7 +328,7 @@ export function ConnectorsPopover({ children, tooltip, open: openProp, onOpenCha } setProviderStates(newStates) - }, [providers, refreshGranolaConfig, refreshSlackConfig]) + }, [providers, refreshGranolaConfig, refreshSlackConfig, refreshGmailStatus, useComposioForGoogle]) // Refresh statuses when popover opens or providers list changes useEffect(() => { @@ -273,6 +372,30 @@ export function ConnectorsPopover({ children, tooltip, open: openProp, onOpenCha return cleanup }, [refreshAllStatuses]) + // Listen for Composio connection events (Gmail) + useEffect(() => { + const cleanup = window.ipc.on('composio:didConnect', (event) => { + const { toolkitSlug, success, error } = event + + if (toolkitSlug === 'gmail') { + setGmailConnected(success) + setGmailConnecting(false) + + if (success) { + toast.success('Connected to Gmail', { + description: 'Syncing your emails in the background. This may take a few minutes before changes appear.', + duration: 8000, + }) + } else { + toast.error(error || 'Failed to connect to Gmail') + } + } + }) + + return cleanup + }, []) + + const startConnect = useCallback(async (provider: string, clientId?: string) => { setProviderStates(prev => ({ ...prev, @@ -516,13 +639,63 @@ export function ConnectorsPopover({ children, tooltip, open: openProp, onOpenCha )} - {/* Email & Calendar Section - Google */} - {providers.includes('google') && ( + {/* Email & Calendar Section */} + {(useComposioForGoogle || providers.includes('google')) && ( <>
- Email & Calendar + + {useComposioForGoogle ? 'Email' : 'Email & Calendar'} +
- {renderOAuthProvider('google', 'Google', , 'Sync emails and calendar')} + {useComposioForGoogle ? ( +
+
+
+ +
+
+ Gmail + {gmailLoading ? ( + Checking... + ) : ( + + Sync emails + + )} +
+
+
+ {gmailLoading ? ( + + ) : gmailConnected ? ( + + ) : ( + + )} +
+
+ ) : ( + renderOAuthProvider('google', 'Google', , 'Sync emails and calendar') + )} )} @@ -652,6 +825,12 @@ export function ConnectorsPopover({ children, tooltip, open: openProp, onOpenCha + ) } diff --git a/apps/x/apps/renderer/src/components/onboarding-modal.tsx b/apps/x/apps/renderer/src/components/onboarding-modal.tsx index 3e663b91..9060a669 100644 --- a/apps/x/apps/renderer/src/components/onboarding-modal.tsx +++ b/apps/x/apps/renderer/src/components/onboarding-modal.tsx @@ -25,6 +25,7 @@ import { cn } from "@/lib/utils" import { GoogleClientIdModal } from "@/components/google-client-id-modal" import { getGoogleClientId, setGoogleClientId } from "@/lib/google-client-id-store" import { toast } from "sonner" +import { ComposioApiKeyModal } from "@/components/composio-api-key-modal" interface ProviderState { isConnected: boolean @@ -78,6 +79,10 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { const [granolaLoading, setGranolaLoading] = useState(true) const [showMoreProviders, setShowMoreProviders] = useState(false) + // Composio API key state + const [composioApiKeyOpen, setComposioApiKeyOpen] = useState(false) + const [composioApiKeyTarget, setComposioApiKeyTarget] = useState<'slack' | 'gmail'>('gmail') + // Slack state (agent-slack CLI) const [slackEnabled, setSlackEnabled] = useState(false) const [slackLoading, setSlackLoading] = useState(true) @@ -88,6 +93,12 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { const [slackDiscovering, setSlackDiscovering] = useState(false) const [slackDiscoverError, setSlackDiscoverError] = useState(null) + // Composio/Gmail state + const [useComposioForGoogle, setUseComposioForGoogle] = useState(false) + const [gmailConnected, setGmailConnected] = useState(false) + const [gmailLoading, setGmailLoading] = useState(true) + const [gmailConnecting, setGmailConnecting] = useState(false) + const updateProviderConfig = useCallback( (provider: LlmProviderFlavor, updates: Partial<{ apiKey: string; baseURL: string; model: string; knowledgeGraphModel: string }>) => { setProviderConfigs(prev => ({ @@ -115,7 +126,7 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { .filter(([, state]) => state.isConnected) .map(([provider]) => provider) - // Load available providers on mount + // Load available providers and composio-for-google flag on mount useEffect(() => { if (!open) return @@ -131,7 +142,16 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { setProvidersLoading(false) } } + async function loadComposioForGoogleFlag() { + try { + const result = await window.ipc.invoke('composio:use-composio-for-google', null) + setUseComposioForGoogle(result.enabled) + } catch (error) { + console.error('Failed to check composio-for-google flag:', error) + } + } loadProviders() + loadComposioForGoogleFlag() }, [open]) // Load LLM models catalog on open @@ -254,6 +274,60 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { } }, []) + // Load Gmail connection status + const refreshGmailStatus = useCallback(async () => { + try { + setGmailLoading(true) + const result = await window.ipc.invoke('composio:get-connection-status', { toolkitSlug: 'gmail' }) + setGmailConnected(result.isConnected) + } catch (error) { + console.error('Failed to load Gmail status:', error) + setGmailConnected(false) + } finally { + setGmailLoading(false) + } + }, []) + + // Connect to Gmail via Composio + const startGmailConnect = useCallback(async () => { + try { + setGmailConnecting(true) + const result = await window.ipc.invoke('composio:initiate-connection', { toolkitSlug: 'gmail' }) + if (!result.success) { + toast.error(result.error || 'Failed to connect to Gmail') + setGmailConnecting(false) + } + } catch (error) { + console.error('Failed to connect to Gmail:', error) + toast.error('Failed to connect to Gmail') + setGmailConnecting(false) + } + }, []) + + // Handle Gmail connect button click + const handleConnectGmail = useCallback(async () => { + const configResult = await window.ipc.invoke('composio:is-configured', null) + if (!configResult.configured) { + setComposioApiKeyTarget('gmail') + setComposioApiKeyOpen(true) + return + } + await startGmailConnect() + }, [startGmailConnect]) + + // Handle Composio API key submission + const handleComposioApiKeySubmit = useCallback(async (apiKey: string) => { + try { + await window.ipc.invoke('composio:set-api-key', { apiKey }) + setComposioApiKeyOpen(false) + toast.success('Composio API key saved') + await startGmailConnect() + } catch (error) { + console.error('Failed to save Composio API key:', error) + toast.error('Failed to save API key') + } + }, [startGmailConnect]) + // Save selected Slack workspaces const handleSlackSaveWorkspaces = useCallback(async () => { const selected = slackAvailableWorkspaces.filter(w => slackSelectedUrls.has(w.url)) @@ -341,6 +415,11 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { // Refresh Slack config refreshSlackConfig() + // Refresh Gmail Composio status if enabled + if (useComposioForGoogle) { + refreshGmailStatus() + } + // Refresh OAuth providers if (providers.length === 0) return @@ -368,7 +447,7 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { } setProviderStates(newStates) - }, [providers, refreshGranolaConfig, refreshSlackConfig]) + }, [providers, refreshGranolaConfig, refreshSlackConfig, refreshGmailStatus, useComposioForGoogle]) // Refresh statuses when modal opens or providers list changes useEffect(() => { @@ -402,6 +481,30 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { return cleanup }, []) + // Listen for Composio connection events (Gmail) + useEffect(() => { + const cleanup = window.ipc.on('composio:didConnect', (event) => { + const { toolkitSlug, success, error } = event + + if (toolkitSlug === 'gmail') { + setGmailConnected(success) + setGmailConnecting(false) + + if (success) { + toast.success('Connected to Gmail', { + description: 'Syncing your emails in the background. This may take a few minutes before changes appear.', + duration: 8000, + }) + } else { + toast.error(error || 'Failed to connect to Gmail') + } + } + }) + + return cleanup + }, []) + + const startConnect = useCallback(async (provider: string, clientId?: string) => { setProviderStates(prev => ({ ...prev, @@ -544,6 +647,50 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { ) + // Render Gmail Composio row + const renderGmailRow = () => ( +
+
+
+ +
+
+ Gmail + {gmailLoading ? ( + Checking... + ) : ( + + Sync emails + + )} +
+
+
+ {gmailLoading ? ( + + ) : gmailConnected ? ( +
+ + Connected +
+ ) : ( + + )} +
+
+ ) + // Render Slack row const renderSlackRow = () => (
@@ -835,13 +982,18 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {
) : ( <> - {/* Email & Calendar Section */} - {providers.includes('google') && ( + {/* Email / Email & Calendar Section */} + {(useComposioForGoogle || providers.includes('google')) && (
- Email & Calendar + + {useComposioForGoogle ? 'Email' : 'Email & Calendar'} +
- {renderOAuthProvider('google', 'Google', , 'Sync emails and calendar events')} + {useComposioForGoogle + ? renderGmailRow() + : renderOAuthProvider('google', 'Google', , 'Sync emails and calendar events') + }
)} @@ -878,7 +1030,7 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { // Step 2: Completion const renderCompletionStep = () => { - const hasConnections = connectedProviders.length > 0 || granolaEnabled || slackEnabled + const hasConnections = connectedProviders.length > 0 || granolaEnabled || slackEnabled || gmailConnected return (
@@ -901,6 +1053,12 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {

Connected accounts:

+ {gmailConnected && ( +
+ + Gmail (Email) +
+ )} {connectedProviders.includes('google') && (
@@ -945,6 +1103,12 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { onSubmit={handleGoogleClientIdSubmit} isSubmitting={providerStates.google?.isConnecting ?? false} /> + {}}> > { */ const ZComposioConfig = z.object({ apiKey: z.string().optional(), + use_composio_for_google: z.boolean().optional(), }); type ComposioConfig = z.infer; @@ -103,6 +104,14 @@ export async function isConfigured(): Promise { return !!getApiKey(); } +/** + * Check if Composio should be used for Google services (Gmail, etc.) + */ +export function useComposioForGoogle(): boolean { + const config = loadConfig(); + return config.use_composio_for_google === true; +} + /** * Make an API call to Composio */ diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts index de73c016..ab613efe 100644 --- a/apps/x/packages/core/src/knowledge/sync_gmail.ts +++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts @@ -7,6 +7,8 @@ import { WorkDir } from '../config/config.js'; import { GoogleClientFactory } from './google-client-factory.js'; import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js'; import { limitEventItems } from './limit_event_items.js'; +import { executeAction, useComposioForGoogle } from '../composio/client.js'; +import { composioAccountsRepo } from '../composio/repo.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'gmail_sync'); @@ -440,20 +442,366 @@ async function performSync() { } } +// --- Composio-based Sync --- + +const COMPOSIO_LOOKBACK_DAYS = 7; + +interface ComposioSyncState { + last_sync: string; // ISO string +} + +function loadComposioState(stateFile: string): ComposioSyncState | null { + if (fs.existsSync(stateFile)) { + try { + const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8')); + if (data.last_sync) { + return { last_sync: data.last_sync }; + } + } catch (e) { + console.error('[Gmail] Failed to load composio state:', e); + } + } + return null; +} + +function saveComposioState(stateFile: string, lastSync: string): void { + fs.writeFileSync(stateFile, JSON.stringify({ last_sync: lastSync }, null, 2)); +} + +function tryParseDate(dateStr: string): Date | null { + const d = new Date(dateStr); + return isNaN(d.getTime()) ? null : d; +} + +interface ParsedMessage { + from: string; + date: string; + subject: string; + body: string; +} + +function parseMessageData(messageData: Record): ParsedMessage { + const headers = messageData.payload && typeof messageData.payload === 'object' + ? (messageData.payload as Record).headers as Array<{ name: string; value: string }> | undefined + : undefined; + + const from = headers?.find(h => h.name === 'From')?.value || String(messageData.from || messageData.sender || 'Unknown'); + const date = headers?.find(h => h.name === 'Date')?.value || String(messageData.date || messageData.internalDate || 'Unknown'); + const subject = headers?.find(h => h.name === 'Subject')?.value || String(messageData.subject || '(No Subject)'); + + let body = ''; + + if (messageData.payload && typeof messageData.payload === 'object') { + body = extractBodyFromPayload(messageData.payload as Record); + } + + if (!body) { + if (typeof messageData.body === 'string') { + body = messageData.body; + } else if (typeof messageData.snippet === 'string') { + body = messageData.snippet; + } else if (typeof messageData.text === 'string') { + body = messageData.text; + } + } + + if (body && (body.includes(' !line.trim().startsWith('>')).join('\n'); + } + + return { from, date, subject, body }; +} + +function extractBodyFromPayload(payload: Record): string { + const parts = payload.parts as Array> | undefined; + + if (parts) { + for (const part of parts) { + const mimeType = part.mimeType as string | undefined; + const bodyData = part.body && typeof part.body === 'object' + ? (part.body as Record).data as string | undefined + : undefined; + + if ((mimeType === 'text/plain' || mimeType === 'text/html') && bodyData) { + const decoded = Buffer.from(bodyData, 'base64').toString('utf-8'); + if (mimeType === 'text/html') { + return nhm.translate(decoded); + } + return decoded; + } + + if (part.parts) { + const result = extractBodyFromPayload(part as Record); + if (result) return result; + } + } + } + + const bodyData = payload.body && typeof payload.body === 'object' + ? (payload.body as Record).data as string | undefined + : undefined; + + if (bodyData) { + const decoded = Buffer.from(bodyData, 'base64').toString('utf-8'); + const mimeType = payload.mimeType as string | undefined; + if (mimeType === 'text/html') { + return nhm.translate(decoded); + } + return decoded; + } + + return ''; +} + +async function processThreadComposio(connectedAccountId: string, threadId: string, syncDir: string): Promise { + let threadResult; + try { + threadResult = await executeAction( + 'GMAIL_FETCH_MESSAGE_BY_THREAD_ID', + connectedAccountId, + { thread_id: threadId, user_id: 'me' } + ); + } catch (error) { + console.warn(`[Gmail] Skipping thread ${threadId} (fetch failed):`, error instanceof Error ? error.message : error); + return null; + } + + if (!threadResult.success || !threadResult.data) { + console.error(`[Gmail] Failed to fetch thread ${threadId}:`, threadResult.error); + return null; + } + + const data = threadResult.data as Record; + const messages = data.messages as Array> | undefined; + + let newestDate: Date | null = null; + + if (!messages || messages.length === 0) { + const parsed = parseMessageData(data); + const mdContent = `# ${parsed.subject}\n\n` + + `**Thread ID:** ${threadId}\n` + + `**Message Count:** 1\n\n---\n\n` + + `### From: ${parsed.from}\n` + + `**Date:** ${parsed.date}\n\n` + + `${parsed.body}\n\n---\n\n`; + + fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); + console.log(`[Gmail] Synced Thread: ${parsed.subject} (${threadId})`); + newestDate = tryParseDate(parsed.date); + } else { + const firstParsed = parseMessageData(messages[0]); + let mdContent = `# ${firstParsed.subject}\n\n`; + mdContent += `**Thread ID:** ${threadId}\n`; + mdContent += `**Message Count:** ${messages.length}\n\n---\n\n`; + + for (const msg of messages) { + const parsed = parseMessageData(msg); + mdContent += `### From: ${parsed.from}\n`; + mdContent += `**Date:** ${parsed.date}\n\n`; + mdContent += `${parsed.body}\n\n`; + mdContent += `---\n\n`; + + const msgDate = tryParseDate(parsed.date); + if (msgDate && (!newestDate || msgDate > newestDate)) { + newestDate = msgDate; + } + } + + fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); + console.log(`[Gmail] Synced Thread: ${firstParsed.subject} (${threadId})`); + } + + if (!newestDate) return null; + return new Date(newestDate.getTime() + 1000).toISOString(); +} + +async function performSyncComposio() { + const ATTACHMENTS_DIR = path.join(SYNC_DIR, 'attachments'); + const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json'); + + if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true }); + if (!fs.existsSync(ATTACHMENTS_DIR)) fs.mkdirSync(ATTACHMENTS_DIR, { recursive: true }); + + const account = composioAccountsRepo.getAccount('gmail'); + if (!account || account.status !== 'ACTIVE') { + console.log('[Gmail] Gmail not connected via Composio. Skipping sync.'); + return; + } + + const connectedAccountId = account.id; + + const state = loadComposioState(STATE_FILE); + let afterEpochSeconds: number; + + if (state) { + afterEpochSeconds = Math.floor(new Date(state.last_sync).getTime() / 1000); + console.log(`[Gmail] Syncing messages since ${state.last_sync}...`); + } else { + const pastDate = new Date(); + pastDate.setDate(pastDate.getDate() - COMPOSIO_LOOKBACK_DAYS); + afterEpochSeconds = Math.floor(pastDate.getTime() / 1000); + console.log(`[Gmail] First sync - fetching last ${COMPOSIO_LOOKBACK_DAYS} days...`); + } + + let run: ServiceRunContext | null = null; + const ensureRun = async () => { + if (!run) { + run = await serviceLogger.startRun({ + service: 'gmail', + message: 'Syncing Gmail (Composio)', + trigger: 'timer', + }); + } + }; + + try { + const allThreadIds: string[] = []; + let pageToken: string | undefined; + + do { + const params: Record = { + query: `after:${afterEpochSeconds}`, + max_results: 20, + user_id: 'me', + }; + if (pageToken) { + params.page_token = pageToken; + } + + const result = await executeAction( + 'GMAIL_LIST_THREADS', + connectedAccountId, + params + ); + + if (!result.success || !result.data) { + console.error('[Gmail] Failed to list threads:', result.error); + return; + } + + const data = result.data as Record; + const threads = data.threads as Array> | undefined; + + if (threads && threads.length > 0) { + for (const thread of threads) { + const threadId = thread.id as string | undefined; + if (threadId) { + allThreadIds.push(threadId); + } + } + } + + pageToken = data.nextPageToken as string | undefined; + } while (pageToken); + + if (allThreadIds.length === 0) { + console.log('[Gmail] No new threads.'); + return; + } + + console.log(`[Gmail] Found ${allThreadIds.length} threads to sync.`); + + await ensureRun(); + const limitedThreads = limitEventItems(allThreadIds); + await serviceLogger.log({ + type: 'changes_identified', + service: run!.service, + runId: run!.runId, + level: 'info', + message: `Found ${allThreadIds.length} thread${allThreadIds.length === 1 ? '' : 's'} to sync`, + counts: { threads: allThreadIds.length }, + items: limitedThreads.items, + truncated: limitedThreads.truncated, + }); + + // Process oldest first so high-water mark advances chronologically + allThreadIds.reverse(); + + let highWaterMark: string | null = state?.last_sync ?? null; + let processedCount = 0; + for (const threadId of allThreadIds) { + try { + const newestInThread = await processThreadComposio(connectedAccountId, threadId, SYNC_DIR); + processedCount++; + + if (newestInThread) { + if (!highWaterMark || new Date(newestInThread) > new Date(highWaterMark)) { + highWaterMark = newestInThread; + } + saveComposioState(STATE_FILE, highWaterMark); + } + } catch (error) { + console.error(`[Gmail] Error processing thread ${threadId}, skipping:`, error); + } + } + + await serviceLogger.log({ + type: 'run_complete', + service: run!.service, + runId: run!.runId, + level: 'info', + message: `Gmail sync complete: ${processedCount}/${allThreadIds.length} thread${allThreadIds.length === 1 ? '' : 's'}`, + durationMs: Date.now() - run!.startedAt, + outcome: 'ok', + summary: { threads: processedCount }, + }); + + console.log(`[Gmail] Sync completed. Processed ${processedCount}/${allThreadIds.length} threads.`); + } catch (error) { + console.error('[Gmail] Error during sync:', error); + await ensureRun(); + await serviceLogger.log({ + type: 'error', + service: run!.service, + runId: run!.runId, + level: 'error', + message: 'Gmail sync error', + error: error instanceof Error ? error.message : String(error), + }); + await serviceLogger.log({ + type: 'run_complete', + service: run!.service, + runId: run!.runId, + level: 'error', + message: 'Gmail sync failed', + durationMs: Date.now() - run!.startedAt, + outcome: 'error', + }); + } +} + export async function init() { console.log("Starting Gmail Sync (TS)..."); console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`); + const composioMode = useComposioForGoogle(); + if (composioMode) { + console.log('[Gmail] Using Composio backend for Gmail sync.'); + } + while (true) { try { - // Check if credentials are available with required scopes - const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE); - - if (!hasCredentials) { - console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping..."); + if (composioMode) { + const isConnected = composioAccountsRepo.isConnected('gmail'); + if (!isConnected) { + console.log('[Gmail] Gmail not connected via Composio. Sleeping...'); + } else { + await performSyncComposio(); + } } else { - // Perform one sync - await performSync(); + // Check if credentials are available with required scopes + const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE); + + if (!hasCredentials) { + console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping..."); + } else { + // Perform one sync + await performSync(); + } } } catch (error) { console.error("Error in main loop:", error); diff --git a/apps/x/packages/shared/src/ipc.ts b/apps/x/packages/shared/src/ipc.ts index f6f5c0ee..557df845 100644 --- a/apps/x/packages/shared/src/ipc.ts +++ b/apps/x/packages/shared/src/ipc.ts @@ -380,6 +380,12 @@ const ipcSchemas = { error: z.string().nullable(), }), }, + 'composio:use-composio-for-google': { + req: z.null(), + res: z.object({ + enabled: z.boolean(), + }), + }, 'composio:didConnect': { req: z.object({ toolkitSlug: z.string(),