diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts
index 6402d097..8057df86 100644
--- a/apps/x/packages/core/src/knowledge/sync_gmail.ts
+++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts
@@ -28,6 +28,8 @@ const CACHE_DIR = path.join(WorkDir, 'inbox_lists');
 const SYNC_INTERVAL_MS = 30 * 1000; // Check every 30 seconds
 const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly';
 const MAX_THREADS_IN_DIGEST = 10;
+// Minimum time between two recent-thread backfill passes (15 minutes).
+const RECENT_BACKFILL_INTERVAL_MS = 15 * 60 * 1000;
 const nhm = new NodeHtmlMarkdown();
 
 interface SnapshotCacheEntry {
@@ -713,7 +715,9 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri
 
     } catch (error) {
         console.error(`Error processing thread ${threadId}:`, error);
-        return null;
+        const status = getErrorStatus(error);
+        if (status === 404) return null;
+        throw error;
     }
 }
 
@@ -757,20 +761,165 @@ async function pruneInboxCache(auth: OAuth2Client): Promise {
     }
 }
 
-function loadState(stateFile: string): { historyId?: string; last_sync?: string } {
+/**
+ * Reads the persisted sync state from `stateFile`.
+ *
+ * @returns The parsed JSON contents, or `{}` when the file does not exist.
+ * @throws If the file exists but its contents are not valid JSON.
+ */
+function loadState(stateFile: string): { historyId?: string; last_sync?: string; last_recent_backfill?: string } {
     if (fs.existsSync(stateFile)) {
         return JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
     }
     return {};
 }
 
-function saveState(historyId: string, stateFile: string) {
+/**
+ * Writes `historyId` and a fresh `last_sync` timestamp to `stateFile`.
+ *
+ * `last_recent_backfill` is carried over from the existing state unless
+ * `extra` supplies a new value.
+ */
+function saveState(historyId: string, stateFile: string, extra: { last_recent_backfill?: string } = {}) {
+    const previous = loadState(stateFile);
     fs.writeFileSync(stateFile, JSON.stringify({
         historyId,
-        last_sync: new Date().toISOString()
+        last_sync: new Date().toISOString(),
+        ...extra,
+        // Assigned after the spread so an explicit `undefined` in `extra` cannot erase the stored value.
+        last_recent_backfill: extra.last_recent_backfill ?? previous.last_recent_backfill,
     }, null, 2));
 }
 
+/**
+ * Extracts a numeric status from a thrown value.
+ *
+ * Prefers a truthy numeric `error.response.status`; otherwise falls back to a
+ * numeric (or numeric-string) `error.code`. Non-objects such as `null`,
+ * `undefined` and primitives yield `undefined` instead of raising a TypeError.
+ *
+ * @returns The status code, or `undefined` when none can be determined.
+ */
+function getErrorStatus(error: unknown): number | undefined {
+    if (typeof error !== 'object' || error === null) return undefined;
+    const { response, code } = error as { response?: { status?: unknown } | null; code?: unknown };
+    const status = response?.status;
+    if (typeof status === 'number' && status) return status;
+    if (typeof code === 'number') return Number.isFinite(code) ? code : undefined;
+    if (typeof code === 'string' && code.trim() !== '') {
+        const parsed = Number(code);
+        return Number.isFinite(parsed) ? parsed : undefined;
+    }
+    return undefined;
+}
+
+/**
+ * Formats the date `lookbackDays` days before now as `YYYY/MM/DD`, the form
+ * interpolated into the Gmail `after:` search query.
+ *
+ * NOTE(review): the day arithmetic is local-time but the formatted date comes
+ * from `toISOString()` (UTC), so near midnight the result can differ from the
+ * local calendar date by one day.
+ */
+function recentDateQuery(lookbackDays: number): string {
+    const pastDate = new Date();
+    pastDate.setDate(pastDate.getDate() - lookbackDays);
+    return pastDate.toISOString().slice(0, 10).replace(/-/g, '/');
+}
+
+/**
+ * Lists the threads matching `after:<date> -in:spam -in:trash`, following
+ * `nextPageToken` until the listing is exhausted.
+ *
+ * @returns One entry per distinct thread id, in the order first seen.
+ */
+async function listRecentNonDeletedThreadIds(gmailClient: gmail.Gmail, lookbackDays: number): Promise<RecentThreadInfo[]> {
+    const dateQuery = recentDateQuery(lookbackDays);
+    const results: RecentThreadInfo[] = [];
+    const seen = new Set<string>();
+    let pageToken: string | undefined;
+
+    do {
+        const res = await gmailClient.users.threads.list({
+            userId: 'me',
+            q: `after:${dateQuery} -in:spam -in:trash`,
+            maxResults: 500,
+            pageToken,
+        });
+        for (const thread of res.data.threads || []) {
+            if (!thread.id || seen.has(thread.id)) continue;
+            seen.add(thread.id);
+            results.push({
+                threadId: thread.id,
+                historyId: thread.historyId || '',
+                snippet: thread.snippet || undefined,
+            });
+        }
+        pageToken = res.data.nextPageToken ?? undefined;
+    } while (pageToken);
+
+    return results;
+}
+
+/**
+ * Reports whether a recent-thread backfill is due: no run is recorded in
+ * `stateFile`, the recorded timestamp is unparseable, or at least
+ * `RECENT_BACKFILL_INTERVAL_MS` has elapsed since it.
+ */
+function shouldRunRecentBackfill(stateFile: string): boolean {
+    const state = loadState(stateFile);
+    if (!state.last_recent_backfill) return true;
+    const lastRunMs = new Date(state.last_recent_backfill).getTime();
+    if (!Number.isFinite(lastRunMs)) return true;
+    return Date.now() - lastRunMs >= RECENT_BACKFILL_INTERVAL_MS;
+}
+
+/**
+ * Syncs recent threads that have no `<threadId>.md` file in `syncDir` yet.
+ *
+ * Returns `[]` without touching the API unless `shouldRunRecentBackfill`
+ * reports that a pass is due. A thread that fails to process is logged and
+ * skipped instead of aborting the pass: `processThread` rethrows non-404
+ * errors, and letting one escape would skip the `saveState` call below, leaving
+ * `last_recent_backfill` unchanged so every later call repeats the full listing.
+ * A skipped thread still has no file on disk, so a later pass retries it.
+ *
+ * @returns The threads synced by this pass.
+ */
+async function backfillMissingRecentThreads(
+    auth: OAuth2Client,
+    syncDir: string,
+    attachmentsDir: string,
+    stateFile: string,
+    lookbackDays: number,
+): Promise<SyncedThread[]> {
+    if (!shouldRunRecentBackfill(stateFile)) return [];
+
+    const gmailClient = google.gmail({ version: 'v1', auth });
+    const recentThreads = await listRecentNonDeletedThreadIds(gmailClient, lookbackDays);
+    const missingThreadIds = recentThreads
+        .map((thread) => thread.threadId)
+        .filter((threadId) => !fs.existsSync(path.join(syncDir, `${threadId}.md`)));
+
+    const synced: SyncedThread[] = [];
+    for (const threadId of missingThreadIds) {
+        try {
+            const result = await processThread(auth, threadId, syncDir, attachmentsDir);
+            if (result) synced.push(result);
+        } catch (error) {
+            console.error(`Recent Gmail backfill failed for thread ${threadId}:`, error);
+        }
+    }
+
+    const profile = await gmailClient.users.getProfile({ userId: 'me' });
+    saveState(profile.data.historyId!, stateFile, { last_recent_backfill: new Date().toISOString() });
+
+    if (missingThreadIds.length > 0) {
+        console.log(`Recent Gmail backfill synced ${synced.length}/${missingThreadIds.length} missing thread(s).`);
+    }
+    return synced;
+}
+
 async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) {
     const gmail = google.gmail({ version: 'v1', auth });
 
@@ -814,6 +963,7 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str
         const res = await gmail.users.threads.list({
             userId: 'me',
             q: `after:${dateQuery} -in:spam -in:trash`,
+            maxResults: 500,
             pageToken
         });
 
@@ -907,15 +1057,24 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir:
     };
 
     try {
-        const res = await gmail.users.history.list({
-            userId: 'me',
-            startHistoryId,
-            historyTypes: ['messageAdded']
-        });
+        const changes: gmail.Schema$History[] = [];
+        let pageToken: string | undefined;
+        do {
+            const res = await gmail.users.history.list({
+                userId: 'me',
+                startHistoryId,
+                historyTypes: ['messageAdded'],
+                maxResults: 500,
+                pageToken,
+            });
+            if (res.data.history) changes.push(...res.data.history);
+            pageToken = res.data.nextPageToken ?? undefined;
+        } while (pageToken);
 
-        const changes = res.data.history;
         if (!changes || changes.length === 0) {
             console.log("No new changes.");
+            const backfilled = await backfillMissingRecentThreads(auth, syncDir, attachmentsDir, stateFile, lookbackDays);
+            await publishGmailSyncEvent(backfilled);
             const profile = await gmail.users.getProfile({ userId: 'me' });
             saveState(profile.data.historyId!, stateFile);
             return;
@@ -937,6 +1096,8 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir:
         }
 
         if (threadIds.size === 0) {
+            const backfilled = await backfillMissingRecentThreads(auth, syncDir, attachmentsDir, stateFile, lookbackDays);
+            await publishGmailSyncEvent(backfilled);
             const profile = await gmail.users.getProfile({ userId: 'me' });
             saveState(profile.data.historyId!, stateFile);
             return;
@@ -961,6 +1122,8 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir:
             const result = await processThread(auth, tid, syncDir, attachmentsDir);
             if (result) synced.push(result);
         }
+        const backfilled = await backfillMissingRecentThreads(auth, syncDir, attachmentsDir, stateFile, lookbackDays);
+        synced.push(...backfilled);
 
         await publishGmailSyncEvent(synced);
 