diff --git a/apps/x/apps/main/src/ipc.ts b/apps/x/apps/main/src/ipc.ts index 050407ff..31d6d57a 100644 --- a/apps/x/apps/main/src/ipc.ts +++ b/apps/x/apps/main/src/ipc.ts @@ -47,7 +47,7 @@ import { summarizeMeeting } from '@x/core/dist/knowledge/summarize_meeting.js'; import { getAccessToken } from '@x/core/dist/auth/tokens.js'; import { getRowboatConfig } from '@x/core/dist/config/rowboat.js'; import { runLiveNoteAgent } from '@x/core/dist/knowledge/live-note/runner.js'; -import { fetchThreadSnapshot } from '@x/core/dist/knowledge/sync_gmail.js'; +import { fetchThreadSnapshot, listRecentThreadIds, listCachedThreads } from '@x/core/dist/knowledge/sync_gmail.js'; import { liveNoteBus } from '@x/core/dist/knowledge/live-note/bus.js'; import { getInstallationId } from '@x/core/dist/analytics/installation.js'; import { API_URL } from '@x/core/dist/config/env.js'; @@ -485,7 +485,7 @@ export function setupIpcHandlers() { }, 'gmail:getThread': async (_event, args) => { try { - return { thread: await fetchThreadSnapshot(args.threadId) }; + return { thread: await fetchThreadSnapshot(args.threadId, args.expectedHistoryId) }; } catch (error) { return { thread: null, @@ -493,6 +493,19 @@ export function setupIpcHandlers() { }; } }, + 'gmail:listRecentThreads': async (_event, args) => { + try { + return { threads: await listRecentThreadIds(args.daysAgo ?? 2) }; + } catch (error) { + return { + threads: [], + error: error instanceof Error ? error.message : String(error), + }; + } + }, + 'gmail:listCachedThreads': async (_event, args) => { + return { threads: listCachedThreads(args.daysAgo ?? 2) }; + }, 'mcp:listTools': async (_event, args) => { return mcpCore.listTools(args.serverName, args.cursor); }, diff --git a/apps/x/apps/renderer/src/components/email-view.tsx b/apps/x/apps/renderer/src/components/email-view.tsx index 58fdf262..9018961d 100644 --- a/apps/x/apps/renderer/src/components/email-view.tsx +++ b/apps/x/apps/renderer/src/components/email-view.tsx @@ -7,31 +7,6 @@ import { toast } from '@/lib/toast' type GmailThread = blocks.GmailThread type GmailThreadMessage = blocks.GmailThreadMessage -type IndexedThread = { - threadId: string - lastDateMs: number - sourcePath: string -} - -const TWO_DAYS_MS = 2 * 24 * 60 * 60 * 1000 - -function parseThreadId(path: string, markdown: string): string | null { - const fromBody = markdown.match(/\*\*Thread ID:\*\*\s*([^\s]+)/)?.[1]?.trim() - if (fromBody) return fromBody - return path.split('/').pop()?.replace(/\.md$/i, '') || null -} - -function parseLatestDateMs(markdown: string, fallbackMs?: number): number { - const matches = Array.from(markdown.matchAll(/^\*\*Date:\*\*\s*(.+)$/gm)) - for (let i = matches.length - 1; i >= 0; i -= 1) { - const raw = matches[i]?.[1]?.trim() - if (!raw) continue - const ms = Date.parse(raw) - if (!Number.isNaN(ms)) return ms - } - return fallbackMs ?? 0 -} - function formatInboxTime(value?: string): string { if (!value) return '' const date = new Date(value) @@ -318,45 +293,37 @@ function ThreadDetail({ export function EmailView() { const [threads, setThreads] = useState([]) const [selectedThreadId, setSelectedThreadId] = useState(null) - const [loading, setLoading] = useState(false) + const [loading, setLoading] = useState(true) const [error, setError] = useState(null) const [query, setQuery] = useState('') const loadThreads = useCallback(async () => { - setLoading(true) setError(null) + let hasCachedContent = false + try { - const entries = await window.ipc.invoke('workspace:readdir', { - path: 'gmail_sync', - opts: { includeStats: true }, - }) - const cutoff = Date.now() - TWO_DAYS_MS - const indexed: IndexedThread[] = [] + const cached = await window.ipc.invoke('gmail:listCachedThreads', { daysAgo: 2 }) + if (cached.threads.length > 0) { + setThreads(cached.threads) + hasCachedContent = true + } + } catch (err) { + console.warn('[Gmail] cache read failed:', err) + } - await Promise.all(entries - .filter(entry => entry.kind === 'file' && entry.name.endsWith('.md') && entry.name !== 'sync_state.json') - .map(async (entry) => { - try { - const result = await window.ipc.invoke('workspace:readFile', { path: entry.path, encoding: 'utf8' }) - const threadId = parseThreadId(entry.path, result.data) - if (!threadId) return - const lastDateMs = parseLatestDateMs(result.data, entry.stat?.mtimeMs) - if (lastDateMs < cutoff) return - indexed.push({ threadId, lastDateMs, sourcePath: entry.path }) - } catch { - const threadId = entry.name.replace(/\.md$/i, '') - const lastDateMs = entry.stat?.mtimeMs ?? 0 - if (lastDateMs >= cutoff) indexed.push({ threadId, lastDateMs, sourcePath: entry.path }) - } - })) + setLoading(true) - const recent = indexed - .sort((a, b) => b.lastDateMs - a.lastDateMs) + try { + const list = await window.ipc.invoke('gmail:listRecentThreads', { daysAgo: 2 }) + if (list.error) throw new Error(list.error) - const hydrated = await mapWithConcurrency(recent, 6, async (item) => { - const result = await window.ipc.invoke('gmail:getThread', { threadId: item.threadId }) + const hydrated = await mapWithConcurrency(list.threads, 6, async (item) => { + const result = await window.ipc.invoke('gmail:getThread', { + threadId: item.threadId, + expectedHistoryId: item.historyId, + }) if (result.thread) return result.thread - console.warn('Failed to hydrate Gmail thread', item.sourcePath, result.error) + console.warn('Failed to hydrate Gmail thread', item.threadId, result.error) return null }) @@ -369,11 +336,15 @@ export function EmailView() { }) setThreads(nextThreads) - setSelectedThreadId(current => current && nextThreads.some(thread => thread.threadId === current) ? current : nextThreads[0]?.threadId ?? null) + setSelectedThreadId(current => current && nextThreads.some(thread => thread.threadId === current) ? current : null) } catch (err) { - setError(err instanceof Error ? err.message : String(err)) - setThreads([]) - setSelectedThreadId(null) + if (hasCachedContent) { + console.warn('[Gmail] background refresh failed; keeping cached view:', err) + } else { + setError(err instanceof Error ? err.message : String(err)) + setThreads([]) + setSelectedThreadId(null) + } } finally { setLoading(false) } diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts index 6e36fc6e..72c71143 100644 --- a/apps/x/packages/core/src/knowledge/sync_gmail.ts +++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts @@ -11,11 +11,45 @@ import { createEvent } from '../events/producer.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'gmail_sync'); +const CACHE_DIR = path.join(SYNC_DIR, 'cache'); const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly'; const MAX_THREADS_IN_DIGEST = 10; const nhm = new NodeHtmlMarkdown(); +interface SnapshotCacheEntry { + historyId: string; + fetchedAt: string; + snapshot: GmailThreadSnapshot; +} + +function cachePath(threadId: string): string { + return path.join(CACHE_DIR, `${encodeURIComponent(threadId)}.json`); +} + +function readCachedSnapshot(threadId: string): SnapshotCacheEntry | null { + try { + const raw = fs.readFileSync(cachePath(threadId), 'utf-8'); + return JSON.parse(raw) as SnapshotCacheEntry; + } catch { + return null; + } +} + +function writeCachedSnapshot(threadId: string, historyId: string, snapshot: GmailThreadSnapshot): void { + try { + if (!fs.existsSync(CACHE_DIR)) fs.mkdirSync(CACHE_DIR, { recursive: true }); + const entry: SnapshotCacheEntry = { + historyId, + fetchedAt: new Date().toISOString(), + snapshot, + }; + fs.writeFileSync(cachePath(threadId), JSON.stringify(entry), 'utf-8'); + } catch (err) { + console.warn(`[Gmail cache] write failed for ${threadId}:`, err); + } +} + interface SyncedThread { threadId: string; markdown: string; @@ -208,7 +242,82 @@ function headerValue(headers: gmail.Schema$MessagePartHeader[] | undefined, name return headers?.find(h => h.name?.toLowerCase() === name.toLowerCase())?.value || undefined; } -export async function fetchThreadSnapshot(threadId: string): Promise { +export interface RecentThreadInfo { + threadId: string; + historyId: string; + snippet?: string; +} + +export function listCachedThreads(daysAgo: number = 2): GmailThreadSnapshot[] { + if (!fs.existsSync(CACHE_DIR)) return []; + const cutoffMs = Date.now() - daysAgo * 24 * 60 * 60 * 1000; + const out: GmailThreadSnapshot[] = []; + for (const name of fs.readdirSync(CACHE_DIR)) { + if (!name.endsWith('.json')) continue; + const filePath = path.join(CACHE_DIR, name); + try { + const stat = fs.statSync(filePath); + if (stat.mtimeMs < cutoffMs) continue; + const entry = JSON.parse(fs.readFileSync(filePath, 'utf-8')) as SnapshotCacheEntry; + const latestDate = entry.snapshot.messages[entry.snapshot.messages.length - 1]?.date; + const latestMs = latestDate ? Date.parse(latestDate) : stat.mtimeMs; + if (Number.isFinite(latestMs) && latestMs < cutoffMs) continue; + out.push(entry.snapshot); + } catch (err) { + console.warn(`[Gmail cache] read failed for ${name}:`, err); + } + } + out.sort((a, b) => { + const aDate = Date.parse(a.messages[a.messages.length - 1]?.date || a.date || ''); + const bDate = Date.parse(b.messages[b.messages.length - 1]?.date || b.date || ''); + return (Number.isNaN(bDate) ? 0 : bDate) - (Number.isNaN(aDate) ? 0 : aDate); + }); + return out; +} + +export async function listRecentThreadIds(daysAgo: number = 2): Promise { + const auth = await GoogleClientFactory.getClient(); + if (!auth) { + throw new Error('Gmail is not connected.'); + } + + const gmailClient = google.gmail({ version: 'v1', auth }); + const since = new Date(); + since.setDate(since.getDate() - daysAgo); + const dateQuery = since.toISOString().split('T')[0].replace(/-/g, '/'); + + const results: RecentThreadInfo[] = []; + let pageToken: string | undefined; + do { + const res = await gmailClient.users.threads.list({ + userId: 'me', + q: `after:${dateQuery}`, + pageToken, + }); + const threads = res.data.threads || []; + for (const thread of threads) { + if (thread.id && thread.historyId) { + results.push({ + threadId: thread.id, + historyId: thread.historyId, + snippet: thread.snippet || undefined, + }); + } + } + pageToken = res.data.nextPageToken ?? undefined; + } while (pageToken); + + return results; +} + +export async function fetchThreadSnapshot(threadId: string, expectedHistoryId?: string): Promise { + if (expectedHistoryId) { + const cached = readCachedSnapshot(threadId); + if (cached && cached.historyId === expectedHistoryId) { + return cached.snapshot; + } + } + const auth = await GoogleClientFactory.getClient(); if (!auth) { throw new Error('Gmail is not connected.'); @@ -256,7 +365,7 @@ export async function fetchThreadSnapshot(threadId: string): Promise m.unread), messages: parsed, }; + + if (res.data.historyId) { + writeCachedSnapshot(threadId, res.data.historyId, snapshot); + } + + return snapshot; } async function saveAttachment(gmail: gmail.Gmail, userId: string, msgId: string, part: gmail.Schema$MessagePart, attachmentsDir: string): Promise { diff --git a/apps/x/packages/shared/src/ipc.ts b/apps/x/packages/shared/src/ipc.ts index b83b9754..37d120a1 100644 --- a/apps/x/packages/shared/src/ipc.ts +++ b/apps/x/packages/shared/src/ipc.ts @@ -127,12 +127,34 @@ const ipcSchemas = { 'gmail:getThread': { req: z.object({ threadId: z.string().min(1), + expectedHistoryId: z.string().optional(), }), res: z.object({ thread: GmailThreadSchema.nullable(), error: z.string().optional(), }), }, + 'gmail:listRecentThreads': { + req: z.object({ + daysAgo: z.number().int().positive().optional(), + }), + res: z.object({ + threads: z.array(z.object({ + threadId: z.string(), + historyId: z.string(), + snippet: z.string().optional(), + })), + error: z.string().optional(), + }), + }, + 'gmail:listCachedThreads': { + req: z.object({ + daysAgo: z.number().int().positive().optional(), + }), + res: z.object({ + threads: z.array(GmailThreadSchema), + }), + }, 'mcp:listTools': { req: z.object({ serverName: z.string(),