From c3b754ee6b227a6286b743da82203dc9eb6f2260 Mon Sep 17 00:00:00 2001 From: Arjun <6592213+arkml@users.noreply.github.com> Date: Mon, 2 Feb 2026 16:50:01 +0530 Subject: [PATCH] move gmail_sync to composio --- apps/x/apps/main/src/composio-handler.ts | 4 + apps/x/apps/main/src/oauth-handler.ts | 2 - .../packages/core/src/knowledge/sync_gmail.ts | 509 ++++++++++-------- 3 files changed, 276 insertions(+), 239 deletions(-) diff --git a/apps/x/apps/main/src/composio-handler.ts b/apps/x/apps/main/src/composio-handler.ts index e5b25d1a..f72613ad 100644 --- a/apps/x/apps/main/src/composio-handler.ts +++ b/apps/x/apps/main/src/composio-handler.ts @@ -3,6 +3,7 @@ import { createAuthServer } from './auth-server.js'; import * as composioClient from '@x/core/dist/composio/client.js'; import { composioAccountsRepo } from '@x/core/dist/composio/repo.js'; import type { LocalConnectedAccount } from '@x/core/dist/composio/types.js'; +import { triggerSync as triggerGmailSync } from '@x/core/dist/knowledge/sync_gmail.js'; const REDIRECT_URI = 'http://localhost:8081/oauth/callback'; @@ -151,6 +152,9 @@ export async function initiateConnection(toolkitSlug: string): Promise<{ if (accountStatus.status === 'ACTIVE') { emitComposioEvent({ toolkitSlug, success: true }); + if (toolkitSlug === 'gmail') { + triggerGmailSync(); + } } else { emitComposioEvent({ toolkitSlug, diff --git a/apps/x/apps/main/src/oauth-handler.ts b/apps/x/apps/main/src/oauth-handler.ts index 5b55e8b7..58ab0809 100644 --- a/apps/x/apps/main/src/oauth-handler.ts +++ b/apps/x/apps/main/src/oauth-handler.ts @@ -7,7 +7,6 @@ import { getProviderConfig, getAvailableProviders } from '@x/core/dist/auth/prov import container from '@x/core/dist/di/container.js'; import { IOAuthRepo } from '@x/core/dist/auth/repo.js'; import { IClientRegistrationRepo } from '@x/core/dist/auth/client-repo.js'; -import { triggerSync as triggerGmailSync } from '@x/core/dist/knowledge/sync_gmail.js'; import { triggerSync as triggerCalendarSync } from '@x/core/dist/knowledge/sync_calendar.js'; import { triggerSync as triggerFirefliesSync } from '@x/core/dist/knowledge/sync_fireflies.js'; import { emitOAuthEvent } from './ipc.js'; @@ -194,7 +193,6 @@ export async function connectProvider(provider: string): Promise<{ success: bool // Trigger immediate sync for relevant providers if (provider === 'google') { - triggerGmailSync(); triggerCalendarSync(); } else if (provider === 'fireflies-ai') { triggerFirefliesSync(); diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts index d1782a96..9b042eb0 100644 --- a/apps/x/packages/core/src/knowledge/sync_gmail.ts +++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts @@ -1,15 +1,14 @@ import fs from 'fs'; import path from 'path'; -import { google, gmail_v1 as gmail } from 'googleapis'; import { NodeHtmlMarkdown } from 'node-html-markdown' -import { OAuth2Client } from 'google-auth-library'; import { WorkDir } from '../config/config.js'; -import { GoogleClientFactory } from './google-client-factory.js'; +import { executeAction } from '../composio/client.js'; +import { composioAccountsRepo } from '../composio/repo.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'gmail_sync'); const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes -const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly'; +const LOOKBACK_DAYS = 7; const nhm = new NodeHtmlMarkdown(); @@ -43,241 +42,201 @@ function cleanFilename(name: string): string { return name.replace(/[\\/*?:":<>|]/g, "").substring(0, 100).trim(); } -function decodeBase64(data: string): string { - return Buffer.from(data, 'base64').toString('utf-8'); +// --- State Management --- + +interface SyncState { + last_sync: string; // ISO string — human-readable, source of truth } -function getBody(payload: gmail.Schema$MessagePart): string { - let body = ""; - if (payload.parts) { - for (const part of payload.parts) { - if (part.mimeType === 'text/plain' && part.body && part.body.data) { - const text = decodeBase64(part.body.data); - // Strip quoted lines - const cleanLines = text.split('\n').filter((line: string) => !line.trim().startsWith('>')); - body += cleanLines.join('\n'); - } else if (part.mimeType === 'text/html' && part.body && part.body.data) { - const html = decodeBase64(part.body.data); - const md = nhm.translate(html); - // Simple quote stripping for MD - const cleanLines = md.split('\n').filter((line: string) => !line.trim().startsWith('>')); - body += cleanLines.join('\n'); - } else if (part.parts) { - body += getBody(part); +function loadState(stateFile: string): SyncState | null { + if (fs.existsSync(stateFile)) { + try { + const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8')); + if (data.last_sync) { + return { last_sync: data.last_sync }; } + } catch (e) { + console.error('[Gmail] Failed to load state:', e); } - } else if (payload.body && payload.body.data) { - const data = decodeBase64(payload.body.data); - if (payload.mimeType === 'text/html') { - const md = nhm.translate(data); - body += md.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n'); - } else { - body += data.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n'); - } - } - return body; -} - -async function saveAttachment(gmail: gmail.Gmail, userId: string, msgId: string, part: gmail.Schema$MessagePart, attachmentsDir: string): Promise { - const filename = part.filename; - const attId = part.body?.attachmentId; - if (!filename || !attId) return null; - - const safeName = `${msgId}_${cleanFilename(filename)}`; - const filePath = path.join(attachmentsDir, safeName); - - if (fs.existsSync(filePath)) return safeName; - - try { - const res = await gmail.users.messages.attachments.get({ - userId, - messageId: msgId, - id: attId - }); - - const data = res.data.data; - if (data) { - fs.writeFileSync(filePath, Buffer.from(data, 'base64')); - console.log(`Saved attachment: ${safeName}`); - return safeName; - } - } catch (e) { - console.error(`Error saving attachment ${filename}:`, e); } return null; } +function saveState(stateFile: string, lastSync: string): void { + const state: SyncState = { + last_sync: lastSync, + }; + fs.writeFileSync(stateFile, JSON.stringify(state, null, 2)); +} + +/** + * Try to parse a date string into a Date. Returns null if unparseable. + */ +function tryParseDate(dateStr: string): Date | null { + const d = new Date(dateStr); + return isNaN(d.getTime()) ? null : d; +} + +function toEpochSeconds(isoString: string): number { + return Math.floor(new Date(isoString).getTime() / 1000); +} + +// --- Message Parsing --- + +interface ParsedMessage { + from: string; + date: string; + subject: string; + body: string; +} + +function parseMessageData(messageData: Record): ParsedMessage { + const headers = messageData.payload && typeof messageData.payload === 'object' + ? (messageData.payload as Record).headers as Array<{ name: string; value: string }> | undefined + : undefined; + + const from = headers?.find(h => h.name === 'From')?.value || String(messageData.from || messageData.sender || 'Unknown'); + const date = headers?.find(h => h.name === 'Date')?.value || String(messageData.date || messageData.internalDate || 'Unknown'); + const subject = headers?.find(h => h.name === 'Subject')?.value || String(messageData.subject || '(No Subject)'); + + let body = ''; + + // Try to extract body from payload structure (Gmail API format) + if (messageData.payload && typeof messageData.payload === 'object') { + body = extractBodyFromPayload(messageData.payload as Record); + } + + // Fallback: try snippet or body fields + if (!body) { + if (typeof messageData.body === 'string') { + body = messageData.body; + } else if (typeof messageData.snippet === 'string') { + body = messageData.snippet; + } else if (typeof messageData.text === 'string') { + body = messageData.text; + } + } + + // Convert HTML to markdown if body looks like HTML + if (body && (body.includes(' !line.trim().startsWith('>')).join('\n'); + } + + return { from, date, subject, body }; +} + +function extractBodyFromPayload(payload: Record): string { + const parts = payload.parts as Array> | undefined; + + if (parts) { + for (const part of parts) { + const mimeType = part.mimeType as string | undefined; + const bodyData = part.body && typeof part.body === 'object' + ? (part.body as Record).data as string | undefined + : undefined; + + if ((mimeType === 'text/plain' || mimeType === 'text/html') && bodyData) { + const decoded = Buffer.from(bodyData, 'base64').toString('utf-8'); + if (mimeType === 'text/html') { + return nhm.translate(decoded); + } + return decoded; + } + + // Recurse into nested parts + if (part.parts) { + const result = extractBodyFromPayload(part as Record); + if (result) return result; + } + } + } + + // Single-part message + const bodyData = payload.body && typeof payload.body === 'object' + ? (payload.body as Record).data as string | undefined + : undefined; + + if (bodyData) { + const decoded = Buffer.from(bodyData, 'base64').toString('utf-8'); + const mimeType = payload.mimeType as string | undefined; + if (mimeType === 'text/html') { + return nhm.translate(decoded); + } + return decoded; + } + + return ''; +} + // --- Sync Logic --- -async function processThread(auth: OAuth2Client, threadId: string, syncDir: string, attachmentsDir: string) { - const gmail = google.gmail({ version: 'v1', auth }); - try { - const res = await gmail.users.threads.get({ userId: 'me', id: threadId }); - const thread = res.data; - const messages = thread.messages; +/** + * Process a thread and write its .md file. + * Returns the newest message date (as ISO string) found in the thread, or null. + */ +async function processThread(connectedAccountId: string, threadId: string, syncDir: string): Promise { + const threadResult = await executeAction( + 'GMAIL_FETCH_MESSAGE_BY_THREAD_ID', + connectedAccountId, + { thread_id: threadId, user_id: 'me' } + ); - if (!messages || messages.length === 0) return; + if (!threadResult.success || !threadResult.data) { + console.error(`[Gmail] Failed to fetch thread ${threadId}:`, threadResult.error); + return null; + } - // Subject from first message - const firstHeader = messages[0].payload?.headers; - const subject = firstHeader?.find(h => h.name === 'Subject')?.value || '(No Subject)'; + const data = threadResult.data as Record; + const messages = data.messages as Array> | undefined; - let mdContent = `# ${subject}\n\n`; + let newestDate: Date | null = null; + + if (!messages || messages.length === 0) { + // Single message response + const parsed = parseMessageData(data); + const mdContent = `# ${parsed.subject}\n\n` + + `**Thread ID:** ${threadId}\n` + + `**Message Count:** 1\n\n---\n\n` + + `### From: ${parsed.from}\n` + + `**Date:** ${parsed.date}\n\n` + + `${parsed.body}\n\n---\n\n`; + + fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); + console.log(`[Gmail] Synced Thread: ${parsed.subject} (${threadId})`); + newestDate = tryParseDate(parsed.date); + } else { + // Multi-message thread + const firstParsed = parseMessageData(messages[0]); + let mdContent = `# ${firstParsed.subject}\n\n`; mdContent += `**Thread ID:** ${threadId}\n`; mdContent += `**Message Count:** ${messages.length}\n\n---\n\n`; for (const msg of messages) { - const msgId = msg.id!; - const headers = msg.payload?.headers || []; - const from = headers.find(h => h.name === 'From')?.value || 'Unknown'; - const date = headers.find(h => h.name === 'Date')?.value || 'Unknown'; + const parsed = parseMessageData(msg); + mdContent += `### From: ${parsed.from}\n`; + mdContent += `**Date:** ${parsed.date}\n\n`; + mdContent += `${parsed.body}\n\n`; + mdContent += `---\n\n`; - mdContent += `### From: ${from}\n`; - mdContent += `**Date:** ${date}\n\n`; - - if (msg.payload) { - const body = getBody(msg.payload); - mdContent += `${body}\n\n`; + const msgDate = tryParseDate(parsed.date); + if (msgDate && (!newestDate || msgDate > newestDate)) { + newestDate = msgDate; } - - // Attachments - const parts: gmail.Schema$MessagePart[] = []; - const traverseParts = (pList: gmail.Schema$MessagePart[]) => { - for (const p of pList) { - parts.push(p); - if (p.parts) traverseParts(p.parts); - } - }; - if (msg.payload?.parts) traverseParts(msg.payload.parts); - - let attachmentsFound = false; - for (const part of parts) { - if (part.filename && part.body?.attachmentId) { - const savedName = await saveAttachment(gmail, 'me', msgId, part, attachmentsDir); - if (savedName) { - if (!attachmentsFound) { - mdContent += "**Attachments:**\n"; - attachmentsFound = true; - } - mdContent += `- [${part.filename}](attachments/${savedName})\n`; - } - } - } - mdContent += "\n---\n\n"; } - fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent); - console.log(`Synced Thread: ${subject} (${threadId})`); - - } catch (error) { - console.error(`Error processing thread ${threadId}:`, error); + fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); + console.log(`[Gmail] Synced Thread: ${firstParsed.subject} (${threadId})`); } -} -function loadState(stateFile: string): { historyId?: string } { - if (fs.existsSync(stateFile)) { - return JSON.parse(fs.readFileSync(stateFile, 'utf-8')); - } - return {}; -} - -function saveState(historyId: string, stateFile: string) { - fs.writeFileSync(stateFile, JSON.stringify({ - historyId, - last_sync: new Date().toISOString() - }, null, 2)); -} - -async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) { - console.log(`Performing full sync of last ${lookbackDays} days...`); - const gmail = google.gmail({ version: 'v1', auth }); - - const pastDate = new Date(); - pastDate.setDate(pastDate.getDate() - lookbackDays); - const dateQuery = pastDate.toISOString().split('T')[0].replace(/-/g, '/'); - - // Get History ID - const profile = await gmail.users.getProfile({ userId: 'me' }); - const currentHistoryId = profile.data.historyId!; - - let pageToken: string | undefined; - do { - const res = await gmail.users.threads.list({ - userId: 'me', - q: `after:${dateQuery}`, - pageToken - }); - - const threads = res.data.threads; - if (threads) { - for (const thread of threads) { - await processThread(auth, thread.id!, syncDir, attachmentsDir); - } - } - pageToken = res.data.nextPageToken ?? undefined; - } while (pageToken); - - saveState(currentHistoryId, stateFile); - console.log("Full sync complete."); -} - -async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) { - console.log(`Checking updates since historyId ${startHistoryId}...`); - const gmail = google.gmail({ version: 'v1', auth }); - - try { - const res = await gmail.users.history.list({ - userId: 'me', - startHistoryId, - historyTypes: ['messageAdded'] - }); - - const changes = res.data.history; - if (!changes || changes.length === 0) { - console.log("No new changes."); - const profile = await gmail.users.getProfile({ userId: 'me' }); - saveState(profile.data.historyId!, stateFile); - return; - } - - console.log(`Found ${changes.length} history records.`); - const threadIds = new Set(); - - for (const record of changes) { - if (record.messagesAdded) { - for (const item of record.messagesAdded) { - if (item.message?.threadId) { - threadIds.add(item.message.threadId); - } - } - } - } - - for (const tid of threadIds) { - await processThread(auth, tid, syncDir, attachmentsDir); - } - - const profile = await gmail.users.getProfile({ userId: 'me' }); - saveState(profile.data.historyId!, stateFile); - - } catch (error: unknown) { - const e = error as { response?: { status?: number } }; - if (e.response?.status === 404) { - console.log("History ID expired. Falling back to full sync."); - await fullSync(auth, syncDir, attachmentsDir, stateFile, lookbackDays); - } else { - console.error("Error during partial sync:", error); - // If 401, clear tokens to force re-auth next run - if (e.response?.status === 401) { - console.log("401 Unauthorized, clearing cache"); - GoogleClientFactory.clearCache(); - } - } - } + return newestDate ? newestDate.toISOString() : null; } async function performSync() { - const LOOKBACK_DAYS = 30; // Default to 1 month const ATTACHMENTS_DIR = path.join(SYNC_DIR, 'attachments'); const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json'); @@ -285,51 +244,127 @@ async function performSync() { if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true }); if (!fs.existsSync(ATTACHMENTS_DIR)) fs.mkdirSync(ATTACHMENTS_DIR, { recursive: true }); + const account = composioAccountsRepo.getAccount('gmail'); + if (!account || account.status !== 'ACTIVE') { + console.log('[Gmail] Gmail not connected via Composio. Skipping sync.'); + return; + } + + const connectedAccountId = account.id; + + // Determine query timestamp + const state = loadState(STATE_FILE); + let afterEpochSeconds: number; + + if (state) { + afterEpochSeconds = toEpochSeconds(state.last_sync); + console.log(`[Gmail] Syncing messages since ${state.last_sync}...`); + } else { + const pastDate = new Date(); + pastDate.setDate(pastDate.getDate() - LOOKBACK_DAYS); + afterEpochSeconds = Math.floor(pastDate.getTime() / 1000); + console.log(`[Gmail] First sync - fetching last ${LOOKBACK_DAYS} days...`); + } + try { - const auth = await GoogleClientFactory.getClient(); - if (!auth) { - console.log("No valid OAuth credentials available."); + // List threads since last sync (lightweight - returns IDs only) + const allThreadIds: string[] = []; + let pageToken: string | undefined; + + do { + const params: Record = { + query: `after:${afterEpochSeconds}`, + max_results: 20, + user_id: 'me', + }; + if (pageToken) { + params.page_token = pageToken; + } + + const result = await executeAction( + 'GMAIL_LIST_THREADS', + connectedAccountId, + params + ); + + if (!result.success || !result.data) { + console.error('[Gmail] Failed to list threads:', result.error); + return; + } + + const data = result.data as Record; + const threads = data.threads as Array> | undefined; + + if (threads && threads.length > 0) { + for (const thread of threads) { + const threadId = thread.id as string | undefined; + if (threadId) { + allThreadIds.push(threadId); + } + } + } + + pageToken = data.nextPageToken as string | undefined; + } while (pageToken); + + if (allThreadIds.length === 0) { + console.log('[Gmail] No new threads.'); return; } - console.log("Authorization successful. Starting sync..."); + console.log(`[Gmail] Found ${allThreadIds.length} threads to sync.`); - const state = loadState(STATE_FILE); - if (!state.historyId) { - console.log("No history ID found, starting full sync..."); - await fullSync(auth, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS); - } else { - console.log("History ID found, starting partial sync..."); - await partialSync(auth, state.historyId, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS); + // Reverse so we process oldest first. Gmail returns newest first, + // so processing in reverse lets the high-water mark advance + // chronologically — safe to save state after each thread. + allThreadIds.reverse(); + + // Process each thread, saving state after each one with the + // newest email date seen so far (high-water mark). + let highWaterMark: string | null = state?.last_sync ?? null; + let processedCount = 0; + for (const threadId of allThreadIds) { + try { + const newestInThread = await processThread(connectedAccountId, threadId, SYNC_DIR); + processedCount++; + + // Advance high-water mark if this thread has a newer email + if (newestInThread) { + if (!highWaterMark || new Date(newestInThread) > new Date(highWaterMark)) { + highWaterMark = newestInThread; + } + saveState(STATE_FILE, highWaterMark); + } + } catch (error) { + console.error(`[Gmail] Error processing thread ${threadId}, skipping:`, error); + } } - console.log("Sync completed."); + console.log(`[Gmail] Sync completed. Processed ${processedCount}/${allThreadIds.length} threads.`); + } catch (error) { - console.error("Error during sync:", error); + console.error('[Gmail] Error during sync:', error); } } export async function init() { - console.log("Starting Gmail Sync (TS)..."); - console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`); + console.log('[Gmail] Starting Gmail Sync (Composio)...'); + console.log(`[Gmail] Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`); while (true) { try { - // Check if credentials are available with required scopes - const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE); - - if (!hasCredentials) { - console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping..."); + const isConnected = composioAccountsRepo.isConnected('gmail'); + + if (!isConnected) { + console.log('[Gmail] Gmail not connected via Composio. Sleeping...'); } else { - // Perform one sync await performSync(); } } catch (error) { - console.error("Error in main loop:", error); + console.error('[Gmail] Error in main loop:', error); } - // Sleep for N minutes before next check (can be interrupted by triggerSync) - console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`); + console.log(`[Gmail] Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`); await interruptibleSleep(SYNC_INTERVAL_MS); } }