diff --git a/apps/x/apps/main/src/ipc.ts b/apps/x/apps/main/src/ipc.ts index 1065a978..72055579 100644 --- a/apps/x/apps/main/src/ipc.ts +++ b/apps/x/apps/main/src/ipc.ts @@ -47,7 +47,7 @@ import { summarizeMeeting } from '@x/core/dist/knowledge/summarize_meeting.js'; import { getAccessToken } from '@x/core/dist/auth/tokens.js'; import { getRowboatConfig } from '@x/core/dist/config/rowboat.js'; import { runLiveNoteAgent } from '@x/core/dist/knowledge/live-note/runner.js'; -import { fetchThreadSnapshot, listRecentThreadIds, listInboxPage, saveMessageBodyHeight } from '@x/core/dist/knowledge/sync_gmail.js'; +import { fetchThreadSnapshot, listImportantThreads, listEverythingElseThreads, saveMessageBodyHeight, triggerSync as triggerGmailSync } from '@x/core/dist/knowledge/sync_gmail.js'; import { liveNoteBus } from '@x/core/dist/knowledge/live-note/bus.js'; import { getInstallationId } from '@x/core/dist/analytics/installation.js'; import { API_URL } from '@x/core/dist/config/env.js'; @@ -493,22 +493,15 @@ export function setupIpcHandlers() { }; } }, - 'gmail:listRecentThreads': async (_event, args) => { - try { - return { threads: await listRecentThreadIds(args.daysAgo ?? 2) }; - } catch (error) { - return { - threads: [], - error: error instanceof Error ? error.message : String(error), - }; - } + 'gmail:getImportant': async (_event, args) => { + return listImportantThreads({ cursor: args.cursor, limit: args.limit }); }, - 'gmail:listInboxPage': async (_event, args) => { - return listInboxPage({ - section: args.section, - cursor: args.cursor, - limit: args.limit, - }); + 'gmail:getEverythingElse': async (_event, args) => { + return listEverythingElseThreads({ cursor: args.cursor, limit: args.limit }); + }, + 'gmail:triggerSync': async () => { + triggerGmailSync(); + return {}; }, 'gmail:saveMessageHeight': async (_event, args) => { saveMessageBodyHeight(args.threadId, args.messageId, args.height); diff --git a/apps/x/apps/renderer/src/components/email-view.tsx b/apps/x/apps/renderer/src/components/email-view.tsx index d5229df8..79df6f49 100644 --- a/apps/x/apps/renderer/src/components/email-view.tsx +++ b/apps/x/apps/renderer/src/components/email-view.tsx @@ -272,19 +272,6 @@ function MessageBody({ message, threadId }: { message: GmailThreadMessage; threa ) } -async function mapWithConcurrency( - items: T[], - limit: number, - mapper: (item: T) => Promise, -): Promise { - const results: R[] = [] - for (let i = 0; i < items.length; i += limit) { - const batch = items.slice(i, i + limit) - results.push(...await Promise.all(batch.map(mapper))) - } - return results -} - type ComposeMode = 'reply' | 'forward' function ComposeToolbarButton({ @@ -784,22 +771,26 @@ export function EmailView() { if (hoverTimerRef.current) clearTimeout(hoverTimerRef.current) }, []) - // Track the current "load epoch" so concurrent refreshes don't apply stale results. - const epochRef = useRef(0) + // Per-section load epochs so concurrent reloads of different sections don't + // trample each other. (A single shared epoch caused Important's silent + // reload to be discarded whenever Other was reloaded in the same tick.) + const epochsRef = useRef>({ important: 0, other: 0 }) + + const sectionChannel = (section: InboxSection) => + section === 'important' ? 'gmail:getImportant' as const : 'gmail:getEverythingElse' as const const loadNextPage = useCallback(async (section: InboxSection) => { const current = section === 'important' ? important : other if (current.loadingPage || current.hasReachedEnd) return - const epoch = epochRef.current + const epoch = epochsRef.current[section] setSection(section, (prev) => ({ ...prev, loadingPage: true })) try { - const result = await window.ipc.invoke('gmail:listInboxPage', { - section, + const result = await window.ipc.invoke(sectionChannel(section), { cursor: current.nextCursor ?? undefined, limit: PAGE_SIZE, }) - if (epoch !== epochRef.current) return + if (epoch !== epochsRef.current[section]) return setSection(section, (prev) => ({ threads: [...prev.threads, ...result.threads], nextCursor: result.nextCursor, @@ -807,21 +798,24 @@ export function EmailView() { loadingPage: false, })) } catch (err) { - if (epoch !== epochRef.current) return + if (epoch !== epochsRef.current[section]) return console.warn(`[Gmail] page load failed for ${section}:`, err) setSection(section, (prev) => ({ ...prev, loadingPage: false })) } }, [important, other, setSection]) - const reloadFirstPage = useCallback(async (section: InboxSection) => { - const epoch = ++epochRef.current - setSection(section, () => ({ ...initialSectionState, loadingPage: true })) + const reloadFirstPage = useCallback(async (section: InboxSection, options: { silent?: boolean } = {}) => { + const epoch = ++epochsRef.current[section] + if (options.silent) { + setSection(section, (prev) => ({ ...prev, loadingPage: true })) + } else { + setSection(section, () => ({ ...initialSectionState, loadingPage: true })) + } try { - const result = await window.ipc.invoke('gmail:listInboxPage', { - section, + const result = await window.ipc.invoke(sectionChannel(section), { limit: PAGE_SIZE, }) - if (epoch !== epochRef.current) return + if (epoch !== epochsRef.current[section]) return setSection(section, () => ({ threads: result.threads, nextCursor: result.nextCursor, @@ -829,17 +823,27 @@ export function EmailView() { loadingPage: false, })) } catch (err) { - if (epoch !== epochRef.current) return + if (epoch !== epochsRef.current[section]) return console.warn(`[Gmail] initial page load failed for ${section}:`, err) - setSection(section, () => ({ ...initialSectionState, loadingPage: false })) + setSection(section, (prev) => ({ ...prev, loadingPage: false })) } }, [setSection]) - // Initial load — fetch page 1 of Important only. Everything else stays hidden - // until Important is exhausted (see effect below). + // Initial load — fetch page 1 of Important. On first-ever mount we do a + // non-silent load (shows loading state). On re-mount with persisted state we + // do a silent reconcile against the cache — necessary because the watcher + // subscription only runs while mounted, so any cache changes that happened + // while the panel was unmounted would otherwise stay invisible. useEffect(() => { - if (hadPersistedDataOnMount.current) return - void reloadFirstPage('important') + if (hadPersistedDataOnMount.current) { + void reloadFirstPage('important', { silent: true }) + // Reconcile Other too if it had been loaded before the unmount. + if (other.threads.length > 0) { + void reloadFirstPage('other', { silent: true }) + } + } else { + void reloadFirstPage('important') + } // eslint-disable-next-line react-hooks/exhaustive-deps }, []) @@ -851,31 +855,56 @@ export function EmailView() { void reloadFirstPage('other') }, [important.hasReachedEnd, other.threads.length, other.loadingPage, reloadFirstPage]) - // Live updates: watcher on inbox_lists/ → reload page 1 when files change. - // Suppressed while a thread is open (composing/reading) — instead, mark a - // pending update and reload once the user closes the thread. + // Live updates: watcher on inbox_lists/ → silently refresh visible sections + // when files change. Throttled to at most one reload per ~3s so a burst of + // backend writes (sync processing many threads sequentially) coalesces into + // a small number of in-place updates rather than a flicker storm. + // Suppressed while a thread is open (composing/reading); deferred until close. const pendingReloadRef = useRef(false) const reloadDebounceRef = useRef | null>(null) + const lastReloadAtRef = useRef(0) const isSelectedRef = useRef(null) isSelectedRef.current = selectedThreadId const isRefreshingRef = useRef(false) isRefreshingRef.current = refreshing + const otherHasThreadsRef = useRef(false) + otherHasThreadsRef.current = other.threads.length > 0 + const RELOAD_THROTTLE_MS = 3000 + + const doReload = useCallback(() => { + if (isRefreshingRef.current) return + if (isSelectedRef.current !== null) { + pendingReloadRef.current = true + return + } + lastReloadAtRef.current = Date.now() + void reloadFirstPage('important', { silent: true }) + // Only refresh Other if it had been loaded — otherwise the chained + // effect handles it once Important hits hasReachedEnd. + if (otherHasThreadsRef.current) { + void reloadFirstPage('other', { silent: true }) + } + }, [reloadFirstPage]) + + // Leading-edge throttle: + // - First event after a quiet period (≥ THROTTLE) → fire immediately. + // - During an active burst → queue a trailing fire at the next throttle + // boundary. Subsequent events while a trailing fire is pending do nothing + // (so a continuous stream of writes can't starve the reload). const triggerLiveReload = useCallback(() => { - if (reloadDebounceRef.current) clearTimeout(reloadDebounceRef.current) + const sinceLast = Date.now() - lastReloadAtRef.current + if (sinceLast >= RELOAD_THROTTLE_MS && !reloadDebounceRef.current) { + doReload() + return + } + if (reloadDebounceRef.current) return + const wait = Math.max(200, RELOAD_THROTTLE_MS - sinceLast) reloadDebounceRef.current = setTimeout(() => { reloadDebounceRef.current = null - // Skip if our own refresh is in flight — its writes triggered the watcher. - if (isRefreshingRef.current) return - // If a thread is open, defer until it closes. - if (isSelectedRef.current !== null) { - pendingReloadRef.current = true - return - } - void reloadFirstPage('important') - setOther(() => ({ ...initialSectionState })) - }, 500) - }, [reloadFirstPage]) + doReload() + }, wait) + }, [doReload]) useEffect(() => { const cleanup = window.ipc.on('workspace:didChange', (event) => { @@ -905,11 +934,16 @@ export function EmailView() { if (selectedThreadId !== null) return if (!pendingReloadRef.current) return pendingReloadRef.current = false - void reloadFirstPage('important') - setOther(() => ({ ...initialSectionState })) + lastReloadAtRef.current = Date.now() + void reloadFirstPage('important', { silent: true }) + if (otherHasThreadsRef.current) { + void reloadFirstPage('other', { silent: true }) + } }, [selectedThreadId, reloadFirstPage]) - // Live refresh: hit the Gmail API to validate the freshest threads, then re-render. + // Manual refresh: wake the background sync loop. It updates inbox_lists/, + // the watcher fires, and triggerLiveReload picks up the changes. The + // spinner is a UX cue — we stop it shortly after the sync poke. const refreshInFlightRef = useRef(false) const refresh = useCallback(async () => { if (refreshInFlightRef.current) return @@ -917,29 +951,19 @@ export function EmailView() { setRefreshing(true) setError(null) try { - // TEMP(pagination-testing): widened from daysAgo: 2 to pull more threads into inbox_lists/ for testing. Revert before shipping. - const list = await window.ipc.invoke('gmail:listRecentThreads', { daysAgo: 7 }) - if (list.error) throw new Error(list.error) - await mapWithConcurrency(list.threads, 6, async (item) => { - const result = await window.ipc.invoke('gmail:getThread', { - threadId: item.threadId, - expectedHistoryId: item.historyId, - }) - if (!result.thread) { - console.warn('Failed to hydrate Gmail thread', item.threadId, result.error) - } - }) - // Reset Other so the auto-load effect re-triggers once Important hits end. - setOther(() => ({ ...initialSectionState })) - await reloadFirstPage('important') + await window.ipc.invoke('gmail:triggerSync', {}) } catch (err) { - console.warn('[Gmail] live refresh failed:', err) + console.warn('[Gmail] triggerSync failed:', err) setError(err instanceof Error ? err.message : String(err)) } finally { - refreshInFlightRef.current = false - setRefreshing(false) + // Leave the spinner on briefly so the user sees feedback; the watcher + // will refresh the visible state once the sync cycle writes new files. + setTimeout(() => { + refreshInFlightRef.current = false + setRefreshing(false) + }, 800) } - }, [reloadFirstPage]) + }, []) // Kick off a live refresh on mount only when there's no persisted data — // otherwise we'd clobber the snapshot the user already had. diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts index a5aacfe2..c28ff0d9 100644 --- a/apps/x/packages/core/src/knowledge/sync_gmail.ts +++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts @@ -324,6 +324,14 @@ function encodeCursor(entry: { dateMs: number; threadId: string }): string { return `${entry.dateMs}|${entry.threadId}`; } +export function listImportantThreads(opts: { cursor?: string; limit?: number } = {}): InboxPageResult { + return listInboxPage({ section: 'important', ...opts }); +} + +export function listEverythingElseThreads(opts: { cursor?: string; limit?: number } = {}): InboxPageResult { + return listInboxPage({ section: 'other', ...opts }); +} + export function listInboxPage(opts: InboxPageOptions): InboxPageResult { const limit = Math.max(1, Math.min(100, opts.limit ?? 25)); const cursor = parseCursor(opts.cursor); @@ -418,11 +426,24 @@ export async function listRecentThreadIds(daysAgo: number = 2): Promise { +/** + * Build a GmailThreadSnapshot from an already-fetched threads.get response, + * classify it, and write to inbox_lists/. Shared by the renderer-driven + * fetchThreadSnapshot and the background sync (processThread). + * + * Returns null when the thread has no visible (non-draft) messages — + * those shouldn't show up in the inbox. + */ +async function buildAndCacheSnapshot( + threadId: string, + threadData: gmail.Schema$Thread, + gmailClient: gmail.Gmail, + auth: OAuth2Client, +): Promise { + const messages = threadData.messages; + if (!messages || messages.length === 0) return null; + const cached = readCachedSnapshot(threadId); - if (expectedHistoryId && cached && cached.historyId === expectedHistoryId) { - return cached.snapshot; - } const heightCarryover = new Map(); if (cached) { for (const m of cached.snapshot.messages) { @@ -430,16 +451,6 @@ export async function fetchThreadSnapshot(threadId: string, expectedHistoryId?: } } - const auth = await GoogleClientFactory.getClient(); - if (!auth) { - throw new Error('Gmail is not connected.'); - } - - const gmailClient = google.gmail({ version: 'v1', auth }); - const res = await gmailClient.users.threads.get({ userId: 'me', id: threadId }); - const messages = res.data.messages; - if (!messages || messages.length === 0) return null; - const parsed = await Promise.all(messages.map(async (msg) => { const headers = msg.payload?.headers || []; const parts = msg.payload ? extractBodyParts(msg.payload) : { text: '', html: '' }; @@ -465,20 +476,18 @@ export async function fetchThreadSnapshot(threadId: string, expectedHistoryId?: bodyHtml, unread: msg.labelIds?.includes('UNREAD') ?? false, bodyHeight: msg.id ? heightCarryover.get(msg.id) : undefined, + messageIdHeader: headerValue(headers, 'Message-ID') || headerValue(headers, 'Message-Id') || undefined, isDraft, }; })); const sentMessages = parsed.filter((m) => !m.isDraft); const draftMessages = parsed.filter((m) => m.isDraft); - // Drop the isDraft helper field from outgoing messages — it's internal. const visibleMessages = sentMessages.map(({ isDraft: _isDraft, ...rest }) => rest); const latestDraftBody = draftMessages.length > 0 ? draftMessages[draftMessages.length - 1]!.body.trim() : ''; - // A thread with no sent messages (only a draft) shouldn't show up in the inbox — - // skip caching it. Once the user actually sends, the thread reappears with a real message. if (visibleMessages.length === 0) return null; const latest = visibleMessages[visibleMessages.length - 1]!; @@ -508,8 +517,6 @@ export async function fetchThreadSnapshot(threadId: string, expectedHistoryId?: try { const userEmail = await getUserEmail(auth); - // If the user already has a Gmail-side draft going, skip the AI draft generation — - // the renderer will prefer the Gmail draft anyway, and we save an LLM call. const skipDraft = latestDraftBody.length > 0; const classification = await classifyThread(snapshot, userEmail, { skipDraft }); snapshot.importance = classification.importance; @@ -519,13 +526,29 @@ export async function fetchThreadSnapshot(threadId: string, expectedHistoryId?: console.warn(`[Gmail] classify failed for ${threadId}:`, err); } - if (res.data.historyId) { - writeCachedSnapshot(threadId, res.data.historyId, snapshot); + if (threadData.historyId) { + writeCachedSnapshot(threadId, threadData.historyId, snapshot); } return snapshot; } +export async function fetchThreadSnapshot(threadId: string, expectedHistoryId?: string): Promise { + const cached = readCachedSnapshot(threadId); + if (expectedHistoryId && cached && cached.historyId === expectedHistoryId) { + return cached.snapshot; + } + + const auth = await GoogleClientFactory.getClient(); + if (!auth) { + throw new Error('Gmail is not connected.'); + } + + const gmailClient = google.gmail({ version: 'v1', auth }); + const res = await gmailClient.users.threads.get({ userId: 'me', id: threadId }); + return buildAndCacheSnapshot(threadId, res.data, gmailClient, auth); +} + async function saveAttachment(gmail: gmail.Gmail, userId: string, msgId: string, part: gmail.Schema$MessagePart, attachmentsDir: string): Promise { const filename = part.filename; const attId = part.body?.attachmentId; @@ -627,6 +650,14 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent); console.log(`Synced Thread: ${subject} (${threadId})`); + // Also build + cache the rich snapshot for the inbox view. + // Reuses the threads.get response — no extra API call. + try { + await buildAndCacheSnapshot(threadId, thread, gmail, auth); + } catch (err) { + console.warn(`[Gmail] Inbox snapshot build failed for ${threadId}:`, err); + } + return { threadId, markdown: mdContent }; } catch (error) { @@ -635,6 +666,46 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri } } +/** + * After a sync cycle, prune inbox_lists/ entries for threadIds that are + * no longer in INBOX (archived/trashed elsewhere). Single threads.list call, + * keeps the cache in lock-step with Gmail's INBOX label. + */ +async function pruneInboxCache(auth: OAuth2Client): Promise { + if (!fs.existsSync(CACHE_DIR)) return; + try { + const gmailClient = google.gmail({ version: 'v1', auth }); + const inInbox = new Set(); + let pageToken: string | undefined; + do { + const res = await gmailClient.users.threads.list({ + userId: 'me', + labelIds: ['INBOX'], + maxResults: 500, + pageToken, + }); + for (const t of res.data.threads || []) { + if (t.id) inInbox.add(t.id); + } + pageToken = res.data.nextPageToken ?? undefined; + } while (pageToken); + + for (const name of fs.readdirSync(CACHE_DIR)) { + if (!name.endsWith('.json')) continue; + const threadId = decodeURIComponent(name.replace(/\.json$/, '')); + if (!inInbox.has(threadId)) { + try { + fs.rmSync(path.join(CACHE_DIR, name), { force: true }); + } catch (err) { + console.warn(`[Gmail] prune failed for ${threadId}:`, err); + } + } + } + } catch (err) { + console.warn('[Gmail] pruneInboxCache failed:', err); + } +} + function loadState(stateFile: string): { historyId?: string; last_sync?: string } { if (fs.existsSync(stateFile)) { return JSON.parse(fs.readFileSync(stateFile, 'utf-8')); @@ -917,6 +988,10 @@ async function performSync() { await partialSync(auth, state.historyId, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS); } + // Keep inbox_lists/ in lock-step with Gmail's INBOX label — + // remove cache files for threads that were archived/trashed elsewhere. + await pruneInboxCache(auth); + console.log("Sync completed."); } catch (error) { console.error("Error during sync:", error); diff --git a/apps/x/packages/shared/src/ipc.ts b/apps/x/packages/shared/src/ipc.ts index 957b4d29..1074672d 100644 --- a/apps/x/packages/shared/src/ipc.ts +++ b/apps/x/packages/shared/src/ipc.ts @@ -134,22 +134,8 @@ const ipcSchemas = { error: z.string().optional(), }), }, - 'gmail:listRecentThreads': { + 'gmail:getImportant': { req: z.object({ - daysAgo: z.number().int().positive().optional(), - }), - res: z.object({ - threads: z.array(z.object({ - threadId: z.string(), - historyId: z.string(), - snippet: z.string().optional(), - })), - error: z.string().optional(), - }), - }, - 'gmail:listInboxPage': { - req: z.object({ - section: z.enum(['important', 'other']), cursor: z.string().optional(), limit: z.number().int().min(1).max(100).optional(), }), @@ -158,6 +144,20 @@ const ipcSchemas = { nextCursor: z.string().nullable(), }), }, + 'gmail:getEverythingElse': { + req: z.object({ + cursor: z.string().optional(), + limit: z.number().int().min(1).max(100).optional(), + }), + res: z.object({ + threads: z.array(GmailThreadSchema), + nextCursor: z.string().nullable(), + }), + }, + 'gmail:triggerSync': { + req: z.object({}), + res: z.object({}), + }, 'gmail:saveMessageHeight': { req: z.object({ threadId: z.string().min(1),