add pagination, watcher and separation from gmail sync

This commit is contained in:
Arjun 2026-05-14 13:43:42 +05:30
parent 22e5452003
commit d757dc37da
5 changed files with 378 additions and 94 deletions

View file

@ -47,7 +47,7 @@ import { summarizeMeeting } from '@x/core/dist/knowledge/summarize_meeting.js';
import { getAccessToken } from '@x/core/dist/auth/tokens.js';
import { getRowboatConfig } from '@x/core/dist/config/rowboat.js';
import { runLiveNoteAgent } from '@x/core/dist/knowledge/live-note/runner.js';
import { fetchThreadSnapshot, listRecentThreadIds, listCachedThreads, saveMessageBodyHeight } from '@x/core/dist/knowledge/sync_gmail.js';
import { fetchThreadSnapshot, listRecentThreadIds, listInboxPage, saveMessageBodyHeight } from '@x/core/dist/knowledge/sync_gmail.js';
import { liveNoteBus } from '@x/core/dist/knowledge/live-note/bus.js';
import { getInstallationId } from '@x/core/dist/analytics/installation.js';
import { API_URL } from '@x/core/dist/config/env.js';
@ -503,8 +503,12 @@ export function setupIpcHandlers() {
};
}
},
'gmail:listCachedThreads': async (_event, args) => {
return { threads: listCachedThreads(args.daysAgo ?? 2) };
'gmail:listInboxPage': async (_event, args) => {
return listInboxPage({
section: args.section,
cursor: args.cursor,
limit: args.limit,
});
},
'gmail:saveMessageHeight': async (_event, args) => {
saveMessageBodyHeight(args.threadId, args.messageId, args.height);

View file

@ -224,6 +224,14 @@
margin-top: 28px;
}
.gmail-section-sentinel {
display: flex;
align-items: center;
justify-content: center;
height: 28px;
color: var(--gm-text-faint);
}
.gmail-row {
display: grid;
grid-template-columns: 12px minmax(140px, 0.22fr) minmax(0, 1fr) 60px;

View file

@ -663,15 +663,52 @@ function ThreadDetail({
}
const MAX_KEPT_OPEN = 5
const PAGE_SIZE = 25
const SECTIONS = ['important', 'other'] as const
type InboxSection = (typeof SECTIONS)[number]
interface SectionState {
threads: GmailThread[]
nextCursor: string | null
hasReachedEnd: boolean
loadingPage: boolean
}
const initialSectionState: SectionState = {
threads: [],
nextCursor: null,
hasReachedEnd: false,
loadingPage: false,
}
// Module-level survives unmount/remount within the renderer process — so switching
// panels and coming back doesn't reload from scratch.
let persistedImportant: SectionState | null = null
let persistedOther: SectionState | null = null
function clearLoadingFlag(state: SectionState | null): SectionState {
if (!state) return initialSectionState
return { ...state, loadingPage: false }
}
export function EmailView() {
const [threads, setThreads] = useState<GmailThread[]>([])
const [important, setImportant] = useState<SectionState>(() => clearLoadingFlag(persistedImportant))
const [other, setOther] = useState<SectionState>(() => clearLoadingFlag(persistedOther))
const hadPersistedDataOnMount = useRef(persistedImportant !== null)
const [selectedThreadId, setSelectedThreadId] = useState<string | null>(null)
const [openedThreadIds, setOpenedThreadIds] = useState<string[]>([])
const [loading, setLoading] = useState(true)
const [refreshing, setRefreshing] = useState(!hadPersistedDataOnMount.current)
const [error, setError] = useState<string | null>(null)
const [query, setQuery] = useState('')
useEffect(() => { persistedImportant = important }, [important])
useEffect(() => { persistedOther = other }, [other])
const setSection = useCallback((section: InboxSection, updater: (prev: SectionState) => SectionState) => {
if (section === 'important') setImportant(updater)
else setOther(updater)
}, [])
const toggleThread = useCallback((threadId: string) => {
setSelectedThreadId((current) => {
const next = current === threadId ? null : threadId
@ -709,66 +746,169 @@ export function EmailView() {
if (hoverTimerRef.current) clearTimeout(hoverTimerRef.current)
}, [])
const loadThreads = useCallback(async () => {
setError(null)
let hasCachedContent = false
// Track the current "load epoch" so concurrent refreshes don't apply stale results.
const epochRef = useRef(0)
const loadNextPage = useCallback(async (section: InboxSection) => {
const current = section === 'important' ? important : other
if (current.loadingPage || current.hasReachedEnd) return
const epoch = epochRef.current
setSection(section, (prev) => ({ ...prev, loadingPage: true }))
try {
const cached = await window.ipc.invoke('gmail:listCachedThreads', { daysAgo: 2 })
if (cached.threads.length > 0) {
setThreads(cached.threads)
hasCachedContent = true
}
const result = await window.ipc.invoke('gmail:listInboxPage', {
section,
cursor: current.nextCursor ?? undefined,
limit: PAGE_SIZE,
})
if (epoch !== epochRef.current) return
setSection(section, (prev) => ({
threads: [...prev.threads, ...result.threads],
nextCursor: result.nextCursor,
hasReachedEnd: result.nextCursor === null,
loadingPage: false,
}))
} catch (err) {
console.warn('[Gmail] cache read failed:', err)
if (epoch !== epochRef.current) return
console.warn(`[Gmail] page load failed for ${section}:`, err)
setSection(section, (prev) => ({ ...prev, loadingPage: false }))
}
}, [important, other, setSection])
setLoading(true)
const reloadFirstPage = useCallback(async (section: InboxSection) => {
const epoch = ++epochRef.current
setSection(section, () => ({ ...initialSectionState, loadingPage: true }))
try {
const list = await window.ipc.invoke('gmail:listRecentThreads', { daysAgo: 2 })
if (list.error) throw new Error(list.error)
const result = await window.ipc.invoke('gmail:listInboxPage', {
section,
limit: PAGE_SIZE,
})
if (epoch !== epochRef.current) return
setSection(section, () => ({
threads: result.threads,
nextCursor: result.nextCursor,
hasReachedEnd: result.nextCursor === null,
loadingPage: false,
}))
} catch (err) {
if (epoch !== epochRef.current) return
console.warn(`[Gmail] initial page load failed for ${section}:`, err)
setSection(section, () => ({ ...initialSectionState, loadingPage: false }))
}
}, [setSection])
const hydrated = await mapWithConcurrency(list.threads, 6, async (item) => {
// Initial load — fetch page 1 of Important only. Everything else stays hidden
// until Important is exhausted (see effect below).
useEffect(() => {
if (hadPersistedDataOnMount.current) return
void reloadFirstPage('important')
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [])
// Once Important is exhausted, kick off page 1 of Everything else.
useEffect(() => {
if (!important.hasReachedEnd) return
if (other.threads.length > 0) return
if (other.loadingPage) return
void reloadFirstPage('other')
}, [important.hasReachedEnd, other.threads.length, other.loadingPage, reloadFirstPage])
// Live updates: watcher on inbox_lists/ → reload page 1 when files change.
// Suppressed while a thread is open (composing/reading) — instead, mark a
// pending update and reload once the user closes the thread.
const pendingReloadRef = useRef(false)
const reloadDebounceRef = useRef<ReturnType<typeof setTimeout> | null>(null)
const isSelectedRef = useRef<string | null>(null)
isSelectedRef.current = selectedThreadId
const isRefreshingRef = useRef(false)
isRefreshingRef.current = refreshing
const triggerLiveReload = useCallback(() => {
if (reloadDebounceRef.current) clearTimeout(reloadDebounceRef.current)
reloadDebounceRef.current = setTimeout(() => {
reloadDebounceRef.current = null
// Skip if our own refresh is in flight — its writes triggered the watcher.
if (isRefreshingRef.current) return
// If a thread is open, defer until it closes.
if (isSelectedRef.current !== null) {
pendingReloadRef.current = true
return
}
void reloadFirstPage('important')
setOther(() => ({ ...initialSectionState }))
}, 500)
}, [reloadFirstPage])
useEffect(() => {
const cleanup = window.ipc.on('workspace:didChange', (event) => {
const matches = (p: string) => p.startsWith('inbox_lists/')
switch (event.type) {
case 'created':
case 'changed':
case 'deleted':
if (event.path && matches(event.path)) triggerLiveReload()
break
case 'moved':
if ((event.from && matches(event.from)) || (event.to && matches(event.to))) triggerLiveReload()
break
case 'bulkChanged':
if (event.paths?.some(matches)) triggerLiveReload()
break
}
})
return () => {
cleanup()
if (reloadDebounceRef.current) clearTimeout(reloadDebounceRef.current)
}
}, [triggerLiveReload])
// When user closes a thread, if updates arrived while they were reading, flush now.
useEffect(() => {
if (selectedThreadId !== null) return
if (!pendingReloadRef.current) return
pendingReloadRef.current = false
void reloadFirstPage('important')
setOther(() => ({ ...initialSectionState }))
}, [selectedThreadId, reloadFirstPage])
// Live refresh: hit the Gmail API to validate the freshest threads, then re-render.
const refresh = useCallback(async () => {
if (refreshing) return
setRefreshing(true)
setError(null)
try {
// TEMP(pagination-testing): widened from daysAgo: 2 to pull more threads into inbox_lists/ for testing. Revert before shipping.
const list = await window.ipc.invoke('gmail:listRecentThreads', { daysAgo: 7 })
if (list.error) throw new Error(list.error)
await mapWithConcurrency(list.threads, 6, async (item) => {
const result = await window.ipc.invoke('gmail:getThread', {
threadId: item.threadId,
expectedHistoryId: item.historyId,
})
if (result.thread) return result.thread
console.warn('Failed to hydrate Gmail thread', item.threadId, result.error)
return null
if (!result.thread) {
console.warn('Failed to hydrate Gmail thread', item.threadId, result.error)
}
})
const nextThreads = hydrated
.filter((thread): thread is GmailThread => Boolean(thread))
.sort((a, b) => {
const aDate = Date.parse(latestMessage(a)?.date || a.date || '')
const bDate = Date.parse(latestMessage(b)?.date || b.date || '')
return (Number.isNaN(bDate) ? 0 : bDate) - (Number.isNaN(aDate) ? 0 : aDate)
})
const liveIds = new Set(nextThreads.map((t) => t.threadId))
setThreads(nextThreads)
setSelectedThreadId(current => current && liveIds.has(current) ? current : null)
setOpenedThreadIds((prev) => prev.filter((id) => liveIds.has(id)))
// Reset Other so the auto-load effect re-triggers once Important hits end.
setOther(() => ({ ...initialSectionState }))
await reloadFirstPage('important')
} catch (err) {
if (hasCachedContent) {
console.warn('[Gmail] background refresh failed; keeping cached view:', err)
} else {
setError(err instanceof Error ? err.message : String(err))
setThreads([])
setSelectedThreadId(null)
}
console.warn('[Gmail] live refresh failed:', err)
setError(err instanceof Error ? err.message : String(err))
} finally {
setLoading(false)
setRefreshing(false)
}
}, [refreshing, reloadFirstPage])
// Kick off a live refresh on mount only when there's no persisted data —
// otherwise we'd clobber the snapshot the user already had.
useEffect(() => {
if (hadPersistedDataOnMount.current) return
void refresh()
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [])
useEffect(() => {
void loadThreads()
}, [loadThreads])
const filteredThreads = useMemo(() => {
const filterThreads = useCallback((threads: GmailThread[]) => {
const normalized = query.trim().toLowerCase()
if (!normalized) return threads
return threads.filter((thread) => {
@ -780,21 +920,13 @@ export function EmailView() {
latest?.body,
].some(value => (value || '').toLowerCase().includes(normalized))
})
}, [query, threads])
}, [query])
const { importantThreads, otherThreads } = useMemo(() => {
const important: GmailThread[] = []
const other: GmailThread[] = []
for (const thread of filteredThreads) {
// Default unclassified threads to Important so we don't hide anything
// before the classifier has run on them.
if (thread.importance === 'other') other.push(thread)
else important.push(thread)
}
return { importantThreads: important, otherThreads: other }
}, [filteredThreads])
const visibleImportant = useMemo(() => filterThreads(important.threads), [important.threads, filterThreads])
const visibleOther = useMemo(() => filterThreads(other.threads), [other.threads, filterThreads])
const hasThreads = filteredThreads.length > 0
const hasAny = important.threads.length > 0 || other.threads.length > 0
const initialLoading = !hasAny && refreshing
const renderRow = (thread: GmailThread) => {
const latest = latestMessage(thread)
@ -838,43 +970,91 @@ export function EmailView() {
<input
value={query}
onChange={(event) => setQuery(event.target.value)}
placeholder="Search mail"
placeholder="Search loaded mail"
/>
</div>
<button type="button" className="gmail-icon-button" onClick={() => void loadThreads()} aria-label="Refresh">
{loading ? <LoaderIcon size={18} className="animate-spin" /> : <RefreshCw size={18} />}
<button type="button" className="gmail-icon-button" onClick={() => void refresh()} aria-label="Refresh">
{refreshing ? <LoaderIcon size={18} className="animate-spin" /> : <RefreshCw size={18} />}
</button>
</div>
{error ? (
{error && !hasAny ? (
<div className="gmail-empty-state">Could not load mail: {error}</div>
) : hasThreads ? (
) : hasAny ? (
<div className="gmail-list" aria-label="Recent emails">
{importantThreads.length > 0 && (
{important.threads.length > 0 && (
<section className="gmail-section">
<div className="gmail-list-header">
<span>Important</span>
<span>{importantThreads.length} thread{importantThreads.length === 1 ? '' : 's'}</span>
<span>
{important.threads.length}{important.hasReachedEnd ? '' : '+'} thread{important.threads.length === 1 ? '' : 's'}
</span>
</div>
{importantThreads.map(renderRow)}
{visibleImportant.map(renderRow)}
{!important.hasReachedEnd && (
<SectionSentinel
disabled={important.loadingPage || important.hasReachedEnd}
onIntersect={() => loadNextPage('important')}
loading={important.loadingPage}
/>
)}
</section>
)}
{otherThreads.length > 0 && (
{important.hasReachedEnd && other.threads.length > 0 && (
<section className="gmail-section">
<div className="gmail-list-header">
<span>Everything else</span>
<span>{otherThreads.length} thread{otherThreads.length === 1 ? '' : 's'}</span>
<span>
{other.threads.length}{other.hasReachedEnd ? '' : '+'} thread{other.threads.length === 1 ? '' : 's'}
</span>
</div>
{otherThreads.map(renderRow)}
{visibleOther.map(renderRow)}
{!other.hasReachedEnd && (
<SectionSentinel
disabled={other.loadingPage || other.hasReachedEnd}
onIntersect={() => loadNextPage('other')}
loading={other.loadingPage}
/>
)}
</section>
)}
</div>
) : (
<div className="gmail-empty-state">
{loading ? 'Loading recent Gmail threads...' : 'No Gmail threads found from the last 2 days.'}
{initialLoading ? 'Loading Gmail threads…' : 'No Gmail threads in your inbox cache yet.'}
</div>
)}
</div>
</div>
)
}
function SectionSentinel({
disabled,
onIntersect,
loading,
}: {
disabled: boolean
onIntersect: () => void
loading: boolean
}) {
const sentinelRef = useRef<HTMLDivElement>(null)
useEffect(() => {
if (disabled) return
const el = sentinelRef.current
if (!el) return
const observer = new IntersectionObserver((entries) => {
if (entries.some((entry) => entry.isIntersecting)) {
onIntersect()
}
}, { rootMargin: '200px' })
observer.observe(el)
return () => observer.disconnect()
}, [disabled, onIntersect])
return (
<div ref={sentinelRef} className="gmail-section-sentinel" aria-hidden>
{loading ? <LoaderIcon size={14} className="animate-spin" /> : null}
</div>
)
}

View file

@ -12,7 +12,19 @@ import { classifyThread, getUserEmail } from './classify_thread.js';
// Configuration
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
const CACHE_DIR = path.join(SYNC_DIR, 'cache');
const LEGACY_CACHE_DIR = path.join(SYNC_DIR, 'cache');
const CACHE_DIR = path.join(WorkDir, 'inbox_lists');
(function migrateLegacyCacheDir() {
try {
if (fs.existsSync(LEGACY_CACHE_DIR) && !fs.existsSync(CACHE_DIR)) {
fs.renameSync(LEGACY_CACHE_DIR, CACHE_DIR);
console.log(`[Gmail] Migrated cache from ${LEGACY_CACHE_DIR}${CACHE_DIR}`);
}
} catch (err) {
console.warn('[Gmail] Cache directory migration failed:', err);
}
})();
const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes
const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly';
const MAX_THREADS_IN_DIGEST = 10;
@ -266,31 +278,108 @@ export interface RecentThreadInfo {
snippet?: string;
}
export function listCachedThreads(daysAgo: number = 2): GmailThreadSnapshot[] {
if (!fs.existsSync(CACHE_DIR)) return [];
const cutoffMs = Date.now() - daysAgo * 24 * 60 * 60 * 1000;
const out: GmailThreadSnapshot[] = [];
for (const name of fs.readdirSync(CACHE_DIR)) {
export type InboxSection = 'important' | 'other';
export interface InboxPageOptions {
section: InboxSection;
cursor?: string;
limit?: number;
}
export interface InboxPageResult {
threads: GmailThreadSnapshot[];
nextCursor: string | null;
}
interface IndexedEntry {
threadId: string;
dateMs: number;
snapshot: GmailThreadSnapshot;
}
function snapshotImportance(s: GmailThreadSnapshot): InboxSection {
return s.importance === 'other' ? 'other' : 'important';
}
function snapshotDateMs(s: GmailThreadSnapshot): number {
const latest = s.messages[s.messages.length - 1];
const raw = latest?.date || s.date;
if (!raw) return 0;
const ms = Date.parse(raw);
return Number.isFinite(ms) ? ms : 0;
}
function parseCursor(cursor: string | undefined): { dateMs: number; threadId: string } | null {
if (!cursor) return null;
const idx = cursor.indexOf('|');
if (idx < 0) return null;
const dateMs = Number(cursor.slice(0, idx));
const threadId = cursor.slice(idx + 1);
if (!Number.isFinite(dateMs) || !threadId) return null;
return { dateMs, threadId };
}
function encodeCursor(entry: { dateMs: number; threadId: string }): string {
return `${entry.dateMs}|${entry.threadId}`;
}
export function listInboxPage(opts: InboxPageOptions): InboxPageResult {
const limit = Math.max(1, Math.min(100, opts.limit ?? 25));
const cursor = parseCursor(opts.cursor);
if (!fs.existsSync(CACHE_DIR)) return { threads: [], nextCursor: null };
let names: string[];
try {
names = fs.readdirSync(CACHE_DIR);
} catch {
return { threads: [], nextCursor: null };
}
const entries: IndexedEntry[] = [];
for (const name of names) {
if (!name.endsWith('.json')) continue;
const filePath = path.join(CACHE_DIR, name);
try {
const stat = fs.statSync(filePath);
if (stat.mtimeMs < cutoffMs) continue;
const entry = JSON.parse(fs.readFileSync(filePath, 'utf-8')) as SnapshotCacheEntry;
const latestDate = entry.snapshot.messages[entry.snapshot.messages.length - 1]?.date;
const latestMs = latestDate ? Date.parse(latestDate) : stat.mtimeMs;
if (Number.isFinite(latestMs) && latestMs < cutoffMs) continue;
out.push(entry.snapshot);
const raw = fs.readFileSync(filePath, 'utf-8');
const wrapper = JSON.parse(raw) as SnapshotCacheEntry;
const snapshot = wrapper.snapshot;
if (!snapshot) continue;
if (snapshotImportance(snapshot) !== opts.section) continue;
entries.push({
threadId: snapshot.threadId,
dateMs: snapshotDateMs(snapshot),
snapshot,
});
} catch (err) {
console.warn(`[Gmail cache] read failed for ${name}:`, err);
console.warn(`[Inbox lists] read failed for ${name}:`, err);
}
}
out.sort((a, b) => {
const aDate = Date.parse(a.messages[a.messages.length - 1]?.date || a.date || '');
const bDate = Date.parse(b.messages[b.messages.length - 1]?.date || b.date || '');
return (Number.isNaN(bDate) ? 0 : bDate) - (Number.isNaN(aDate) ? 0 : aDate);
// Newest first, threadId asc as tiebreak.
entries.sort((a, b) => {
if (b.dateMs !== a.dateMs) return b.dateMs - a.dateMs;
return a.threadId < b.threadId ? -1 : 1;
});
return out;
let startIdx = 0;
if (cursor) {
startIdx = entries.findIndex((e) => {
if (e.dateMs < cursor.dateMs) return true;
if (e.dateMs === cursor.dateMs && e.threadId > cursor.threadId) return true;
return false;
});
if (startIdx < 0) startIdx = entries.length;
}
const slice = entries.slice(startIdx, startIdx + limit);
const hasMore = startIdx + slice.length < entries.length;
const last = slice[slice.length - 1];
return {
threads: slice.map((e) => e.snapshot),
nextCursor: hasMore && last ? encodeCursor({ dateMs: last.dateMs, threadId: last.threadId }) : null,
};
}
export async function listRecentThreadIds(daysAgo: number = 2): Promise<RecentThreadInfo[]> {

View file

@ -146,12 +146,15 @@ const ipcSchemas = {
error: z.string().optional(),
}),
},
'gmail:listCachedThreads': {
'gmail:listInboxPage': {
req: z.object({
daysAgo: z.number().int().positive().optional(),
section: z.enum(['important', 'other']),
cursor: z.string().optional(),
limit: z.number().int().min(1).max(100).optional(),
}),
res: z.object({
threads: z.array(GmailThreadSchema),
nextCursor: z.string().nullable(),
}),
},
'gmail:saveMessageHeight': {