mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-06-27 20:29:44 +02:00
faster loads
This commit is contained in:
parent
ec1420fcf5
commit
84562e400a
4 changed files with 183 additions and 62 deletions
|
|
@ -47,7 +47,7 @@ import { summarizeMeeting } from '@x/core/dist/knowledge/summarize_meeting.js';
|
||||||
import { getAccessToken } from '@x/core/dist/auth/tokens.js';
|
import { getAccessToken } from '@x/core/dist/auth/tokens.js';
|
||||||
import { getRowboatConfig } from '@x/core/dist/config/rowboat.js';
|
import { getRowboatConfig } from '@x/core/dist/config/rowboat.js';
|
||||||
import { runLiveNoteAgent } from '@x/core/dist/knowledge/live-note/runner.js';
|
import { runLiveNoteAgent } from '@x/core/dist/knowledge/live-note/runner.js';
|
||||||
import { fetchThreadSnapshot } from '@x/core/dist/knowledge/sync_gmail.js';
|
import { fetchThreadSnapshot, listRecentThreadIds, listCachedThreads } from '@x/core/dist/knowledge/sync_gmail.js';
|
||||||
import { liveNoteBus } from '@x/core/dist/knowledge/live-note/bus.js';
|
import { liveNoteBus } from '@x/core/dist/knowledge/live-note/bus.js';
|
||||||
import { getInstallationId } from '@x/core/dist/analytics/installation.js';
|
import { getInstallationId } from '@x/core/dist/analytics/installation.js';
|
||||||
import { API_URL } from '@x/core/dist/config/env.js';
|
import { API_URL } from '@x/core/dist/config/env.js';
|
||||||
|
|
@ -485,7 +485,7 @@ export function setupIpcHandlers() {
|
||||||
},
|
},
|
||||||
'gmail:getThread': async (_event, args) => {
|
'gmail:getThread': async (_event, args) => {
|
||||||
try {
|
try {
|
||||||
return { thread: await fetchThreadSnapshot(args.threadId) };
|
return { thread: await fetchThreadSnapshot(args.threadId, args.expectedHistoryId) };
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
return {
|
return {
|
||||||
thread: null,
|
thread: null,
|
||||||
|
|
@ -493,6 +493,19 @@ export function setupIpcHandlers() {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
'gmail:listRecentThreads': async (_event, args) => {
|
||||||
|
try {
|
||||||
|
return { threads: await listRecentThreadIds(args.daysAgo ?? 2) };
|
||||||
|
} catch (error) {
|
||||||
|
return {
|
||||||
|
threads: [],
|
||||||
|
error: error instanceof Error ? error.message : String(error),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
'gmail:listCachedThreads': async (_event, args) => {
|
||||||
|
return { threads: listCachedThreads(args.daysAgo ?? 2) };
|
||||||
|
},
|
||||||
'mcp:listTools': async (_event, args) => {
|
'mcp:listTools': async (_event, args) => {
|
||||||
return mcpCore.listTools(args.serverName, args.cursor);
|
return mcpCore.listTools(args.serverName, args.cursor);
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -7,31 +7,6 @@ import { toast } from '@/lib/toast'
|
||||||
type GmailThread = blocks.GmailThread
|
type GmailThread = blocks.GmailThread
|
||||||
type GmailThreadMessage = blocks.GmailThreadMessage
|
type GmailThreadMessage = blocks.GmailThreadMessage
|
||||||
|
|
||||||
type IndexedThread = {
|
|
||||||
threadId: string
|
|
||||||
lastDateMs: number
|
|
||||||
sourcePath: string
|
|
||||||
}
|
|
||||||
|
|
||||||
const TWO_DAYS_MS = 2 * 24 * 60 * 60 * 1000
|
|
||||||
|
|
||||||
function parseThreadId(path: string, markdown: string): string | null {
|
|
||||||
const fromBody = markdown.match(/\*\*Thread ID:\*\*\s*([^\s]+)/)?.[1]?.trim()
|
|
||||||
if (fromBody) return fromBody
|
|
||||||
return path.split('/').pop()?.replace(/\.md$/i, '') || null
|
|
||||||
}
|
|
||||||
|
|
||||||
function parseLatestDateMs(markdown: string, fallbackMs?: number): number {
|
|
||||||
const matches = Array.from(markdown.matchAll(/^\*\*Date:\*\*\s*(.+)$/gm))
|
|
||||||
for (let i = matches.length - 1; i >= 0; i -= 1) {
|
|
||||||
const raw = matches[i]?.[1]?.trim()
|
|
||||||
if (!raw) continue
|
|
||||||
const ms = Date.parse(raw)
|
|
||||||
if (!Number.isNaN(ms)) return ms
|
|
||||||
}
|
|
||||||
return fallbackMs ?? 0
|
|
||||||
}
|
|
||||||
|
|
||||||
function formatInboxTime(value?: string): string {
|
function formatInboxTime(value?: string): string {
|
||||||
if (!value) return ''
|
if (!value) return ''
|
||||||
const date = new Date(value)
|
const date = new Date(value)
|
||||||
|
|
@ -318,45 +293,37 @@ function ThreadDetail({
|
||||||
export function EmailView() {
|
export function EmailView() {
|
||||||
const [threads, setThreads] = useState<GmailThread[]>([])
|
const [threads, setThreads] = useState<GmailThread[]>([])
|
||||||
const [selectedThreadId, setSelectedThreadId] = useState<string | null>(null)
|
const [selectedThreadId, setSelectedThreadId] = useState<string | null>(null)
|
||||||
const [loading, setLoading] = useState(false)
|
const [loading, setLoading] = useState(true)
|
||||||
const [error, setError] = useState<string | null>(null)
|
const [error, setError] = useState<string | null>(null)
|
||||||
const [query, setQuery] = useState('')
|
const [query, setQuery] = useState('')
|
||||||
|
|
||||||
const loadThreads = useCallback(async () => {
|
const loadThreads = useCallback(async () => {
|
||||||
setLoading(true)
|
|
||||||
setError(null)
|
setError(null)
|
||||||
|
let hasCachedContent = false
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const entries = await window.ipc.invoke('workspace:readdir', {
|
const cached = await window.ipc.invoke('gmail:listCachedThreads', { daysAgo: 2 })
|
||||||
path: 'gmail_sync',
|
if (cached.threads.length > 0) {
|
||||||
opts: { includeStats: true },
|
setThreads(cached.threads)
|
||||||
})
|
hasCachedContent = true
|
||||||
const cutoff = Date.now() - TWO_DAYS_MS
|
}
|
||||||
const indexed: IndexedThread[] = []
|
} catch (err) {
|
||||||
|
console.warn('[Gmail] cache read failed:', err)
|
||||||
|
}
|
||||||
|
|
||||||
await Promise.all(entries
|
setLoading(true)
|
||||||
.filter(entry => entry.kind === 'file' && entry.name.endsWith('.md') && entry.name !== 'sync_state.json')
|
|
||||||
.map(async (entry) => {
|
|
||||||
try {
|
|
||||||
const result = await window.ipc.invoke('workspace:readFile', { path: entry.path, encoding: 'utf8' })
|
|
||||||
const threadId = parseThreadId(entry.path, result.data)
|
|
||||||
if (!threadId) return
|
|
||||||
const lastDateMs = parseLatestDateMs(result.data, entry.stat?.mtimeMs)
|
|
||||||
if (lastDateMs < cutoff) return
|
|
||||||
indexed.push({ threadId, lastDateMs, sourcePath: entry.path })
|
|
||||||
} catch {
|
|
||||||
const threadId = entry.name.replace(/\.md$/i, '')
|
|
||||||
const lastDateMs = entry.stat?.mtimeMs ?? 0
|
|
||||||
if (lastDateMs >= cutoff) indexed.push({ threadId, lastDateMs, sourcePath: entry.path })
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
|
|
||||||
const recent = indexed
|
try {
|
||||||
.sort((a, b) => b.lastDateMs - a.lastDateMs)
|
const list = await window.ipc.invoke('gmail:listRecentThreads', { daysAgo: 2 })
|
||||||
|
if (list.error) throw new Error(list.error)
|
||||||
|
|
||||||
const hydrated = await mapWithConcurrency(recent, 6, async (item) => {
|
const hydrated = await mapWithConcurrency(list.threads, 6, async (item) => {
|
||||||
const result = await window.ipc.invoke('gmail:getThread', { threadId: item.threadId })
|
const result = await window.ipc.invoke('gmail:getThread', {
|
||||||
|
threadId: item.threadId,
|
||||||
|
expectedHistoryId: item.historyId,
|
||||||
|
})
|
||||||
if (result.thread) return result.thread
|
if (result.thread) return result.thread
|
||||||
console.warn('Failed to hydrate Gmail thread', item.sourcePath, result.error)
|
console.warn('Failed to hydrate Gmail thread', item.threadId, result.error)
|
||||||
return null
|
return null
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -369,11 +336,15 @@ export function EmailView() {
|
||||||
})
|
})
|
||||||
|
|
||||||
setThreads(nextThreads)
|
setThreads(nextThreads)
|
||||||
setSelectedThreadId(current => current && nextThreads.some(thread => thread.threadId === current) ? current : nextThreads[0]?.threadId ?? null)
|
setSelectedThreadId(current => current && nextThreads.some(thread => thread.threadId === current) ? current : null)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
setError(err instanceof Error ? err.message : String(err))
|
if (hasCachedContent) {
|
||||||
setThreads([])
|
console.warn('[Gmail] background refresh failed; keeping cached view:', err)
|
||||||
setSelectedThreadId(null)
|
} else {
|
||||||
|
setError(err instanceof Error ? err.message : String(err))
|
||||||
|
setThreads([])
|
||||||
|
setSelectedThreadId(null)
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
setLoading(false)
|
setLoading(false)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,11 +11,45 @@ import { createEvent } from '../events/producer.js';
|
||||||
|
|
||||||
// Configuration
|
// Configuration
|
||||||
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
|
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
|
||||||
|
const CACHE_DIR = path.join(SYNC_DIR, 'cache');
|
||||||
const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes
|
const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes
|
||||||
const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly';
|
const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly';
|
||||||
const MAX_THREADS_IN_DIGEST = 10;
|
const MAX_THREADS_IN_DIGEST = 10;
|
||||||
const nhm = new NodeHtmlMarkdown();
|
const nhm = new NodeHtmlMarkdown();
|
||||||
|
|
||||||
|
interface SnapshotCacheEntry {
|
||||||
|
historyId: string;
|
||||||
|
fetchedAt: string;
|
||||||
|
snapshot: GmailThreadSnapshot;
|
||||||
|
}
|
||||||
|
|
||||||
|
function cachePath(threadId: string): string {
|
||||||
|
return path.join(CACHE_DIR, `${encodeURIComponent(threadId)}.json`);
|
||||||
|
}
|
||||||
|
|
||||||
|
function readCachedSnapshot(threadId: string): SnapshotCacheEntry | null {
|
||||||
|
try {
|
||||||
|
const raw = fs.readFileSync(cachePath(threadId), 'utf-8');
|
||||||
|
return JSON.parse(raw) as SnapshotCacheEntry;
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function writeCachedSnapshot(threadId: string, historyId: string, snapshot: GmailThreadSnapshot): void {
|
||||||
|
try {
|
||||||
|
if (!fs.existsSync(CACHE_DIR)) fs.mkdirSync(CACHE_DIR, { recursive: true });
|
||||||
|
const entry: SnapshotCacheEntry = {
|
||||||
|
historyId,
|
||||||
|
fetchedAt: new Date().toISOString(),
|
||||||
|
snapshot,
|
||||||
|
};
|
||||||
|
fs.writeFileSync(cachePath(threadId), JSON.stringify(entry), 'utf-8');
|
||||||
|
} catch (err) {
|
||||||
|
console.warn(`[Gmail cache] write failed for ${threadId}:`, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
interface SyncedThread {
|
interface SyncedThread {
|
||||||
threadId: string;
|
threadId: string;
|
||||||
markdown: string;
|
markdown: string;
|
||||||
|
|
@ -208,7 +242,82 @@ function headerValue(headers: gmail.Schema$MessagePartHeader[] | undefined, name
|
||||||
return headers?.find(h => h.name?.toLowerCase() === name.toLowerCase())?.value || undefined;
|
return headers?.find(h => h.name?.toLowerCase() === name.toLowerCase())?.value || undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function fetchThreadSnapshot(threadId: string): Promise<GmailThreadSnapshot | null> {
|
export interface RecentThreadInfo {
|
||||||
|
threadId: string;
|
||||||
|
historyId: string;
|
||||||
|
snippet?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function listCachedThreads(daysAgo: number = 2): GmailThreadSnapshot[] {
|
||||||
|
if (!fs.existsSync(CACHE_DIR)) return [];
|
||||||
|
const cutoffMs = Date.now() - daysAgo * 24 * 60 * 60 * 1000;
|
||||||
|
const out: GmailThreadSnapshot[] = [];
|
||||||
|
for (const name of fs.readdirSync(CACHE_DIR)) {
|
||||||
|
if (!name.endsWith('.json')) continue;
|
||||||
|
const filePath = path.join(CACHE_DIR, name);
|
||||||
|
try {
|
||||||
|
const stat = fs.statSync(filePath);
|
||||||
|
if (stat.mtimeMs < cutoffMs) continue;
|
||||||
|
const entry = JSON.parse(fs.readFileSync(filePath, 'utf-8')) as SnapshotCacheEntry;
|
||||||
|
const latestDate = entry.snapshot.messages[entry.snapshot.messages.length - 1]?.date;
|
||||||
|
const latestMs = latestDate ? Date.parse(latestDate) : stat.mtimeMs;
|
||||||
|
if (Number.isFinite(latestMs) && latestMs < cutoffMs) continue;
|
||||||
|
out.push(entry.snapshot);
|
||||||
|
} catch (err) {
|
||||||
|
console.warn(`[Gmail cache] read failed for ${name}:`, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.sort((a, b) => {
|
||||||
|
const aDate = Date.parse(a.messages[a.messages.length - 1]?.date || a.date || '');
|
||||||
|
const bDate = Date.parse(b.messages[b.messages.length - 1]?.date || b.date || '');
|
||||||
|
return (Number.isNaN(bDate) ? 0 : bDate) - (Number.isNaN(aDate) ? 0 : aDate);
|
||||||
|
});
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function listRecentThreadIds(daysAgo: number = 2): Promise<RecentThreadInfo[]> {
|
||||||
|
const auth = await GoogleClientFactory.getClient();
|
||||||
|
if (!auth) {
|
||||||
|
throw new Error('Gmail is not connected.');
|
||||||
|
}
|
||||||
|
|
||||||
|
const gmailClient = google.gmail({ version: 'v1', auth });
|
||||||
|
const since = new Date();
|
||||||
|
since.setDate(since.getDate() - daysAgo);
|
||||||
|
const dateQuery = since.toISOString().split('T')[0].replace(/-/g, '/');
|
||||||
|
|
||||||
|
const results: RecentThreadInfo[] = [];
|
||||||
|
let pageToken: string | undefined;
|
||||||
|
do {
|
||||||
|
const res = await gmailClient.users.threads.list({
|
||||||
|
userId: 'me',
|
||||||
|
q: `after:${dateQuery}`,
|
||||||
|
pageToken,
|
||||||
|
});
|
||||||
|
const threads = res.data.threads || [];
|
||||||
|
for (const thread of threads) {
|
||||||
|
if (thread.id && thread.historyId) {
|
||||||
|
results.push({
|
||||||
|
threadId: thread.id,
|
||||||
|
historyId: thread.historyId,
|
||||||
|
snippet: thread.snippet || undefined,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pageToken = res.data.nextPageToken ?? undefined;
|
||||||
|
} while (pageToken);
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function fetchThreadSnapshot(threadId: string, expectedHistoryId?: string): Promise<GmailThreadSnapshot | null> {
|
||||||
|
if (expectedHistoryId) {
|
||||||
|
const cached = readCachedSnapshot(threadId);
|
||||||
|
if (cached && cached.historyId === expectedHistoryId) {
|
||||||
|
return cached.snapshot;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const auth = await GoogleClientFactory.getClient();
|
const auth = await GoogleClientFactory.getClient();
|
||||||
if (!auth) {
|
if (!auth) {
|
||||||
throw new Error('Gmail is not connected.');
|
throw new Error('Gmail is not connected.');
|
||||||
|
|
@ -256,7 +365,7 @@ export async function fetchThreadSnapshot(threadId: string): Promise<GmailThread
|
||||||
.filter(Boolean)
|
.filter(Boolean)
|
||||||
.join('\n\n');
|
.join('\n\n');
|
||||||
|
|
||||||
return {
|
const snapshot: GmailThreadSnapshot = {
|
||||||
threadId,
|
threadId,
|
||||||
threadUrl: `https://mail.google.com/mail/u/0/#all/${threadId}`,
|
threadUrl: `https://mail.google.com/mail/u/0/#all/${threadId}`,
|
||||||
subject: latest.subject || parsed[0]?.subject,
|
subject: latest.subject || parsed[0]?.subject,
|
||||||
|
|
@ -268,6 +377,12 @@ export async function fetchThreadSnapshot(threadId: string): Promise<GmailThread
|
||||||
unread: parsed.some((m) => m.unread),
|
unread: parsed.some((m) => m.unread),
|
||||||
messages: parsed,
|
messages: parsed,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if (res.data.historyId) {
|
||||||
|
writeCachedSnapshot(threadId, res.data.historyId, snapshot);
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function saveAttachment(gmail: gmail.Gmail, userId: string, msgId: string, part: gmail.Schema$MessagePart, attachmentsDir: string): Promise<string | null> {
|
async function saveAttachment(gmail: gmail.Gmail, userId: string, msgId: string, part: gmail.Schema$MessagePart, attachmentsDir: string): Promise<string | null> {
|
||||||
|
|
|
||||||
|
|
@ -126,12 +126,34 @@ const ipcSchemas = {
|
||||||
'gmail:getThread': {
|
'gmail:getThread': {
|
||||||
req: z.object({
|
req: z.object({
|
||||||
threadId: z.string().min(1),
|
threadId: z.string().min(1),
|
||||||
|
expectedHistoryId: z.string().optional(),
|
||||||
}),
|
}),
|
||||||
res: z.object({
|
res: z.object({
|
||||||
thread: GmailThreadSchema.nullable(),
|
thread: GmailThreadSchema.nullable(),
|
||||||
error: z.string().optional(),
|
error: z.string().optional(),
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
|
'gmail:listRecentThreads': {
|
||||||
|
req: z.object({
|
||||||
|
daysAgo: z.number().int().positive().optional(),
|
||||||
|
}),
|
||||||
|
res: z.object({
|
||||||
|
threads: z.array(z.object({
|
||||||
|
threadId: z.string(),
|
||||||
|
historyId: z.string(),
|
||||||
|
snippet: z.string().optional(),
|
||||||
|
})),
|
||||||
|
error: z.string().optional(),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
'gmail:listCachedThreads': {
|
||||||
|
req: z.object({
|
||||||
|
daysAgo: z.number().int().positive().optional(),
|
||||||
|
}),
|
||||||
|
res: z.object({
|
||||||
|
threads: z.array(GmailThreadSchema),
|
||||||
|
}),
|
||||||
|
},
|
||||||
'mcp:listTools': {
|
'mcp:listTools': {
|
||||||
req: z.object({
|
req: z.object({
|
||||||
serverName: z.string(),
|
serverName: z.string(),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue