diff --git a/apps/x/apps/main/src/composio-handler.ts b/apps/x/apps/main/src/composio-handler.ts index f72613ad..e5b25d1a 100644 --- a/apps/x/apps/main/src/composio-handler.ts +++ b/apps/x/apps/main/src/composio-handler.ts @@ -3,7 +3,6 @@ import { createAuthServer } from './auth-server.js'; import * as composioClient from '@x/core/dist/composio/client.js'; import { composioAccountsRepo } from '@x/core/dist/composio/repo.js'; import type { LocalConnectedAccount } from '@x/core/dist/composio/types.js'; -import { triggerSync as triggerGmailSync } from '@x/core/dist/knowledge/sync_gmail.js'; const REDIRECT_URI = 'http://localhost:8081/oauth/callback'; @@ -152,9 +151,6 @@ export async function initiateConnection(toolkitSlug: string): Promise<{ if (accountStatus.status === 'ACTIVE') { emitComposioEvent({ toolkitSlug, success: true }); - if (toolkitSlug === 'gmail') { - triggerGmailSync(); - } } else { emitComposioEvent({ toolkitSlug, diff --git a/apps/x/apps/main/src/main.ts b/apps/x/apps/main/src/main.ts index c276301b..6ddab7bc 100644 --- a/apps/x/apps/main/src/main.ts +++ b/apps/x/apps/main/src/main.ts @@ -5,7 +5,7 @@ import { fileURLToPath, pathToFileURL } from "node:url"; import { dirname } from "node:path"; import { updateElectronApp, UpdateSourceType } from "update-electron-app"; import { init as initGmailSync } from "@x/core/dist/knowledge/sync_gmail.js"; - +import { init as initCalendarSync } from "@x/core/dist/knowledge/sync_calendar.js"; import { init as initFirefliesSync } from "@x/core/dist/knowledge/sync_fireflies.js"; import { init as initGranolaSync } from "@x/core/dist/knowledge/granola/sync.js"; import { init as initGraphBuilder } from "@x/core/dist/knowledge/build_graph.js"; @@ -134,6 +134,9 @@ app.whenReady().then(async () => { // start gmail sync initGmailSync(); + // start calendar sync + initCalendarSync(); + // start fireflies sync initFirefliesSync(); diff --git a/apps/x/apps/main/src/oauth-handler.ts b/apps/x/apps/main/src/oauth-handler.ts index 58ab0809..5b55e8b7 100644 --- a/apps/x/apps/main/src/oauth-handler.ts +++ b/apps/x/apps/main/src/oauth-handler.ts @@ -7,6 +7,7 @@ import { getProviderConfig, getAvailableProviders } from '@x/core/dist/auth/prov import container from '@x/core/dist/di/container.js'; import { IOAuthRepo } from '@x/core/dist/auth/repo.js'; import { IClientRegistrationRepo } from '@x/core/dist/auth/client-repo.js'; +import { triggerSync as triggerGmailSync } from '@x/core/dist/knowledge/sync_gmail.js'; import { triggerSync as triggerCalendarSync } from '@x/core/dist/knowledge/sync_calendar.js'; import { triggerSync as triggerFirefliesSync } from '@x/core/dist/knowledge/sync_fireflies.js'; import { emitOAuthEvent } from './ipc.js'; @@ -193,6 +194,7 @@ export async function connectProvider(provider: string): Promise<{ success: bool // Trigger immediate sync for relevant providers if (provider === 'google') { + triggerGmailSync(); triggerCalendarSync(); } else if (provider === 'fireflies-ai') { triggerFirefliesSync(); diff --git a/apps/x/apps/renderer/src/components/connectors-popover.tsx b/apps/x/apps/renderer/src/components/connectors-popover.tsx index 882a8d48..1799ab75 100644 --- a/apps/x/apps/renderer/src/components/connectors-popover.tsx +++ b/apps/x/apps/renderer/src/components/connectors-popover.tsx @@ -41,12 +41,8 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) const [granolaEnabled, setGranolaEnabled] = useState(false) const [granolaLoading, setGranolaLoading] = useState(true) - // Composio state (Gmail + Slack) + // Composio/Slack state const [composioApiKeyOpen, setComposioApiKeyOpen] = useState(false) - const [composioApiKeyTarget, setComposioApiKeyTarget] = useState<'gmail' | 'slack'>('gmail') - const [gmailConnected, setGmailConnected] = useState(false) - const [gmailLoading, setGmailLoading] = useState(true) - const [gmailConnecting, setGmailConnecting] = useState(false) const [slackConnected, setSlackConnected] = useState(false) const [slackLoading, setSlackLoading] = useState(true) const [slackConnecting, setSlackConnecting] = useState(false) @@ -97,20 +93,6 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) } }, []) - // Load Gmail connection status - const refreshGmailStatus = useCallback(async () => { - try { - setGmailLoading(true) - const result = await window.ipc.invoke('composio:get-connection-status', { toolkitSlug: 'gmail' }) - setGmailConnected(result.isConnected) - } catch (error) { - console.error('Failed to load Gmail status:', error) - setGmailConnected(false) - } finally { - setGmailLoading(false) - } - }, []) - // Load Slack connection status const refreshSlackStatus = useCallback(async () => { try { @@ -125,53 +107,6 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) } }, []) - // Connect to Gmail via Composio - const startGmailConnect = useCallback(async () => { - try { - setGmailConnecting(true) - const result = await window.ipc.invoke('composio:initiate-connection', { toolkitSlug: 'gmail' }) - if (!result.success) { - toast.error(result.error || 'Failed to connect to Gmail') - setGmailConnecting(false) - } - // Success will be handled by composio:didConnect event - } catch (error) { - console.error('Failed to connect to Gmail:', error) - toast.error('Failed to connect to Gmail') - setGmailConnecting(false) - } - }, []) - - // Handle Gmail connect button click - const handleConnectGmail = useCallback(async () => { - const configResult = await window.ipc.invoke('composio:is-configured', null) - if (!configResult.configured) { - setComposioApiKeyTarget('gmail') - setComposioApiKeyOpen(true) - return - } - await startGmailConnect() - }, [startGmailConnect]) - - // Disconnect from Gmail - const handleDisconnectGmail = useCallback(async () => { - try { - setGmailLoading(true) - const result = await window.ipc.invoke('composio:disconnect', { toolkitSlug: 'gmail' }) - if (result.success) { - setGmailConnected(false) - toast.success('Disconnected from Gmail') - } else { - toast.error('Failed to disconnect from Gmail') - } - } catch (error) { - console.error('Failed to disconnect from Gmail:', error) - toast.error('Failed to disconnect from Gmail') - } finally { - setGmailLoading(false) - } - }, []) - // Connect to Slack via Composio const startSlackConnect = useCallback(async () => { try { @@ -191,9 +126,9 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) // Handle Slack connect button click const handleConnectSlack = useCallback(async () => { + // Check if Composio is configured const configResult = await window.ipc.invoke('composio:is-configured', null) if (!configResult.configured) { - setComposioApiKeyTarget('slack') setComposioApiKeyOpen(true) return } @@ -206,17 +141,13 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) await window.ipc.invoke('composio:set-api-key', { apiKey }) setComposioApiKeyOpen(false) toast.success('Composio API key saved') - // Start the connection for whichever toolkit triggered the API key prompt - if (composioApiKeyTarget === 'gmail') { - await startGmailConnect() - } else { - await startSlackConnect() - } + // Now start the Slack connection + await startSlackConnect() } catch (error) { console.error('Failed to save Composio API key:', error) toast.error('Failed to save API key') } - }, [composioApiKeyTarget, startGmailConnect, startSlackConnect]) + }, [startSlackConnect]) // Disconnect from Slack const handleDisconnectSlack = useCallback(async () => { @@ -242,8 +173,7 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) // Refresh Granola refreshGranolaConfig() - // Refresh Composio connections - refreshGmailStatus() + // Refresh Slack status refreshSlackStatus() // Refresh OAuth providers @@ -272,7 +202,7 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) ) setProviderStates(newStates) - }, [providers, refreshGranolaConfig, refreshGmailStatus, refreshSlackStatus]) + }, [providers, refreshGranolaConfig, refreshSlackStatus]) // Refresh statuses when popover opens or providers list changes useEffect(() => { @@ -297,7 +227,7 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) if (success) { const displayName = provider === 'fireflies-ai' ? 'Fireflies' : provider.charAt(0).toUpperCase() + provider.slice(1) - // Show detailed message for providers that sync in background + // Show detailed message for Google and Fireflies (includes sync info) if (provider === 'google' || provider === 'fireflies-ai') { toast.success(`Connected to ${displayName}`, { description: 'Syncing your data in the background. This may take a few minutes before changes appear.', @@ -321,19 +251,7 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) const cleanup = window.ipc.on('composio:didConnect', (event) => { const { toolkitSlug, success, error } = event - if (toolkitSlug === 'gmail') { - setGmailConnected(success) - setGmailConnecting(false) - - if (success) { - toast.success('Connected to Gmail', { - description: 'Syncing your emails in the background. This may take a few minutes.', - duration: 8000, - }) - } else { - toast.error(error || 'Failed to connect to Gmail') - } - } else if (toolkitSlug === 'slack') { + if (toolkitSlug === 'slack') { setSlackConnected(success) setSlackConnecting(false) @@ -513,55 +431,16 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) ) : ( <> - {/* Email Section - Gmail via Composio */} -
- Email -
-
-
-
- + {/* Email & Calendar Section - Google */} + {providers.includes('google') && ( + <> +
+ Email & Calendar
-
- Gmail - {gmailLoading ? ( - Checking... - ) : ( - Sync emails - )} -
-
-
- {gmailLoading ? ( - - ) : gmailConnected ? ( - - ) : ( - - )} -
-
- - + {renderOAuthProvider('google', 'Google', , 'Sync emails and calendar')} + + + )} {/* Meeting Notes Section - Granola & Fireflies */}
@@ -658,7 +537,7 @@ export function ConnectorsPopover({ children, tooltip }: ConnectorsPopoverProps) open={composioApiKeyOpen} onOpenChange={setComposioApiKeyOpen} onSubmit={handleComposioApiKeySubmit} - isSubmitting={gmailConnecting || slackConnecting} + isSubmitting={slackConnecting} /> ) diff --git a/apps/x/apps/renderer/src/components/onboarding-modal.tsx b/apps/x/apps/renderer/src/components/onboarding-modal.tsx index 0675697c..074ad645 100644 --- a/apps/x/apps/renderer/src/components/onboarding-modal.tsx +++ b/apps/x/apps/renderer/src/components/onboarding-modal.tsx @@ -42,12 +42,8 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { const [granolaEnabled, setGranolaEnabled] = useState(false) const [granolaLoading, setGranolaLoading] = useState(true) - // Composio state (Gmail + Slack) + // Composio/Slack state const [composioApiKeyOpen, setComposioApiKeyOpen] = useState(false) - const [composioApiKeyTarget, setComposioApiKeyTarget] = useState<'gmail' | 'slack'>('gmail') - const [gmailConnected, setGmailConnected] = useState(false) - const [gmailLoading, setGmailLoading] = useState(true) - const [gmailConnecting, setGmailConnecting] = useState(false) const [slackConnected, setSlackConnected] = useState(false) const [slackLoading, setSlackLoading] = useState(true) const [slackConnecting, setSlackConnecting] = useState(false) @@ -105,47 +101,6 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { } }, []) - // Load Gmail connection status - const refreshGmailStatus = useCallback(async () => { - try { - setGmailLoading(true) - const result = await window.ipc.invoke('composio:get-connection-status', { toolkitSlug: 'gmail' }) - setGmailConnected(result.isConnected) - } catch (error) { - console.error('Failed to load Gmail status:', error) - setGmailConnected(false) - } finally { - setGmailLoading(false) - } - }, []) - - // Connect to Gmail via Composio - const startGmailConnect = useCallback(async () => { - try { - setGmailConnecting(true) - const result = await window.ipc.invoke('composio:initiate-connection', { toolkitSlug: 'gmail' }) - if (!result.success) { - toast.error(result.error || 'Failed to connect to Gmail') - setGmailConnecting(false) - } - } catch (error) { - console.error('Failed to connect to Gmail:', error) - toast.error('Failed to connect to Gmail') - setGmailConnecting(false) - } - }, []) - - // Handle Gmail connect button click - const handleConnectGmail = useCallback(async () => { - const configResult = await window.ipc.invoke('composio:is-configured', null) - if (!configResult.configured) { - setComposioApiKeyTarget('gmail') - setComposioApiKeyOpen(true) - return - } - await startGmailConnect() - }, [startGmailConnect]) - // Load Slack connection status const refreshSlackStatus = useCallback(async () => { try { @@ -179,9 +134,9 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { // Connect to Slack via Composio (checks if configured first) const handleConnectSlack = useCallback(async () => { + // Check if Composio is configured const configResult = await window.ipc.invoke('composio:is-configured', null) if (!configResult.configured) { - setComposioApiKeyTarget('slack') setComposioApiKeyOpen(true) return } @@ -194,24 +149,20 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { await window.ipc.invoke('composio:set-api-key', { apiKey }) setComposioApiKeyOpen(false) toast.success('Composio API key saved') - if (composioApiKeyTarget === 'gmail') { - await startGmailConnect() - } else { - await startSlackConnect() - } + // Now start the Slack connection + await startSlackConnect() } catch (error) { console.error('Failed to save Composio API key:', error) toast.error('Failed to save API key') } - }, [composioApiKeyTarget, startGmailConnect, startSlackConnect]) + }, [startSlackConnect]) // Check connection status for all providers const refreshAllStatuses = useCallback(async () => { // Refresh Granola refreshGranolaConfig() - // Refresh Composio connections - refreshGmailStatus() + // Refresh Slack status refreshSlackStatus() // Refresh OAuth providers @@ -240,7 +191,7 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { ) setProviderStates(newStates) - }, [providers, refreshGranolaConfig, refreshGmailStatus, refreshSlackStatus]) + }, [providers, refreshGranolaConfig, refreshSlackStatus]) // Refresh statuses when modal opens or providers list changes useEffect(() => { @@ -279,16 +230,7 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { const cleanup = window.ipc.on('composio:didConnect', (event) => { const { toolkitSlug, success, error } = event - if (toolkitSlug === 'gmail') { - setGmailConnected(success) - setGmailConnecting(false) - - if (success) { - toast.success('Connected to Gmail') - } else { - toast.error(error || 'Failed to connect to Gmail') - } - } else if (toolkitSlug === 'slack') { + if (toolkitSlug === 'slack') { setSlackConnected(success) setSlackConnecting(false) @@ -435,48 +377,6 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {
) - // Render Gmail row (Composio) - const renderGmailRow = () => ( -
-
-
- -
-
- Gmail - {gmailLoading ? ( - Checking... - ) : ( - Sync emails - )} -
-
-
- {gmailLoading ? ( - - ) : gmailConnected ? ( -
- - Connected -
- ) : ( - - )} -
-
- ) - // Render Slack row const renderSlackRow = () => (
@@ -570,13 +470,15 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {
) : ( <> - {/* Email Section - Gmail via Composio */} -
-
- Email + {/* Email & Calendar Section */} + {providers.includes('google') && ( +
+
+ Email & Calendar +
+ {renderOAuthProvider('google', 'Google', , 'Sync emails and calendar events')}
- {renderGmailRow()} -
+ )} {/* Meeting Notes Section */}
@@ -611,7 +513,7 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { // Step 2: Completion const CompletionStep = () => { - const hasConnections = connectedProviders.length > 0 || gmailConnected || granolaEnabled || slackConnected + const hasConnections = connectedProviders.length > 0 || granolaEnabled || slackConnected return (
@@ -634,10 +536,10 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) {

Connected accounts:

- {gmailConnected && ( + {connectedProviders.includes('google') && (
- Gmail (Email) + Google (Email & Calendar)
)} {connectedProviders.includes('fireflies-ai') && ( @@ -676,7 +578,7 @@ export function OnboardingModal({ open, onComplete }: OnboardingModalProps) { open={composioApiKeyOpen} onOpenChange={setComposioApiKeyOpen} onSubmit={handleComposioApiKeySubmit} - isSubmitting={gmailConnecting || slackConnecting} + isSubmitting={slackConnecting} /> {}}> |]/g, "").substring(0, 100).trim(); } -// --- State Management --- - -interface SyncState { - last_sync: string; // ISO string — human-readable, source of truth +function decodeBase64(data: string): string { + return Buffer.from(data, 'base64').toString('utf-8'); } -function loadState(stateFile: string): SyncState | null { - if (fs.existsSync(stateFile)) { - try { - const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8')); - if (data.last_sync) { - return { last_sync: data.last_sync }; +function getBody(payload: gmail.Schema$MessagePart): string { + let body = ""; + if (payload.parts) { + for (const part of payload.parts) { + if (part.mimeType === 'text/plain' && part.body && part.body.data) { + const text = decodeBase64(part.body.data); + // Strip quoted lines + const cleanLines = text.split('\n').filter((line: string) => !line.trim().startsWith('>')); + body += cleanLines.join('\n'); + } else if (part.mimeType === 'text/html' && part.body && part.body.data) { + const html = decodeBase64(part.body.data); + const md = nhm.translate(html); + // Simple quote stripping for MD + const cleanLines = md.split('\n').filter((line: string) => !line.trim().startsWith('>')); + body += cleanLines.join('\n'); + } else if (part.parts) { + body += getBody(part); } - } catch (e) { - console.error('[Gmail] Failed to load state:', e); } + } else if (payload.body && payload.body.data) { + const data = decodeBase64(payload.body.data); + if (payload.mimeType === 'text/html') { + const md = nhm.translate(data); + body += md.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n'); + } else { + body += data.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n'); + } + } + return body; +} + +async function saveAttachment(gmail: gmail.Gmail, userId: string, msgId: string, part: gmail.Schema$MessagePart, attachmentsDir: string): Promise { + const filename = part.filename; + const attId = part.body?.attachmentId; + if (!filename || !attId) return null; + + const safeName = `${msgId}_${cleanFilename(filename)}`; + const filePath = path.join(attachmentsDir, safeName); + + if (fs.existsSync(filePath)) return safeName; + + try { + const res = await gmail.users.messages.attachments.get({ + userId, + messageId: msgId, + id: attId + }); + + const data = res.data.data; + if (data) { + fs.writeFileSync(filePath, Buffer.from(data, 'base64')); + console.log(`Saved attachment: ${safeName}`); + return safeName; + } + } catch (e) { + console.error(`Error saving attachment ${filename}:`, e); } return null; } -function saveState(stateFile: string, lastSync: string): void { - const state: SyncState = { - last_sync: lastSync, - }; - fs.writeFileSync(stateFile, JSON.stringify(state, null, 2)); -} - -/** - * Try to parse a date string into a Date. Returns null if unparseable. - */ -function tryParseDate(dateStr: string): Date | null { - const d = new Date(dateStr); - return isNaN(d.getTime()) ? null : d; -} - -function toEpochSeconds(isoString: string): number { - return Math.floor(new Date(isoString).getTime() / 1000); -} - -// --- Message Parsing --- - -interface ParsedMessage { - from: string; - date: string; - subject: string; - body: string; -} - -function parseMessageData(messageData: Record): ParsedMessage { - const headers = messageData.payload && typeof messageData.payload === 'object' - ? (messageData.payload as Record).headers as Array<{ name: string; value: string }> | undefined - : undefined; - - const from = headers?.find(h => h.name === 'From')?.value || String(messageData.from || messageData.sender || 'Unknown'); - const date = headers?.find(h => h.name === 'Date')?.value || String(messageData.date || messageData.internalDate || 'Unknown'); - const subject = headers?.find(h => h.name === 'Subject')?.value || String(messageData.subject || '(No Subject)'); - - let body = ''; - - // Try to extract body from payload structure (Gmail API format) - if (messageData.payload && typeof messageData.payload === 'object') { - body = extractBodyFromPayload(messageData.payload as Record); - } - - // Fallback: try snippet or body fields - if (!body) { - if (typeof messageData.body === 'string') { - body = messageData.body; - } else if (typeof messageData.snippet === 'string') { - body = messageData.snippet; - } else if (typeof messageData.text === 'string') { - body = messageData.text; - } - } - - // Convert HTML to markdown if body looks like HTML - if (body && (body.includes(' !line.trim().startsWith('>')).join('\n'); - } - - return { from, date, subject, body }; -} - -function extractBodyFromPayload(payload: Record): string { - const parts = payload.parts as Array> | undefined; - - if (parts) { - for (const part of parts) { - const mimeType = part.mimeType as string | undefined; - const bodyData = part.body && typeof part.body === 'object' - ? (part.body as Record).data as string | undefined - : undefined; - - if ((mimeType === 'text/plain' || mimeType === 'text/html') && bodyData) { - const decoded = Buffer.from(bodyData, 'base64').toString('utf-8'); - if (mimeType === 'text/html') { - return nhm.translate(decoded); - } - return decoded; - } - - // Recurse into nested parts - if (part.parts) { - const result = extractBodyFromPayload(part as Record); - if (result) return result; - } - } - } - - // Single-part message - const bodyData = payload.body && typeof payload.body === 'object' - ? (payload.body as Record).data as string | undefined - : undefined; - - if (bodyData) { - const decoded = Buffer.from(bodyData, 'base64').toString('utf-8'); - const mimeType = payload.mimeType as string | undefined; - if (mimeType === 'text/html') { - return nhm.translate(decoded); - } - return decoded; - } - - return ''; -} - // --- Sync Logic --- -/** - * Process a thread and write its .md file. - * Returns the newest message date (as ISO string) found in the thread, or null. - */ -async function processThread(connectedAccountId: string, threadId: string, syncDir: string): Promise { - let threadResult; +async function processThread(auth: OAuth2Client, threadId: string, syncDir: string, attachmentsDir: string) { + const gmail = google.gmail({ version: 'v1', auth }); try { - threadResult = await executeAction( - 'GMAIL_FETCH_MESSAGE_BY_THREAD_ID', - connectedAccountId, - { thread_id: threadId, user_id: 'me' } - ); - } catch (error) { - console.warn(`[Gmail] Skipping thread ${threadId} (fetch failed):`, error instanceof Error ? error.message : error); - return null; - } + const res = await gmail.users.threads.get({ userId: 'me', id: threadId }); + const thread = res.data; + const messages = thread.messages; - if (!threadResult.success || !threadResult.data) { - console.error(`[Gmail] Failed to fetch thread ${threadId}:`, threadResult.error); - return null; - } + if (!messages || messages.length === 0) return; - const data = threadResult.data as Record; - const messages = data.messages as Array> | undefined; + // Subject from first message + const firstHeader = messages[0].payload?.headers; + const subject = firstHeader?.find(h => h.name === 'Subject')?.value || '(No Subject)'; - let newestDate: Date | null = null; - - if (!messages || messages.length === 0) { - // Single message response - const parsed = parseMessageData(data); - const mdContent = `# ${parsed.subject}\n\n` + - `**Thread ID:** ${threadId}\n` + - `**Message Count:** 1\n\n---\n\n` + - `### From: ${parsed.from}\n` + - `**Date:** ${parsed.date}\n\n` + - `${parsed.body}\n\n---\n\n`; - - fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); - console.log(`[Gmail] Synced Thread: ${parsed.subject} (${threadId})`); - newestDate = tryParseDate(parsed.date); - } else { - // Multi-message thread - const firstParsed = parseMessageData(messages[0]); - let mdContent = `# ${firstParsed.subject}\n\n`; + let mdContent = `# ${subject}\n\n`; mdContent += `**Thread ID:** ${threadId}\n`; mdContent += `**Message Count:** ${messages.length}\n\n---\n\n`; for (const msg of messages) { - const parsed = parseMessageData(msg); - mdContent += `### From: ${parsed.from}\n`; - mdContent += `**Date:** ${parsed.date}\n\n`; - mdContent += `${parsed.body}\n\n`; - mdContent += `---\n\n`; + const msgId = msg.id!; + const headers = msg.payload?.headers || []; + const from = headers.find(h => h.name === 'From')?.value || 'Unknown'; + const date = headers.find(h => h.name === 'Date')?.value || 'Unknown'; - const msgDate = tryParseDate(parsed.date); - if (msgDate && (!newestDate || msgDate > newestDate)) { - newestDate = msgDate; + mdContent += `### From: ${from}\n`; + mdContent += `**Date:** ${date}\n\n`; + + if (msg.payload) { + const body = getBody(msg.payload); + mdContent += `${body}\n\n`; + } + + // Attachments + const parts: gmail.Schema$MessagePart[] = []; + const traverseParts = (pList: gmail.Schema$MessagePart[]) => { + for (const p of pList) { + parts.push(p); + if (p.parts) traverseParts(p.parts); + } + }; + if (msg.payload?.parts) traverseParts(msg.payload.parts); + + let attachmentsFound = false; + for (const part of parts) { + if (part.filename && part.body?.attachmentId) { + const savedName = await saveAttachment(gmail, 'me', msgId, part, attachmentsDir); + if (savedName) { + if (!attachmentsFound) { + mdContent += "**Attachments:**\n"; + attachmentsFound = true; + } + mdContent += `- [${part.filename}](attachments/${savedName})\n`; + } + } + } + mdContent += "\n---\n\n"; + } + + fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent); + console.log(`Synced Thread: ${subject} (${threadId})`); + + } catch (error) { + console.error(`Error processing thread ${threadId}:`, error); + } +} + +function loadState(stateFile: string): { historyId?: string } { + if (fs.existsSync(stateFile)) { + return JSON.parse(fs.readFileSync(stateFile, 'utf-8')); + } + return {}; +} + +function saveState(historyId: string, stateFile: string) { + fs.writeFileSync(stateFile, JSON.stringify({ + historyId, + last_sync: new Date().toISOString() + }, null, 2)); +} + +async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) { + console.log(`Performing full sync of last ${lookbackDays} days...`); + const gmail = google.gmail({ version: 'v1', auth }); + + const pastDate = new Date(); + pastDate.setDate(pastDate.getDate() - lookbackDays); + const dateQuery = pastDate.toISOString().split('T')[0].replace(/-/g, '/'); + + // Get History ID + const profile = await gmail.users.getProfile({ userId: 'me' }); + const currentHistoryId = profile.data.historyId!; + + let pageToken: string | undefined; + do { + const res = await gmail.users.threads.list({ + userId: 'me', + q: `after:${dateQuery}`, + pageToken + }); + + const threads = res.data.threads; + if (threads) { + for (const thread of threads) { + await processThread(auth, thread.id!, syncDir, attachmentsDir); + } + } + pageToken = res.data.nextPageToken ?? undefined; + } while (pageToken); + + saveState(currentHistoryId, stateFile); + console.log("Full sync complete."); +} + +async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) { + console.log(`Checking updates since historyId ${startHistoryId}...`); + const gmail = google.gmail({ version: 'v1', auth }); + + try { + const res = await gmail.users.history.list({ + userId: 'me', + startHistoryId, + historyTypes: ['messageAdded'] + }); + + const changes = res.data.history; + if (!changes || changes.length === 0) { + console.log("No new changes."); + const profile = await gmail.users.getProfile({ userId: 'me' }); + saveState(profile.data.historyId!, stateFile); + return; + } + + console.log(`Found ${changes.length} history records.`); + const threadIds = new Set(); + + for (const record of changes) { + if (record.messagesAdded) { + for (const item of record.messagesAdded) { + if (item.message?.threadId) { + threadIds.add(item.message.threadId); + } + } } } - fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); - console.log(`[Gmail] Synced Thread: ${firstParsed.subject} (${threadId})`); - } + for (const tid of threadIds) { + await processThread(auth, tid, syncDir, attachmentsDir); + } - if (!newestDate) return null; - // Add 1 second so the `after:` query (epoch-second granularity) excludes this email next sync - return new Date(newestDate.getTime() + 1000).toISOString(); + const profile = await gmail.users.getProfile({ userId: 'me' }); + saveState(profile.data.historyId!, stateFile); + + } catch (error: unknown) { + const e = error as { response?: { status?: number } }; + if (e.response?.status === 404) { + console.log("History ID expired. Falling back to full sync."); + await fullSync(auth, syncDir, attachmentsDir, stateFile, lookbackDays); + } else { + console.error("Error during partial sync:", error); + // If 401, clear tokens to force re-auth next run + if (e.response?.status === 401) { + console.log("401 Unauthorized, clearing cache"); + GoogleClientFactory.clearCache(); + } + } + } } async function performSync() { + const LOOKBACK_DAYS = 30; // Default to 1 month const ATTACHMENTS_DIR = path.join(SYNC_DIR, 'attachments'); const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json'); @@ -252,127 +285,51 @@ async function performSync() { if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true }); if (!fs.existsSync(ATTACHMENTS_DIR)) fs.mkdirSync(ATTACHMENTS_DIR, { recursive: true }); - const account = composioAccountsRepo.getAccount('gmail'); - if (!account || account.status !== 'ACTIVE') { - console.log('[Gmail] Gmail not connected via Composio. Skipping sync.'); - return; - } - - const connectedAccountId = account.id; - - // Determine query timestamp - const state = loadState(STATE_FILE); - let afterEpochSeconds: number; - - if (state) { - afterEpochSeconds = toEpochSeconds(state.last_sync); - console.log(`[Gmail] Syncing messages since ${state.last_sync}...`); - } else { - const pastDate = new Date(); - pastDate.setDate(pastDate.getDate() - LOOKBACK_DAYS); - afterEpochSeconds = Math.floor(pastDate.getTime() / 1000); - console.log(`[Gmail] First sync - fetching last ${LOOKBACK_DAYS} days...`); - } - try { - // List threads since last sync (lightweight - returns IDs only) - const allThreadIds: string[] = []; - let pageToken: string | undefined; - - do { - const params: Record = { - query: `after:${afterEpochSeconds}`, - max_results: 20, - user_id: 'me', - }; - if (pageToken) { - params.page_token = pageToken; - } - - const result = await executeAction( - 'GMAIL_LIST_THREADS', - connectedAccountId, - params - ); - - if (!result.success || !result.data) { - console.error('[Gmail] Failed to list threads:', result.error); - return; - } - - const data = result.data as Record; - const threads = data.threads as Array> | undefined; - - if (threads && threads.length > 0) { - for (const thread of threads) { - const threadId = thread.id as string | undefined; - if (threadId) { - allThreadIds.push(threadId); - } - } - } - - pageToken = data.nextPageToken as string | undefined; - } while (pageToken); - - if (allThreadIds.length === 0) { - console.log('[Gmail] No new threads.'); + const auth = await GoogleClientFactory.getClient(); + if (!auth) { + console.log("No valid OAuth credentials available."); return; } - console.log(`[Gmail] Found ${allThreadIds.length} threads to sync.`); + console.log("Authorization successful. Starting sync..."); - // Reverse so we process oldest first. Gmail returns newest first, - // so processing in reverse lets the high-water mark advance - // chronologically — safe to save state after each thread. - allThreadIds.reverse(); - - // Process each thread, saving state after each one with the - // newest email date seen so far (high-water mark). - let highWaterMark: string | null = state?.last_sync ?? null; - let processedCount = 0; - for (const threadId of allThreadIds) { - try { - const newestInThread = await processThread(connectedAccountId, threadId, SYNC_DIR); - processedCount++; - - // Advance high-water mark if this thread has a newer email - if (newestInThread) { - if (!highWaterMark || new Date(newestInThread) > new Date(highWaterMark)) { - highWaterMark = newestInThread; - } - saveState(STATE_FILE, highWaterMark); - } - } catch (error) { - console.error(`[Gmail] Error processing thread ${threadId}, skipping:`, error); - } + const state = loadState(STATE_FILE); + if (!state.historyId) { + console.log("No history ID found, starting full sync..."); + await fullSync(auth, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS); + } else { + console.log("History ID found, starting partial sync..."); + await partialSync(auth, state.historyId, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS); } - console.log(`[Gmail] Sync completed. Processed ${processedCount}/${allThreadIds.length} threads.`); - + console.log("Sync completed."); } catch (error) { - console.error('[Gmail] Error during sync:', error); + console.error("Error during sync:", error); } } export async function init() { - console.log('[Gmail] Starting Gmail Sync (Composio)...'); - console.log(`[Gmail] Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`); + console.log("Starting Gmail Sync (TS)..."); + console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`); while (true) { try { - const isConnected = composioAccountsRepo.isConnected('gmail'); - - if (!isConnected) { - console.log('[Gmail] Gmail not connected via Composio. Sleeping...'); + // Check if credentials are available with required scopes + const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE); + + if (!hasCredentials) { + console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping..."); } else { + // Perform one sync await performSync(); } } catch (error) { - console.error('[Gmail] Error in main loop:', error); + console.error("Error in main loop:", error); } - console.log(`[Gmail] Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`); + // Sleep for N minutes before next check (can be interrupted by triggerSync) + console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`); await interruptibleSleep(SYNC_INTERVAL_MS); } }