From d757dc37da2fcd71d7dccc6077063a655e8f7f2e Mon Sep 17 00:00:00 2001 From: Arjun <6592213+arkml@users.noreply.github.com> Date: Thu, 14 May 2026 13:43:42 +0530 Subject: [PATCH] add pagination, watcher and separation from gmail sync --- apps/x/apps/main/src/ipc.ts | 10 +- apps/x/apps/renderer/src/App.css | 8 + .../renderer/src/components/email-view.tsx | 320 ++++++++++++++---- .../packages/core/src/knowledge/sync_gmail.ts | 127 +++++-- apps/x/packages/shared/src/ipc.ts | 7 +- 5 files changed, 378 insertions(+), 94 deletions(-) diff --git a/apps/x/apps/main/src/ipc.ts b/apps/x/apps/main/src/ipc.ts index 04821874..7808063d 100644 --- a/apps/x/apps/main/src/ipc.ts +++ b/apps/x/apps/main/src/ipc.ts @@ -47,7 +47,7 @@ import { summarizeMeeting } from '@x/core/dist/knowledge/summarize_meeting.js'; import { getAccessToken } from '@x/core/dist/auth/tokens.js'; import { getRowboatConfig } from '@x/core/dist/config/rowboat.js'; import { runLiveNoteAgent } from '@x/core/dist/knowledge/live-note/runner.js'; -import { fetchThreadSnapshot, listRecentThreadIds, listCachedThreads, saveMessageBodyHeight } from '@x/core/dist/knowledge/sync_gmail.js'; +import { fetchThreadSnapshot, listRecentThreadIds, listInboxPage, saveMessageBodyHeight } from '@x/core/dist/knowledge/sync_gmail.js'; import { liveNoteBus } from '@x/core/dist/knowledge/live-note/bus.js'; import { getInstallationId } from '@x/core/dist/analytics/installation.js'; import { API_URL } from '@x/core/dist/config/env.js'; @@ -503,8 +503,12 @@ export function setupIpcHandlers() { }; } }, - 'gmail:listCachedThreads': async (_event, args) => { - return { threads: listCachedThreads(args.daysAgo ?? 2) }; + 'gmail:listInboxPage': async (_event, args) => { + return listInboxPage({ + section: args.section, + cursor: args.cursor, + limit: args.limit, + }); }, 'gmail:saveMessageHeight': async (_event, args) => { saveMessageBodyHeight(args.threadId, args.messageId, args.height); diff --git a/apps/x/apps/renderer/src/App.css b/apps/x/apps/renderer/src/App.css index a7b35992..be2926d2 100644 --- a/apps/x/apps/renderer/src/App.css +++ b/apps/x/apps/renderer/src/App.css @@ -224,6 +224,14 @@ margin-top: 28px; } +.gmail-section-sentinel { + display: flex; + align-items: center; + justify-content: center; + height: 28px; + color: var(--gm-text-faint); +} + .gmail-row { display: grid; grid-template-columns: 12px minmax(140px, 0.22fr) minmax(0, 1fr) 60px; diff --git a/apps/x/apps/renderer/src/components/email-view.tsx b/apps/x/apps/renderer/src/components/email-view.tsx index 93f18fc6..5b2fa55f 100644 --- a/apps/x/apps/renderer/src/components/email-view.tsx +++ b/apps/x/apps/renderer/src/components/email-view.tsx @@ -663,15 +663,52 @@ function ThreadDetail({ } const MAX_KEPT_OPEN = 5 +const PAGE_SIZE = 25 +const SECTIONS = ['important', 'other'] as const +type InboxSection = (typeof SECTIONS)[number] + +interface SectionState { + threads: GmailThread[] + nextCursor: string | null + hasReachedEnd: boolean + loadingPage: boolean +} + +const initialSectionState: SectionState = { + threads: [], + nextCursor: null, + hasReachedEnd: false, + loadingPage: false, +} + +// Module-level survives unmount/remount within the renderer process — so switching +// panels and coming back doesn't reload from scratch. +let persistedImportant: SectionState | null = null +let persistedOther: SectionState | null = null + +function clearLoadingFlag(state: SectionState | null): SectionState { + if (!state) return initialSectionState + return { ...state, loadingPage: false } +} export function EmailView() { - const [threads, setThreads] = useState([]) + const [important, setImportant] = useState(() => clearLoadingFlag(persistedImportant)) + const [other, setOther] = useState(() => clearLoadingFlag(persistedOther)) + const hadPersistedDataOnMount = useRef(persistedImportant !== null) const [selectedThreadId, setSelectedThreadId] = useState(null) const [openedThreadIds, setOpenedThreadIds] = useState([]) - const [loading, setLoading] = useState(true) + const [refreshing, setRefreshing] = useState(!hadPersistedDataOnMount.current) const [error, setError] = useState(null) const [query, setQuery] = useState('') + useEffect(() => { persistedImportant = important }, [important]) + useEffect(() => { persistedOther = other }, [other]) + + const setSection = useCallback((section: InboxSection, updater: (prev: SectionState) => SectionState) => { + if (section === 'important') setImportant(updater) + else setOther(updater) + }, []) + const toggleThread = useCallback((threadId: string) => { setSelectedThreadId((current) => { const next = current === threadId ? null : threadId @@ -709,66 +746,169 @@ export function EmailView() { if (hoverTimerRef.current) clearTimeout(hoverTimerRef.current) }, []) - const loadThreads = useCallback(async () => { - setError(null) - let hasCachedContent = false + // Track the current "load epoch" so concurrent refreshes don't apply stale results. + const epochRef = useRef(0) + const loadNextPage = useCallback(async (section: InboxSection) => { + const current = section === 'important' ? important : other + if (current.loadingPage || current.hasReachedEnd) return + + const epoch = epochRef.current + setSection(section, (prev) => ({ ...prev, loadingPage: true })) try { - const cached = await window.ipc.invoke('gmail:listCachedThreads', { daysAgo: 2 }) - if (cached.threads.length > 0) { - setThreads(cached.threads) - hasCachedContent = true - } + const result = await window.ipc.invoke('gmail:listInboxPage', { + section, + cursor: current.nextCursor ?? undefined, + limit: PAGE_SIZE, + }) + if (epoch !== epochRef.current) return + setSection(section, (prev) => ({ + threads: [...prev.threads, ...result.threads], + nextCursor: result.nextCursor, + hasReachedEnd: result.nextCursor === null, + loadingPage: false, + })) } catch (err) { - console.warn('[Gmail] cache read failed:', err) + if (epoch !== epochRef.current) return + console.warn(`[Gmail] page load failed for ${section}:`, err) + setSection(section, (prev) => ({ ...prev, loadingPage: false })) } + }, [important, other, setSection]) - setLoading(true) - + const reloadFirstPage = useCallback(async (section: InboxSection) => { + const epoch = ++epochRef.current + setSection(section, () => ({ ...initialSectionState, loadingPage: true })) try { - const list = await window.ipc.invoke('gmail:listRecentThreads', { daysAgo: 2 }) - if (list.error) throw new Error(list.error) + const result = await window.ipc.invoke('gmail:listInboxPage', { + section, + limit: PAGE_SIZE, + }) + if (epoch !== epochRef.current) return + setSection(section, () => ({ + threads: result.threads, + nextCursor: result.nextCursor, + hasReachedEnd: result.nextCursor === null, + loadingPage: false, + })) + } catch (err) { + if (epoch !== epochRef.current) return + console.warn(`[Gmail] initial page load failed for ${section}:`, err) + setSection(section, () => ({ ...initialSectionState, loadingPage: false })) + } + }, [setSection]) - const hydrated = await mapWithConcurrency(list.threads, 6, async (item) => { + // Initial load — fetch page 1 of Important only. Everything else stays hidden + // until Important is exhausted (see effect below). + useEffect(() => { + if (hadPersistedDataOnMount.current) return + void reloadFirstPage('important') + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []) + + // Once Important is exhausted, kick off page 1 of Everything else. + useEffect(() => { + if (!important.hasReachedEnd) return + if (other.threads.length > 0) return + if (other.loadingPage) return + void reloadFirstPage('other') + }, [important.hasReachedEnd, other.threads.length, other.loadingPage, reloadFirstPage]) + + // Live updates: watcher on inbox_lists/ → reload page 1 when files change. + // Suppressed while a thread is open (composing/reading) — instead, mark a + // pending update and reload once the user closes the thread. + const pendingReloadRef = useRef(false) + const reloadDebounceRef = useRef | null>(null) + const isSelectedRef = useRef(null) + isSelectedRef.current = selectedThreadId + const isRefreshingRef = useRef(false) + isRefreshingRef.current = refreshing + + const triggerLiveReload = useCallback(() => { + if (reloadDebounceRef.current) clearTimeout(reloadDebounceRef.current) + reloadDebounceRef.current = setTimeout(() => { + reloadDebounceRef.current = null + // Skip if our own refresh is in flight — its writes triggered the watcher. + if (isRefreshingRef.current) return + // If a thread is open, defer until it closes. + if (isSelectedRef.current !== null) { + pendingReloadRef.current = true + return + } + void reloadFirstPage('important') + setOther(() => ({ ...initialSectionState })) + }, 500) + }, [reloadFirstPage]) + + useEffect(() => { + const cleanup = window.ipc.on('workspace:didChange', (event) => { + const matches = (p: string) => p.startsWith('inbox_lists/') + switch (event.type) { + case 'created': + case 'changed': + case 'deleted': + if (event.path && matches(event.path)) triggerLiveReload() + break + case 'moved': + if ((event.from && matches(event.from)) || (event.to && matches(event.to))) triggerLiveReload() + break + case 'bulkChanged': + if (event.paths?.some(matches)) triggerLiveReload() + break + } + }) + return () => { + cleanup() + if (reloadDebounceRef.current) clearTimeout(reloadDebounceRef.current) + } + }, [triggerLiveReload]) + + // When user closes a thread, if updates arrived while they were reading, flush now. + useEffect(() => { + if (selectedThreadId !== null) return + if (!pendingReloadRef.current) return + pendingReloadRef.current = false + void reloadFirstPage('important') + setOther(() => ({ ...initialSectionState })) + }, [selectedThreadId, reloadFirstPage]) + + // Live refresh: hit the Gmail API to validate the freshest threads, then re-render. + const refresh = useCallback(async () => { + if (refreshing) return + setRefreshing(true) + setError(null) + try { + // TEMP(pagination-testing): widened from daysAgo: 2 to pull more threads into inbox_lists/ for testing. Revert before shipping. + const list = await window.ipc.invoke('gmail:listRecentThreads', { daysAgo: 7 }) + if (list.error) throw new Error(list.error) + await mapWithConcurrency(list.threads, 6, async (item) => { const result = await window.ipc.invoke('gmail:getThread', { threadId: item.threadId, expectedHistoryId: item.historyId, }) - if (result.thread) return result.thread - console.warn('Failed to hydrate Gmail thread', item.threadId, result.error) - return null + if (!result.thread) { + console.warn('Failed to hydrate Gmail thread', item.threadId, result.error) + } }) - - const nextThreads = hydrated - .filter((thread): thread is GmailThread => Boolean(thread)) - .sort((a, b) => { - const aDate = Date.parse(latestMessage(a)?.date || a.date || '') - const bDate = Date.parse(latestMessage(b)?.date || b.date || '') - return (Number.isNaN(bDate) ? 0 : bDate) - (Number.isNaN(aDate) ? 0 : aDate) - }) - - const liveIds = new Set(nextThreads.map((t) => t.threadId)) - setThreads(nextThreads) - setSelectedThreadId(current => current && liveIds.has(current) ? current : null) - setOpenedThreadIds((prev) => prev.filter((id) => liveIds.has(id))) + // Reset Other so the auto-load effect re-triggers once Important hits end. + setOther(() => ({ ...initialSectionState })) + await reloadFirstPage('important') } catch (err) { - if (hasCachedContent) { - console.warn('[Gmail] background refresh failed; keeping cached view:', err) - } else { - setError(err instanceof Error ? err.message : String(err)) - setThreads([]) - setSelectedThreadId(null) - } + console.warn('[Gmail] live refresh failed:', err) + setError(err instanceof Error ? err.message : String(err)) } finally { - setLoading(false) + setRefreshing(false) } + }, [refreshing, reloadFirstPage]) + + // Kick off a live refresh on mount only when there's no persisted data — + // otherwise we'd clobber the snapshot the user already had. + useEffect(() => { + if (hadPersistedDataOnMount.current) return + void refresh() + // eslint-disable-next-line react-hooks/exhaustive-deps }, []) - useEffect(() => { - void loadThreads() - }, [loadThreads]) - - const filteredThreads = useMemo(() => { + const filterThreads = useCallback((threads: GmailThread[]) => { const normalized = query.trim().toLowerCase() if (!normalized) return threads return threads.filter((thread) => { @@ -780,21 +920,13 @@ export function EmailView() { latest?.body, ].some(value => (value || '').toLowerCase().includes(normalized)) }) - }, [query, threads]) + }, [query]) - const { importantThreads, otherThreads } = useMemo(() => { - const important: GmailThread[] = [] - const other: GmailThread[] = [] - for (const thread of filteredThreads) { - // Default unclassified threads to Important so we don't hide anything - // before the classifier has run on them. - if (thread.importance === 'other') other.push(thread) - else important.push(thread) - } - return { importantThreads: important, otherThreads: other } - }, [filteredThreads]) + const visibleImportant = useMemo(() => filterThreads(important.threads), [important.threads, filterThreads]) + const visibleOther = useMemo(() => filterThreads(other.threads), [other.threads, filterThreads]) - const hasThreads = filteredThreads.length > 0 + const hasAny = important.threads.length > 0 || other.threads.length > 0 + const initialLoading = !hasAny && refreshing const renderRow = (thread: GmailThread) => { const latest = latestMessage(thread) @@ -838,43 +970,91 @@ export function EmailView() { setQuery(event.target.value)} - placeholder="Search mail" + placeholder="Search loaded mail" /> - - {error ? ( + {error && !hasAny ? (
Could not load mail: {error}
- ) : hasThreads ? ( + ) : hasAny ? (
- {importantThreads.length > 0 && ( + {important.threads.length > 0 && (
Important - {importantThreads.length} thread{importantThreads.length === 1 ? '' : 's'} + + {important.threads.length}{important.hasReachedEnd ? '' : '+'} thread{important.threads.length === 1 ? '' : 's'} +
- {importantThreads.map(renderRow)} + {visibleImportant.map(renderRow)} + {!important.hasReachedEnd && ( + loadNextPage('important')} + loading={important.loadingPage} + /> + )}
)} - {otherThreads.length > 0 && ( + {important.hasReachedEnd && other.threads.length > 0 && (
Everything else - {otherThreads.length} thread{otherThreads.length === 1 ? '' : 's'} + + {other.threads.length}{other.hasReachedEnd ? '' : '+'} thread{other.threads.length === 1 ? '' : 's'} +
- {otherThreads.map(renderRow)} + {visibleOther.map(renderRow)} + {!other.hasReachedEnd && ( + loadNextPage('other')} + loading={other.loadingPage} + /> + )}
)}
) : (
- {loading ? 'Loading recent Gmail threads...' : 'No Gmail threads found from the last 2 days.'} + {initialLoading ? 'Loading Gmail threads…' : 'No Gmail threads in your inbox cache yet.'}
)} ) } + +function SectionSentinel({ + disabled, + onIntersect, + loading, +}: { + disabled: boolean + onIntersect: () => void + loading: boolean +}) { + const sentinelRef = useRef(null) + useEffect(() => { + if (disabled) return + const el = sentinelRef.current + if (!el) return + const observer = new IntersectionObserver((entries) => { + if (entries.some((entry) => entry.isIntersecting)) { + onIntersect() + } + }, { rootMargin: '200px' }) + observer.observe(el) + return () => observer.disconnect() + }, [disabled, onIntersect]) + + return ( +
+ {loading ? : null} +
+ ) +} diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts index ed92ef70..9de2715c 100644 --- a/apps/x/packages/core/src/knowledge/sync_gmail.ts +++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts @@ -12,7 +12,19 @@ import { classifyThread, getUserEmail } from './classify_thread.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'gmail_sync'); -const CACHE_DIR = path.join(SYNC_DIR, 'cache'); +const LEGACY_CACHE_DIR = path.join(SYNC_DIR, 'cache'); +const CACHE_DIR = path.join(WorkDir, 'inbox_lists'); + +(function migrateLegacyCacheDir() { + try { + if (fs.existsSync(LEGACY_CACHE_DIR) && !fs.existsSync(CACHE_DIR)) { + fs.renameSync(LEGACY_CACHE_DIR, CACHE_DIR); + console.log(`[Gmail] Migrated cache from ${LEGACY_CACHE_DIR} → ${CACHE_DIR}`); + } + } catch (err) { + console.warn('[Gmail] Cache directory migration failed:', err); + } +})(); const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly'; const MAX_THREADS_IN_DIGEST = 10; @@ -266,31 +278,108 @@ export interface RecentThreadInfo { snippet?: string; } -export function listCachedThreads(daysAgo: number = 2): GmailThreadSnapshot[] { - if (!fs.existsSync(CACHE_DIR)) return []; - const cutoffMs = Date.now() - daysAgo * 24 * 60 * 60 * 1000; - const out: GmailThreadSnapshot[] = []; - for (const name of fs.readdirSync(CACHE_DIR)) { +export type InboxSection = 'important' | 'other'; + +export interface InboxPageOptions { + section: InboxSection; + cursor?: string; + limit?: number; +} + +export interface InboxPageResult { + threads: GmailThreadSnapshot[]; + nextCursor: string | null; +} + +interface IndexedEntry { + threadId: string; + dateMs: number; + snapshot: GmailThreadSnapshot; +} + +function snapshotImportance(s: GmailThreadSnapshot): InboxSection { + return s.importance === 'other' ? 'other' : 'important'; +} + +function snapshotDateMs(s: GmailThreadSnapshot): number { + const latest = s.messages[s.messages.length - 1]; + const raw = latest?.date || s.date; + if (!raw) return 0; + const ms = Date.parse(raw); + return Number.isFinite(ms) ? ms : 0; +} + +function parseCursor(cursor: string | undefined): { dateMs: number; threadId: string } | null { + if (!cursor) return null; + const idx = cursor.indexOf('|'); + if (idx < 0) return null; + const dateMs = Number(cursor.slice(0, idx)); + const threadId = cursor.slice(idx + 1); + if (!Number.isFinite(dateMs) || !threadId) return null; + return { dateMs, threadId }; +} + +function encodeCursor(entry: { dateMs: number; threadId: string }): string { + return `${entry.dateMs}|${entry.threadId}`; +} + +export function listInboxPage(opts: InboxPageOptions): InboxPageResult { + const limit = Math.max(1, Math.min(100, opts.limit ?? 25)); + const cursor = parseCursor(opts.cursor); + + if (!fs.existsSync(CACHE_DIR)) return { threads: [], nextCursor: null }; + + let names: string[]; + try { + names = fs.readdirSync(CACHE_DIR); + } catch { + return { threads: [], nextCursor: null }; + } + + const entries: IndexedEntry[] = []; + for (const name of names) { if (!name.endsWith('.json')) continue; const filePath = path.join(CACHE_DIR, name); try { - const stat = fs.statSync(filePath); - if (stat.mtimeMs < cutoffMs) continue; - const entry = JSON.parse(fs.readFileSync(filePath, 'utf-8')) as SnapshotCacheEntry; - const latestDate = entry.snapshot.messages[entry.snapshot.messages.length - 1]?.date; - const latestMs = latestDate ? Date.parse(latestDate) : stat.mtimeMs; - if (Number.isFinite(latestMs) && latestMs < cutoffMs) continue; - out.push(entry.snapshot); + const raw = fs.readFileSync(filePath, 'utf-8'); + const wrapper = JSON.parse(raw) as SnapshotCacheEntry; + const snapshot = wrapper.snapshot; + if (!snapshot) continue; + if (snapshotImportance(snapshot) !== opts.section) continue; + entries.push({ + threadId: snapshot.threadId, + dateMs: snapshotDateMs(snapshot), + snapshot, + }); } catch (err) { - console.warn(`[Gmail cache] read failed for ${name}:`, err); + console.warn(`[Inbox lists] read failed for ${name}:`, err); } } - out.sort((a, b) => { - const aDate = Date.parse(a.messages[a.messages.length - 1]?.date || a.date || ''); - const bDate = Date.parse(b.messages[b.messages.length - 1]?.date || b.date || ''); - return (Number.isNaN(bDate) ? 0 : bDate) - (Number.isNaN(aDate) ? 0 : aDate); + + // Newest first, threadId asc as tiebreak. + entries.sort((a, b) => { + if (b.dateMs !== a.dateMs) return b.dateMs - a.dateMs; + return a.threadId < b.threadId ? -1 : 1; }); - return out; + + let startIdx = 0; + if (cursor) { + startIdx = entries.findIndex((e) => { + if (e.dateMs < cursor.dateMs) return true; + if (e.dateMs === cursor.dateMs && e.threadId > cursor.threadId) return true; + return false; + }); + if (startIdx < 0) startIdx = entries.length; + } + + const slice = entries.slice(startIdx, startIdx + limit); + const hasMore = startIdx + slice.length < entries.length; + const last = slice[slice.length - 1]; + + return { + threads: slice.map((e) => e.snapshot), + nextCursor: hasMore && last ? encodeCursor({ dateMs: last.dateMs, threadId: last.threadId }) : null, + }; } export async function listRecentThreadIds(daysAgo: number = 2): Promise { diff --git a/apps/x/packages/shared/src/ipc.ts b/apps/x/packages/shared/src/ipc.ts index 73558afa..dc4079ef 100644 --- a/apps/x/packages/shared/src/ipc.ts +++ b/apps/x/packages/shared/src/ipc.ts @@ -146,12 +146,15 @@ const ipcSchemas = { error: z.string().optional(), }), }, - 'gmail:listCachedThreads': { + 'gmail:listInboxPage': { req: z.object({ - daysAgo: z.number().int().positive().optional(), + section: z.enum(['important', 'other']), + cursor: z.string().optional(), + limit: z.number().int().min(1).max(100).optional(), }), res: z.object({ threads: z.array(GmailThreadSchema), + nextCursor: z.string().nullable(), }), }, 'gmail:saveMessageHeight': {