diff --git a/apps/x/apps/main/src/composio-handler.ts b/apps/x/apps/main/src/composio-handler.ts index e5b25d1a..f72613ad 100644 --- a/apps/x/apps/main/src/composio-handler.ts +++ b/apps/x/apps/main/src/composio-handler.ts @@ -3,6 +3,7 @@ import { createAuthServer } from './auth-server.js'; import * as composioClient from '@x/core/dist/composio/client.js'; import { composioAccountsRepo } from '@x/core/dist/composio/repo.js'; import type { LocalConnectedAccount } from '@x/core/dist/composio/types.js'; +import { triggerSync as triggerGmailSync } from '@x/core/dist/knowledge/sync_gmail.js'; const REDIRECT_URI = 'http://localhost:8081/oauth/callback'; @@ -151,6 +152,9 @@ export async function initiateConnection(toolkitSlug: string): Promise<{ if (accountStatus.status === 'ACTIVE') { emitComposioEvent({ toolkitSlug, success: true }); + if (toolkitSlug === 'gmail') { + triggerGmailSync(); + } } else { emitComposioEvent({ toolkitSlug, diff --git a/apps/x/apps/main/src/main.ts b/apps/x/apps/main/src/main.ts index 6ddab7bc..c276301b 100644 --- a/apps/x/apps/main/src/main.ts +++ b/apps/x/apps/main/src/main.ts @@ -5,7 +5,7 @@ import { fileURLToPath, pathToFileURL } from "node:url"; import { dirname } from "node:path"; import { updateElectronApp, UpdateSourceType } from "update-electron-app"; import { init as initGmailSync } from "@x/core/dist/knowledge/sync_gmail.js"; -import { init as initCalendarSync } from "@x/core/dist/knowledge/sync_calendar.js"; + import { init as initFirefliesSync } from "@x/core/dist/knowledge/sync_fireflies.js"; import { init as initGranolaSync } from "@x/core/dist/knowledge/granola/sync.js"; import { init as initGraphBuilder } from "@x/core/dist/knowledge/build_graph.js"; @@ -134,9 +134,6 @@ app.whenReady().then(async () => { // start gmail sync initGmailSync(); - // start calendar sync - initCalendarSync(); - // start fireflies sync initFirefliesSync(); diff --git a/apps/x/apps/main/src/oauth-handler.ts b/apps/x/apps/main/src/oauth-handler.ts index 5b55e8b7..58ab0809 100644 --- a/apps/x/apps/main/src/oauth-handler.ts +++ b/apps/x/apps/main/src/oauth-handler.ts @@ -7,7 +7,6 @@ import { getProviderConfig, getAvailableProviders } from '@x/core/dist/auth/prov import container from '@x/core/dist/di/container.js'; import { IOAuthRepo } from '@x/core/dist/auth/repo.js'; import { IClientRegistrationRepo } from '@x/core/dist/auth/client-repo.js'; -import { triggerSync as triggerGmailSync } from '@x/core/dist/knowledge/sync_gmail.js'; import { triggerSync as triggerCalendarSync } from '@x/core/dist/knowledge/sync_calendar.js'; import { triggerSync as triggerFirefliesSync } from '@x/core/dist/knowledge/sync_fireflies.js'; import { emitOAuthEvent } from './ipc.js'; @@ -194,7 +193,6 @@ export async function connectProvider(provider: string): Promise<{ success: bool // Trigger immediate sync for relevant providers if (provider === 'google') { - triggerGmailSync(); triggerCalendarSync(); } else if (provider === 'fireflies-ai') { triggerFirefliesSync(); diff --git a/apps/x/apps/renderer/src/components/connectors-popover.tsx b/apps/x/apps/renderer/src/components/connectors-popover.tsx index 1799ab75..882a8d48 100644 --- a/apps/x/apps/renderer/src/components/connectors-popover.tsx +++ b/apps/x/apps/renderer/src/components/connectors-popover.tsx @@ -41,8 +41,12 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) const [granolaEnabled, setGranolaEnabled] = useState(false) const [granolaLoading, setGranolaLoading] = useState(true) - // Composio/Slack state + // Composio state (Gmail + Slack) const [composioApiKeyOpen, setComposioApiKeyOpen] = useState(false) + const [composioApiKeyTarget, setComposioApiKeyTarget] = useState<'gmail' | 'slack'>('gmail') + const [gmailConnected, setGmailConnected] = useState(false) + const [gmailLoading, setGmailLoading] = useState(true) + const [gmailConnecting, setGmailConnecting] = useState(false) const [slackConnected, setSlackConnected] = useState(false) const [slackLoading, setSlackLoading] = useState(true) const [slackConnecting, setSlackConnecting] = useState(false) @@ -93,6 +97,20 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) } }, []) + // Load Gmail connection status + const refreshGmailStatus = useCallback(async () => { + try { + setGmailLoading(true) + const result = await window.ipc.invoke('composio:get-connection-status', { toolkitSlug: 'gmail' }) + setGmailConnected(result.isConnected) + } catch (error) { + console.error('Failed to load Gmail status:', error) + setGmailConnected(false) + } finally { + setGmailLoading(false) + } + }, []) + // Load Slack connection status const refreshSlackStatus = useCallback(async () => { try { @@ -107,6 +125,53 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) } }, []) + // Connect to Gmail via Composio + const startGmailConnect = useCallback(async () => { + try { + setGmailConnecting(true) + const result = await window.ipc.invoke('composio:initiate-connection', { toolkitSlug: 'gmail' }) + if (!result.success) { + toast.error(result.error || 'Failed to connect to Gmail') + setGmailConnecting(false) + } + // Success will be handled by composio:didConnect event + } catch (error) { + console.error('Failed to connect to Gmail:', error) + toast.error('Failed to connect to Gmail') + setGmailConnecting(false) + } + }, []) + + // Handle Gmail connect button click + const handleConnectGmail = useCallback(async () => { + const configResult = await window.ipc.invoke('composio:is-configured', null) + if (!configResult.configured) { + setComposioApiKeyTarget('gmail') + setComposioApiKeyOpen(true) + return + } + await startGmailConnect() + }, [startGmailConnect]) + + // Disconnect from Gmail + const handleDisconnectGmail = useCallback(async () => { + try { + setGmailLoading(true) + const result = await window.ipc.invoke('composio:disconnect', { toolkitSlug: 'gmail' }) + if (result.success) { + setGmailConnected(false) + toast.success('Disconnected from Gmail') + } else { + toast.error('Failed to disconnect from Gmail') + } + } catch (error) { + console.error('Failed to disconnect from Gmail:', error) + toast.error('Failed to disconnect from Gmail') + } finally { + setGmailLoading(false) + } + }, []) + // Connect to Slack via Composio const startSlackConnect = useCallback(async () => { try { @@ -126,9 +191,9 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) // Handle Slack connect button click const handleConnectSlack = useCallback(async () => { - // Check if Composio is configured const configResult = await window.ipc.invoke('composio:is-configured', null) if (!configResult.configured) { + setComposioApiKeyTarget('slack') setComposioApiKeyOpen(true) return } @@ -141,13 +206,17 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) await window.ipc.invoke('composio:set-api-key', { apiKey }) setComposioApiKeyOpen(false) toast.success('Composio API key saved') - // Now start the Slack connection - await startSlackConnect() + // Start the connection for whichever toolkit triggered the API key prompt + if (composioApiKeyTarget === 'gmail') { + await startGmailConnect() + } else { + await startSlackConnect() + } } catch (error) { console.error('Failed to save Composio API key:', error) toast.error('Failed to save API key') } - }, [startSlackConnect]) + }, [composioApiKeyTarget, startGmailConnect, startSlackConnect]) // Disconnect from Slack const handleDisconnectSlack = useCallback(async () => { @@ -173,7 +242,8 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) // Refresh Granola refreshGranolaConfig() - // Refresh Slack status + // Refresh Composio connections + refreshGmailStatus() refreshSlackStatus() // Refresh OAuth providers @@ -202,7 +272,7 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) ) setProviderStates(newStates) - }, [providers, refreshGranolaConfig, refreshSlackStatus]) + }, [providers, refreshGranolaConfig, refreshGmailStatus, refreshSlackStatus]) // Refresh statuses when popover opens or providers list changes useEffect(() => { @@ -227,7 +297,7 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) if (success) { const displayName = provider === 'fireflies-ai' ? 'Fireflies' : provider.charAt(0).toUpperCase() + provider.slice(1) - // Show detailed message for Google and Fireflies (includes sync info) + // Show detailed message for providers that sync in background if (provider === 'google' || provider === 'fireflies-ai') { toast.success(`Connected to ${displayName}`, { description: 'Syncing your data in the background. This may take a few minutes before changes appear.', @@ -251,7 +321,19 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) const cleanup = window.ipc.on('composio:didConnect', (event) => { const { toolkitSlug, success, error } = event - if (toolkitSlug === 'slack') { + if (toolkitSlug === 'gmail') { + setGmailConnected(success) + setGmailConnecting(false) + + if (success) { + toast.success('Connected to Gmail', { + description: 'Syncing your emails in the background. This may take a few minutes.', + duration: 8000, + }) + } else { + toast.error(error || 'Failed to connect to Gmail') + } + } else if (toolkitSlug === 'slack') { setSlackConnected(success) setSlackConnecting(false) @@ -431,16 +513,55 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) ) : ( <> - {/* Email & Calendar Section - Google */} - {providers.includes('google') && ( - <> -
- Email & Calendar + {/* Email Section - Gmail via Composio */} +
+ Email +
+
+
+
+
- {renderOAuthProvider('google', 'Google', , 'Sync emails and calendar')} - - - )} +
+ Gmail + {gmailLoading ? ( + Checking... + ) : ( + Sync emails + )} +
+
+
+ {gmailLoading ? ( + + ) : gmailConnected ? ( + + ) : ( + + )} +
+
+ + {/* Meeting Notes Section - Granola & Fireflies */}
@@ -537,7 +658,7 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) open={composioApiKeyOpen} onOpenChange={setComposioApiKeyOpen} onSubmit={handleComposioApiKeySubmit} - isSubmitting={slackConnecting} + isSubmitting={gmailConnecting || slackConnecting} /> ) diff --git a/apps/x/apps/renderer/src/components/onboarding-modal.tsx b/apps/x/apps/renderer/src/components/onboarding-modal.tsx index 074ad645..0675697c 100644 --- a/apps/x/apps/renderer/src/components/onboarding-modal.tsx +++ b/apps/x/apps/renderer/src/components/onboarding-modal.tsx @@ -42,8 +42,12 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { const [granolaEnabled, setGranolaEnabled] = useState(false) const [granolaLoading, setGranolaLoading] = useState(true) - // Composio/Slack state + // Composio state (Gmail + Slack) const [composioApiKeyOpen, setComposioApiKeyOpen] = useState(false) + const [composioApiKeyTarget, setComposioApiKeyTarget] = useState<'gmail' | 'slack'>('gmail') + const [gmailConnected, setGmailConnected] = useState(false) + const [gmailLoading, setGmailLoading] = useState(true) + const [gmailConnecting, setGmailConnecting] = useState(false) const [slackConnected, setSlackConnected] = useState(false) const [slackLoading, setSlackLoading] = useState(true) const [slackConnecting, setSlackConnecting] = useState(false) @@ -101,6 +105,47 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { } }, []) + // Load Gmail connection status + const refreshGmailStatus = useCallback(async () => { + try { + setGmailLoading(true) + const result = await window.ipc.invoke('composio:get-connection-status', { toolkitSlug: 'gmail' }) + setGmailConnected(result.isConnected) + } catch (error) { + console.error('Failed to load Gmail status:', error) + setGmailConnected(false) + } finally { + setGmailLoading(false) + } + }, []) + + // Connect to Gmail via Composio + const startGmailConnect = useCallback(async () => { + try { + setGmailConnecting(true) + const result = await window.ipc.invoke('composio:initiate-connection', { toolkitSlug: 'gmail' }) + if (!result.success) { + toast.error(result.error || 'Failed to connect to Gmail') + setGmailConnecting(false) + } + } catch (error) { + console.error('Failed to connect to Gmail:', error) + toast.error('Failed to connect to Gmail') + setGmailConnecting(false) + } + }, []) + + // Handle Gmail connect button click + const handleConnectGmail = useCallback(async () => { + const configResult = await window.ipc.invoke('composio:is-configured', null) + if (!configResult.configured) { + setComposioApiKeyTarget('gmail') + setComposioApiKeyOpen(true) + return + } + await startGmailConnect() + }, [startGmailConnect]) + // Load Slack connection status const refreshSlackStatus = useCallback(async () => { try { @@ -134,9 +179,9 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { // Connect to Slack via Composio (checks if configured first) const handleConnectSlack = useCallback(async () => { - // Check if Composio is configured const configResult = await window.ipc.invoke('composio:is-configured', null) if (!configResult.configured) { + setComposioApiKeyTarget('slack') setComposioApiKeyOpen(true) return } @@ -149,20 +194,24 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { await window.ipc.invoke('composio:set-api-key', { apiKey }) setComposioApiKeyOpen(false) toast.success('Composio API key saved') - // Now start the Slack connection - await startSlackConnect() + if (composioApiKeyTarget === 'gmail') { + await startGmailConnect() + } else { + await startSlackConnect() + } } catch (error) { console.error('Failed to save Composio API key:', error) toast.error('Failed to save API key') } - }, [startSlackConnect]) + }, [composioApiKeyTarget, startGmailConnect, startSlackConnect]) // Check connection status for all providers const refreshAllStatuses = useCallback(async () => { // Refresh Granola refreshGranolaConfig() - // Refresh Slack status + // Refresh Composio connections + refreshGmailStatus() refreshSlackStatus() // Refresh OAuth providers @@ -191,7 +240,7 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { ) setProviderStates(newStates) - }, [providers, refreshGranolaConfig, refreshSlackStatus]) + }, [providers, refreshGranolaConfig, refreshGmailStatus, refreshSlackStatus]) // Refresh statuses when modal opens or providers list changes useEffect(() => { @@ -230,7 +279,16 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { const cleanup = window.ipc.on('composio:didConnect', (event) => { const { toolkitSlug, success, error } = event - if (toolkitSlug === 'slack') { + if (toolkitSlug === 'gmail') { + setGmailConnected(success) + setGmailConnecting(false) + + if (success) { + toast.success('Connected to Gmail') + } else { + toast.error(error || 'Failed to connect to Gmail') + } + } else if (toolkitSlug === 'slack') { setSlackConnected(success) setSlackConnecting(false) @@ -377,6 +435,48 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {
) + // Render Gmail row (Composio) + const renderGmailRow = () => ( +
+
+
+ +
+
+ Gmail + {gmailLoading ? ( + Checking... + ) : ( + Sync emails + )} +
+
+
+ {gmailLoading ? ( + + ) : gmailConnected ? ( +
+ + Connected +
+ ) : ( + + )} +
+
+ ) + // Render Slack row const renderSlackRow = () => (
@@ -470,15 +570,13 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {
) : ( <> - {/* Email & Calendar Section */} - {providers.includes('google') && ( -
-
- Email & Calendar -
- {renderOAuthProvider('google', 'Google', , 'Sync emails and calendar events')} + {/* Email Section - Gmail via Composio */} +
+
+ Email
- )} + {renderGmailRow()} +
{/* Meeting Notes Section */}
@@ -513,7 +611,7 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { // Step 2: Completion const CompletionStep = () => { - const hasConnections = connectedProviders.length > 0 || granolaEnabled || slackConnected + const hasConnections = connectedProviders.length > 0 || gmailConnected || granolaEnabled || slackConnected return (
@@ -536,10 +634,10 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {

Connected accounts:

- {connectedProviders.includes('google') && ( + {gmailConnected && (
- Google (Email & Calendar) + Gmail (Email)
)} {connectedProviders.includes('fireflies-ai') && ( @@ -578,7 +676,7 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { open={composioApiKeyOpen} onOpenChange={setComposioApiKeyOpen} onSubmit={handleComposioApiKeySubmit} - isSubmitting={slackConnecting} + isSubmitting={gmailConnecting || slackConnecting} /> {}}> |]/g, "").substring(0, 100).trim(); } -function decodeBase64(data: string): string { - return Buffer.from(data, 'base64').toString('utf-8'); +// --- State Management --- + +interface SyncState { + last_sync: string; // ISO string — human-readable, source of truth } -function getBody(payload: gmail.Schema$MessagePart): string { - let body = ""; - if (payload.parts) { - for (const part of payload.parts) { - if (part.mimeType === 'text/plain' && part.body && part.body.data) { - const text = decodeBase64(part.body.data); - // Strip quoted lines - const cleanLines = text.split('\n').filter((line: string) => !line.trim().startsWith('>')); - body += cleanLines.join('\n'); - } else if (part.mimeType === 'text/html' && part.body && part.body.data) { - const html = decodeBase64(part.body.data); - const md = nhm.translate(html); - // Simple quote stripping for MD - const cleanLines = md.split('\n').filter((line: string) => !line.trim().startsWith('>')); - body += cleanLines.join('\n'); - } else if (part.parts) { - body += getBody(part); +function loadState(stateFile: string): SyncState | null { + if (fs.existsSync(stateFile)) { + try { + const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8')); + if (data.last_sync) { + return { last_sync: data.last_sync }; } + } catch (e) { + console.error('[Gmail] Failed to load state:', e); } - } else if (payload.body && payload.body.data) { - const data = decodeBase64(payload.body.data); - if (payload.mimeType === 'text/html') { - const md = nhm.translate(data); - body += md.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n'); - } else { - body += data.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n'); - } - } - return body; -} - -async function saveAttachment(gmail: gmail.Gmail, userId: string, msgId: string, part: gmail.Schema$MessagePart, attachmentsDir: string): Promise { - const filename = part.filename; - const attId = part.body?.attachmentId; - if (!filename || !attId) return null; - - const safeName = `${msgId}_${cleanFilename(filename)}`; - const filePath = path.join(attachmentsDir, safeName); - - if (fs.existsSync(filePath)) return safeName; - - try { - const res = await gmail.users.messages.attachments.get({ - userId, - messageId: msgId, - id: attId - }); - - const data = res.data.data; - if (data) { - fs.writeFileSync(filePath, Buffer.from(data, 'base64')); - console.log(`Saved attachment: ${safeName}`); - return safeName; - } - } catch (e) { - console.error(`Error saving attachment ${filename}:`, e); } return null; } +function saveState(stateFile: string, lastSync: string): void { + const state: SyncState = { + last_sync: lastSync, + }; + fs.writeFileSync(stateFile, JSON.stringify(state, null, 2)); +} + +/** + * Try to parse a date string into a Date. Returns null if unparseable. + */ +function tryParseDate(dateStr: string): Date | null { + const d = new Date(dateStr); + return isNaN(d.getTime()) ? null : d; +} + +function toEpochSeconds(isoString: string): number { + return Math.floor(new Date(isoString).getTime() / 1000); +} + +// --- Message Parsing --- + +interface ParsedMessage { + from: string; + date: string; + subject: string; + body: string; +} + +function parseMessageData(messageData: Record): ParsedMessage { + const headers = messageData.payload && typeof messageData.payload === 'object' + ? (messageData.payload as Record).headers as Array<{ name: string; value: string }> | undefined + : undefined; + + const from = headers?.find(h => h.name === 'From')?.value || String(messageData.from || messageData.sender || 'Unknown'); + const date = headers?.find(h => h.name === 'Date')?.value || String(messageData.date || messageData.internalDate || 'Unknown'); + const subject = headers?.find(h => h.name === 'Subject')?.value || String(messageData.subject || '(No Subject)'); + + let body = ''; + + // Try to extract body from payload structure (Gmail API format) + if (messageData.payload && typeof messageData.payload === 'object') { + body = extractBodyFromPayload(messageData.payload as Record); + } + + // Fallback: try snippet or body fields + if (!body) { + if (typeof messageData.body === 'string') { + body = messageData.body; + } else if (typeof messageData.snippet === 'string') { + body = messageData.snippet; + } else if (typeof messageData.text === 'string') { + body = messageData.text; + } + } + + // Convert HTML to markdown if body looks like HTML + if (body && (body.includes(' !line.trim().startsWith('>')).join('\n'); + } + + return { from, date, subject, body }; +} + +function extractBodyFromPayload(payload: Record): string { + const parts = payload.parts as Array> | undefined; + + if (parts) { + for (const part of parts) { + const mimeType = part.mimeType as string | undefined; + const bodyData = part.body && typeof part.body === 'object' + ? (part.body as Record).data as string | undefined + : undefined; + + if ((mimeType === 'text/plain' || mimeType === 'text/html') && bodyData) { + const decoded = Buffer.from(bodyData, 'base64').toString('utf-8'); + if (mimeType === 'text/html') { + return nhm.translate(decoded); + } + return decoded; + } + + // Recurse into nested parts + if (part.parts) { + const result = extractBodyFromPayload(part as Record); + if (result) return result; + } + } + } + + // Single-part message + const bodyData = payload.body && typeof payload.body === 'object' + ? (payload.body as Record).data as string | undefined + : undefined; + + if (bodyData) { + const decoded = Buffer.from(bodyData, 'base64').toString('utf-8'); + const mimeType = payload.mimeType as string | undefined; + if (mimeType === 'text/html') { + return nhm.translate(decoded); + } + return decoded; + } + + return ''; +} + // --- Sync Logic --- -async function processThread(auth: OAuth2Client, threadId: string, syncDir: string, attachmentsDir: string) { - const gmail = google.gmail({ version: 'v1', auth }); +/** + * Process a thread and write its .md file. + * Returns the newest message date (as ISO string) found in the thread, or null. + */ +async function processThread(connectedAccountId: string, threadId: string, syncDir: string): Promise { + let threadResult; try { - const res = await gmail.users.threads.get({ userId: 'me', id: threadId }); - const thread = res.data; - const messages = thread.messages; + threadResult = await executeAction( + 'GMAIL_FETCH_MESSAGE_BY_THREAD_ID', + connectedAccountId, + { thread_id: threadId, user_id: 'me' } + ); + } catch (error) { + console.warn(`[Gmail] Skipping thread ${threadId} (fetch failed):`, error instanceof Error ? error.message : error); + return null; + } - if (!messages || messages.length === 0) return; + if (!threadResult.success || !threadResult.data) { + console.error(`[Gmail] Failed to fetch thread ${threadId}:`, threadResult.error); + return null; + } - // Subject from first message - const firstHeader = messages[0].payload?.headers; - const subject = firstHeader?.find(h => h.name === 'Subject')?.value || '(No Subject)'; + const data = threadResult.data as Record; + const messages = data.messages as Array> | undefined; - let mdContent = `# ${subject}\n\n`; + let newestDate: Date | null = null; + + if (!messages || messages.length === 0) { + // Single message response + const parsed = parseMessageData(data); + const mdContent = `# ${parsed.subject}\n\n` + + `**Thread ID:** ${threadId}\n` + + `**Message Count:** 1\n\n---\n\n` + + `### From: ${parsed.from}\n` + + `**Date:** ${parsed.date}\n\n` + + `${parsed.body}\n\n---\n\n`; + + fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); + console.log(`[Gmail] Synced Thread: ${parsed.subject} (${threadId})`); + newestDate = tryParseDate(parsed.date); + } else { + // Multi-message thread + const firstParsed = parseMessageData(messages[0]); + let mdContent = `# ${firstParsed.subject}\n\n`; mdContent += `**Thread ID:** ${threadId}\n`; mdContent += `**Message Count:** ${messages.length}\n\n---\n\n`; for (const msg of messages) { - const msgId = msg.id!; - const headers = msg.payload?.headers || []; - const from = headers.find(h => h.name === 'From')?.value || 'Unknown'; - const date = headers.find(h => h.name === 'Date')?.value || 'Unknown'; + const parsed = parseMessageData(msg); + mdContent += `### From: ${parsed.from}\n`; + mdContent += `**Date:** ${parsed.date}\n\n`; + mdContent += `${parsed.body}\n\n`; + mdContent += `---\n\n`; - mdContent += `### From: ${from}\n`; - mdContent += `**Date:** ${date}\n\n`; - - if (msg.payload) { - const body = getBody(msg.payload); - mdContent += `${body}\n\n`; + const msgDate = tryParseDate(parsed.date); + if (msgDate && (!newestDate || msgDate > newestDate)) { + newestDate = msgDate; } - - // Attachments - const parts: gmail.Schema$MessagePart[] = []; - const traverseParts = (pList: gmail.Schema$MessagePart[]) => { - for (const p of pList) { - parts.push(p); - if (p.parts) traverseParts(p.parts); - } - }; - if (msg.payload?.parts) traverseParts(msg.payload.parts); - - let attachmentsFound = false; - for (const part of parts) { - if (part.filename && part.body?.attachmentId) { - const savedName = await saveAttachment(gmail, 'me', msgId, part, attachmentsDir); - if (savedName) { - if (!attachmentsFound) { - mdContent += "**Attachments:**\n"; - attachmentsFound = true; - } - mdContent += `- [${part.filename}](attachments/${savedName})\n`; - } - } - } - mdContent += "\n---\n\n"; } - fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent); - console.log(`Synced Thread: ${subject} (${threadId})`); - - } catch (error) { - console.error(`Error processing thread ${threadId}:`, error); + fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); + console.log(`[Gmail] Synced Thread: ${firstParsed.subject} (${threadId})`); } -} -function loadState(stateFile: string): { historyId?: string } { - if (fs.existsSync(stateFile)) { - return JSON.parse(fs.readFileSync(stateFile, 'utf-8')); - } - return {}; -} - -function saveState(historyId: string, stateFile: string) { - fs.writeFileSync(stateFile, JSON.stringify({ - historyId, - last_sync: new Date().toISOString() - }, null, 2)); -} - -async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) { - console.log(`Performing full sync of last ${lookbackDays} days...`); - const gmail = google.gmail({ version: 'v1', auth }); - - const pastDate = new Date(); - pastDate.setDate(pastDate.getDate() - lookbackDays); - const dateQuery = pastDate.toISOString().split('T')[0].replace(/-/g, '/'); - - // Get History ID - const profile = await gmail.users.getProfile({ userId: 'me' }); - const currentHistoryId = profile.data.historyId!; - - let pageToken: string | undefined; - do { - const res = await gmail.users.threads.list({ - userId: 'me', - q: `after:${dateQuery}`, - pageToken - }); - - const threads = res.data.threads; - if (threads) { - for (const thread of threads) { - await processThread(auth, thread.id!, syncDir, attachmentsDir); - } - } - pageToken = res.data.nextPageToken ?? undefined; - } while (pageToken); - - saveState(currentHistoryId, stateFile); - console.log("Full sync complete."); -} - -async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) { - console.log(`Checking updates since historyId ${startHistoryId}...`); - const gmail = google.gmail({ version: 'v1', auth }); - - try { - const res = await gmail.users.history.list({ - userId: 'me', - startHistoryId, - historyTypes: ['messageAdded'] - }); - - const changes = res.data.history; - if (!changes || changes.length === 0) { - console.log("No new changes."); - const profile = await gmail.users.getProfile({ userId: 'me' }); - saveState(profile.data.historyId!, stateFile); - return; - } - - console.log(`Found ${changes.length} history records.`); - const threadIds = new Set(); - - for (const record of changes) { - if (record.messagesAdded) { - for (const item of record.messagesAdded) { - if (item.message?.threadId) { - threadIds.add(item.message.threadId); - } - } - } - } - - for (const tid of threadIds) { - await processThread(auth, tid, syncDir, attachmentsDir); - } - - const profile = await gmail.users.getProfile({ userId: 'me' }); - saveState(profile.data.historyId!, stateFile); - - } catch (error: unknown) { - const e = error as { response?: { status?: number } }; - if (e.response?.status === 404) { - console.log("History ID expired. Falling back to full sync."); - await fullSync(auth, syncDir, attachmentsDir, stateFile, lookbackDays); - } else { - console.error("Error during partial sync:", error); - // If 401, clear tokens to force re-auth next run - if (e.response?.status === 401) { - console.log("401 Unauthorized, clearing cache"); - GoogleClientFactory.clearCache(); - } - } - } + if (!newestDate) return null; + // Add 1 second so the `after:` query (epoch-second granularity) excludes this email next sync + return new Date(newestDate.getTime() + 1000).toISOString(); } async function performSync() { - const LOOKBACK_DAYS = 30; // Default to 1 month const ATTACHMENTS_DIR = path.join(SYNC_DIR, 'attachments'); const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json'); @@ -285,51 +252,127 @@ async function performSync() { if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true }); if (!fs.existsSync(ATTACHMENTS_DIR)) fs.mkdirSync(ATTACHMENTS_DIR, { recursive: true }); + const account = composioAccountsRepo.getAccount('gmail'); + if (!account || account.status !== 'ACTIVE') { + console.log('[Gmail] Gmail not connected via Composio. Skipping sync.'); + return; + } + + const connectedAccountId = account.id; + + // Determine query timestamp + const state = loadState(STATE_FILE); + let afterEpochSeconds: number; + + if (state) { + afterEpochSeconds = toEpochSeconds(state.last_sync); + console.log(`[Gmail] Syncing messages since ${state.last_sync}...`); + } else { + const pastDate = new Date(); + pastDate.setDate(pastDate.getDate() - LOOKBACK_DAYS); + afterEpochSeconds = Math.floor(pastDate.getTime() / 1000); + console.log(`[Gmail] First sync - fetching last ${LOOKBACK_DAYS} days...`); + } + try { - const auth = await GoogleClientFactory.getClient(); - if (!auth) { - console.log("No valid OAuth credentials available."); + // List threads since last sync (lightweight - returns IDs only) + const allThreadIds: string[] = []; + let pageToken: string | undefined; + + do { + const params: Record = { + query: `after:${afterEpochSeconds}`, + max_results: 20, + user_id: 'me', + }; + if (pageToken) { + params.page_token = pageToken; + } + + const result = await executeAction( + 'GMAIL_LIST_THREADS', + connectedAccountId, + params + ); + + if (!result.success || !result.data) { + console.error('[Gmail] Failed to list threads:', result.error); + return; + } + + const data = result.data as Record; + const threads = data.threads as Array> | undefined; + + if (threads && threads.length > 0) { + for (const thread of threads) { + const threadId = thread.id as string | undefined; + if (threadId) { + allThreadIds.push(threadId); + } + } + } + + pageToken = data.nextPageToken as string | undefined; + } while (pageToken); + + if (allThreadIds.length === 0) { + console.log('[Gmail] No new threads.'); return; } - console.log("Authorization successful. Starting sync..."); + console.log(`[Gmail] Found ${allThreadIds.length} threads to sync.`); - const state = loadState(STATE_FILE); - if (!state.historyId) { - console.log("No history ID found, starting full sync..."); - await fullSync(auth, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS); - } else { - console.log("History ID found, starting partial sync..."); - await partialSync(auth, state.historyId, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS); + // Reverse so we process oldest first. Gmail returns newest first, + // so processing in reverse lets the high-water mark advance + // chronologically — safe to save state after each thread. + allThreadIds.reverse(); + + // Process each thread, saving state after each one with the + // newest email date seen so far (high-water mark). + let highWaterMark: string | null = state?.last_sync ?? null; + let processedCount = 0; + for (const threadId of allThreadIds) { + try { + const newestInThread = await processThread(connectedAccountId, threadId, SYNC_DIR); + processedCount++; + + // Advance high-water mark if this thread has a newer email + if (newestInThread) { + if (!highWaterMark || new Date(newestInThread) > new Date(highWaterMark)) { + highWaterMark = newestInThread; + } + saveState(STATE_FILE, highWaterMark); + } + } catch (error) { + console.error(`[Gmail] Error processing thread ${threadId}, skipping:`, error); + } } - console.log("Sync completed."); + console.log(`[Gmail] Sync completed. Processed ${processedCount}/${allThreadIds.length} threads.`); + } catch (error) { - console.error("Error during sync:", error); + console.error('[Gmail] Error during sync:', error); } } export async function init() { - console.log("Starting Gmail Sync (TS)..."); - console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`); + console.log('[Gmail] Starting Gmail Sync (Composio)...'); + console.log(`[Gmail] Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`); while (true) { try { - // Check if credentials are available with required scopes - const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE); - - if (!hasCredentials) { - console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping..."); + const isConnected = composioAccountsRepo.isConnected('gmail'); + + if (!isConnected) { + console.log('[Gmail] Gmail not connected via Composio. Sleeping...'); } else { - // Perform one sync await performSync(); } } catch (error) { - console.error("Error in main loop:", error); + console.error('[Gmail] Error in main loop:', error); } - // Sleep for N minutes before next check (can be interrupted by triggerSync) - console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`); + console.log(`[Gmail] Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`); await interruptibleSleep(SYNC_INTERVAL_MS); } }