mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-05-22 18:45:19 +02:00
Email page (#561)
* email view * render html emails * match unread and read status * move to accordion * faster loads * iframe mounted across toggle and cached height * prefetch on hover * fix iframe caching * split inbox * email processing agent * summary * rich text * email drafts * add pagination, watcher and separation from gmail sync * fix first load issue * handle drafts * send button opens the thread * simplify renderer and fix flickering issue * remove renderer-driven email path * support attachments in incoming emails * fix white background as well as dark mode
This commit is contained in:
parent
af618155e1
commit
7dcf8eea70
10 changed files with 3139 additions and 72 deletions
250
apps/x/packages/core/src/knowledge/classify_thread.ts
Normal file
250
apps/x/packages/core/src/knowledge/classify_thread.ts
Normal file
|
|
@ -0,0 +1,250 @@
|
|||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { z } from 'zod';
|
||||
import { generateObject } from 'ai';
|
||||
import { google } from 'googleapis';
|
||||
import type { OAuth2Client } from 'google-auth-library';
|
||||
import { WorkDir } from '../config/config.js';
|
||||
import { createProvider } from '../models/models.js';
|
||||
import {
|
||||
getDefaultModelAndProvider,
|
||||
getKgModel,
|
||||
resolveProviderConfig,
|
||||
} from '../models/defaults.js';
|
||||
import { captureLlmUsage } from '../analytics/usage.js';
|
||||
import type { GmailThreadSnapshot } from './sync_gmail.js';
|
||||
|
||||
/** File read by readEmailStyleGuide; its contents are injected into the classifier prompt. */
const STYLE_GUIDE_PATH = path.join(WorkDir, 'knowledge', 'Agent Notes', 'style', 'email.md');

/** Directory scanned by readUpcomingCalendar for per-event `.json` files. */
const CALENDAR_DIR = path.join(WorkDir, 'calendar_sync');

/** Events starting later than this many days from now are left out of the prompt. */
const CALENDAR_LOOKAHEAD_DAYS = 7;

/** Maximum number of calendar events handed to the model. */
const MAX_CALENDAR_EVENTS = 25;
|
||||
|
||||
/**
 * Loads the email style guide stored at STYLE_GUIDE_PATH.
 *
 * @returns The trimmed file contents, or `null` when the file cannot be read
 *          or contains only whitespace.
 */
function readEmailStyleGuide(): string | null {
  let contents: string;
  try {
    contents = fs.readFileSync(STYLE_GUIDE_PATH, 'utf-8');
  } catch {
    // A missing or unreadable guide is not an error: the prompt simply omits it.
    return null;
  }
  const trimmed = contents.trim();
  return trimmed.length > 0 ? trimmed : null;
}
|
||||
|
||||
/** Minimal view of one calendar event, as rendered into the classifier prompt. */
interface CalendarSlice {
  /** Event title; readUpcomingCalendar substitutes '(no title)' when the event has none. */
  summary: string;
  /** Raw start string from the event file (`start.dateTime`, else `start.date`). */
  startIso: string;
  /** Raw end string from the event file, when present. */
  endIso?: string;
}
|
||||
|
||||
/**
 * Collects events from the `.json` files in CALENDAR_DIR whose start time lies
 * between now and CALENDAR_LOOKAHEAD_DAYS from now.
 *
 * Cancelled events, events without a parseable start, and files that fail to
 * read or parse are skipped.
 *
 * @returns At most MAX_CALENDAR_EVENTS events, ordered by start time ascending;
 *          an empty array when the directory is missing or unreadable.
 */
function readUpcomingCalendar(): CalendarSlice[] {
  if (!fs.existsSync(CALENDAR_DIR)) return [];

  const windowStart = Date.now();
  const windowEnd = windowStart + CALENDAR_LOOKAHEAD_DAYS * 24 * 60 * 60 * 1000;

  let fileNames: string[];
  try {
    fileNames = fs.readdirSync(CALENDAR_DIR);
  } catch {
    return [];
  }

  const upcoming: CalendarSlice[] = [];
  for (const fileName of fileNames) {
    if (!fileName.endsWith('.json')) continue;
    try {
      const text = fs.readFileSync(path.join(CALENDAR_DIR, fileName), 'utf-8');
      const event = JSON.parse(text) as {
        summary?: string;
        start?: { dateTime?: string; date?: string };
        end?: { dateTime?: string; date?: string };
        status?: string;
      };
      if (event.status === 'cancelled') continue;

      // Timed events carry `dateTime`; `date` is the fallback field.
      const startIso = event.start?.dateTime ?? event.start?.date;
      if (!startIso) continue;

      const startMs = Date.parse(startIso);
      const insideWindow =
        !Number.isNaN(startMs) && startMs >= windowStart && startMs <= windowEnd;
      if (!insideWindow) continue;

      upcoming.push({
        summary: event.summary || '(no title)',
        startIso,
        endIso: event.end?.dateTime ?? event.end?.date,
      });
    } catch {
      // skip malformed
    }
  }

  upcoming.sort((left, right) => Date.parse(left.startIso) - Date.parse(right.startIso));
  return upcoming.slice(0, MAX_CALENDAR_EVENTS);
}
|
||||
|
||||
/**
 * Renders calendar events as a bullet list for the prompt, one event per line
 * in the form `- <start>[ – <end>]: <summary>`.
 *
 * @returns The joined lines, or the placeholder '(no upcoming events)' for an empty list.
 */
function formatCalendar(events: CalendarSlice[]): string {
  if (events.length === 0) return '(no upcoming events)';

  const rendered: string[] = [];
  for (const { startIso, endIso, summary } of events) {
    const range = endIso ? `${startIso} – ${endIso}` : startIso;
    rendered.push(`- ${range}: ${summary}`);
  }
  return rendered.join('\n');
}
|
||||
|
||||
/** Lower-cased address from the first successful profile lookup; reused by every later call. */
let cachedUserEmail: string | null = null;

/**
 * Resolves the authenticated user's own email address via Gmail `users.getProfile`.
 *
 * The first successful result is stored in a module-level cache and returned
 * for all later calls, regardless of which `auth` client is passed.
 *
 * @param auth OAuth client used to build the Gmail API client.
 * @returns The lower-cased address, or `null` when the lookup fails or the
 *          profile carries no address. Failures are logged, never thrown.
 */
export async function getUserEmail(auth: OAuth2Client): Promise<string | null> {
  if (cachedUserEmail) return cachedUserEmail;

  try {
    const profile = await google.gmail({ version: 'v1', auth }).users.getProfile({ userId: 'me' });
    const address = profile.data.emailAddress;
    if (!address) return null;
    cachedUserEmail = address.toLowerCase();
    return cachedUserEmail;
  } catch (err) {
    console.warn('[Email classifier] getProfile failed:', err);
    return null;
  }
}
|
||||
|
||||
/** Result of classifying one thread, as returned by classifyThread. */
export interface Classification {
  /** Inbox section the thread belongs to. */
  importance: 'important' | 'other';
  /** Short summary; only ever set when `importance` is 'important'. */
  summary?: string;
  /** Suggested reply text; only ever set when `importance` is 'important' and drafting was not skipped. */
  draftResponse?: string;
}
|
||||
|
||||
/**
 * Schema handed to `generateObject`; the `describe` texts are part of what the
 * model sees, so they double as field-level instructions.
 */
const ClassificationSchema = z.object({
  importance: z
    .enum(['important', 'other'])
    .describe('important = real correspondence, action-required, or content worth referencing later. other = newsletters, marketing, automated notifications, transactional receipts, cold outreach.'),
  summary: z
    .string()
    .optional()
    .describe('One or two sentences capturing what the thread is about and any implied action. Required when importance is important. Omit when other.'),
  draftResponse: z
    .string()
    .optional()
    .describe('A complete draft reply the user can send as-is or edit. Plain text. Required when importance is important AND the thread implies a response is wanted. Omit when other, or when no response is appropriate (e.g. an FYI from a colleague that does not need a reply).'),
});
|
||||
|
||||
/**
 * Base system prompt for the classifier. classifyThread appends an extra
 * "Skip the draft" section to it when the caller sets `skipDraft`.
 */
const SYSTEM_PROMPT = `You classify a Gmail thread for a personal inbox view and, when appropriate, draft a reply on behalf of the user.

# Importance

Decide if the thread is "important" or "other":
- important: real human correspondence the user is part of (customer, investor, team, vendor, candidate); a time-sensitive notification; a message that needs a response from the user; anything worth referencing later (contracts, pricing, deadlines, decisions).
- other: newsletters, industry digests, marketing or promotional, product tips from vendors, automated notifications (verifications, recording uploads, platform policy updates), transactional confirmations (payment receipts, GST/tax filings, salary disbursements), unsolicited cold outreach.

# Summary (important only)

When the thread is important, write a 1-2 sentence summary that captures the gist and any action implied. Omit when "other".

# Draft response (important only)

When the thread is important AND a reply is reasonably expected from the user, write a complete draft reply they could send as-is.

Apply the user's email-style guide (when provided below) — match their tone, sign-off, length, and phrasing patterns. If no style guide is provided, default to a brief, warm, professional voice.

For scheduling-related threads (where the sender proposes meeting times, asks for the user's availability, or follows up on a meeting request), look at the user's upcoming calendar (provided below) and either:
- Propose 2-3 specific time windows from genuinely free slots, or
- Confirm/decline a specific time the sender proposed, based on calendar conflicts.

Use the same timezone the user appears to operate in (inferable from their previous messages or calendar events).

Omit the draft when:
- importance is "other"
- the thread is purely informational and doesn't ask for a reply
- the latest message is from the user (they already replied; no draft needed)
- you can't write a meaningful reply without information you don't have (don't fabricate)

Be decisive — pick exactly one importance label. Do not hedge.`;
|
||||
|
||||
/**
 * Pulls the bare address out of a From header value.
 * Handles both `Display Name <addr>` and a plain `addr`; the result is trimmed and lower-cased.
 */
function senderAddressOf(fromHeader: string): string {
  // Anchor on the trailing <...> so a display name that itself contains
  // angle brackets or an email address cannot be mistaken for the sender.
  const bracketed = fromHeader.match(/<([^<>]*)>\s*$/);
  return (bracketed ? bracketed[1] : fromHeader).trim().toLowerCase();
}

/**
 * Reports whether any message in the thread was sent by the user.
 *
 * The sender address is compared for exact (case-insensitive) equality. A plain
 * substring test would also match unrelated senders whose address merely
 * contains the user's (`home@x.com` vs `me@x.com`) or who put the user's
 * address in their display name.
 *
 * @param snapshot  Thread whose `messages[].from` headers are inspected.
 * @param userEmail The user's own address; `null` or blank yields `false`.
 */
function userReplied(snapshot: GmailThreadSnapshot, userEmail: string | null): boolean {
  if (!userEmail) return false;
  const needle = userEmail.trim().toLowerCase();
  if (!needle) return false;
  return snapshot.messages.some((m) => senderAddressOf(m.from || '') === needle);
}
|
||||
|
||||
/**
 * Assembles the user prompt for the classifier as newline-joined markdown.
 *
 * Sections, in order: the user's identity (when `userEmail` is known), the
 * style guide (when present), the upcoming calendar, then the thread itself
 * with one `## Message N` block per message.
 *
 * Message bodies are whitespace-collapsed and truncated: 2000 characters for
 * the latest message, 600 for each earlier one.
 */
function buildPrompt(
  snapshot: GmailThreadSnapshot,
  userEmail: string | null,
  styleGuide: string | null,
  calendar: CalendarSlice[],
): string {
  const out: string[] = [];

  if (userEmail) {
    out.push(
      `# Your identity`,
      `The user's own email is ${userEmail}. You write as this person when drafting replies.`,
      '',
    );
  }

  if (styleGuide) {
    out.push(`# Email style guide`, styleGuide, '');
  }

  out.push(
    `# User's upcoming calendar (next ${CALENDAR_LOOKAHEAD_DAYS} days)`,
    formatCalendar(calendar),
    '',
  );

  out.push(
    `# Thread to classify`,
    `Subject: ${snapshot.subject || '(no subject)'}`,
    `Message count: ${snapshot.messages.length}`,
    '',
  );

  const lastIndex = snapshot.messages.length - 1;
  snapshot.messages.forEach((message, index) => {
    const isLatest = index === lastIndex;
    out.push(`## Message ${index + 1}${isLatest ? ' (latest)' : ''}`);
    out.push(`From: ${message.from || 'unknown'}`);
    if (message.to) out.push(`To: ${message.to}`);
    if (message.date) out.push(`Date: ${message.date}`);

    // The latest message gets the larger share of the prompt budget.
    const maxChars = isLatest ? 2000 : 600;
    const text = (message.body || '').replace(/\s+/g, ' ').slice(0, maxChars).trim();
    if (text) out.push('', text);
    out.push('');
  });

  return out.join('\n');
}
|
||||
|
||||
/** Optional switches for classifyThread. */
export interface ClassifyOptions {
  /** When true, the model is told not to draft a reply and any returned draft is discarded. */
  skipDraft?: boolean;
}
|
||||
|
||||
/**
 * Classifies a thread as 'important' or 'other' and, for important threads,
 * optionally returns a summary and a draft reply produced by the LLM.
 *
 * Threads the user has already written in are reported as important without
 * calling the model. Any failure while preparing or running the LLM call is
 * logged and also falls back to `{ importance: 'important' }`, so this
 * function does not reject.
 *
 * @param snapshot  Thread to classify.
 * @param userEmail The user's own address, or `null` when unknown.
 * @param options   See ClassifyOptions.
 */
export async function classifyThread(
  snapshot: GmailThreadSnapshot,
  userEmail: string | null,
  options: ClassifyOptions = {},
): Promise<Classification> {
  if (userReplied(snapshot, userEmail)) return { importance: 'important' };

  try {
    const styleGuide = readEmailStyleGuide();
    const calendar = readUpcomingCalendar();

    const modelId = await getKgModel();
    const { provider } = await getDefaultModelAndProvider();
    const providerConfig = await resolveProviderConfig(provider);
    const model = createProvider(providerConfig).languageModel(modelId);

    let system = SYSTEM_PROMPT;
    if (options.skipDraft) {
      system = `${SYSTEM_PROMPT}\n\n# Skip the draft\n\nThe user already has their own draft in progress for this thread — DO NOT generate a draftResponse. Always omit the draftResponse field.`;
    }

    const { object, usage } = await generateObject({
      model,
      system,
      prompt: buildPrompt(snapshot, userEmail, styleGuide, calendar),
      schema: ClassificationSchema,
    });

    captureLlmUsage({
      useCase: 'knowledge_sync',
      subUseCase: 'email_classifier',
      model: modelId,
      provider,
      usage,
    });

    const classification: Classification = { importance: object.importance };
    if (object.importance !== 'important') return classification;

    // Summary and draft are only kept for important threads, whatever the model returned.
    if (object.summary) classification.summary = object.summary;
    if (!options.skipDraft && object.draftResponse) {
      classification.draftResponse = object.draftResponse;
    }
    return classification;
  } catch (err) {
    console.warn(`[Email classifier] LLM call failed for thread ${snapshot.threadId}:`, err);
    // Fail open: an unclassified thread is shown as important rather than hidden.
    return { importance: 'important' };
  }
}
|
||||
|
|
@ -134,7 +134,7 @@ async function publishCalendarSyncEvent(
|
|||
|
||||
// Configuration
/** Directory holding the synced calendar data. */
const SYNC_DIR = path.join(WorkDir, 'calendar_sync');
const SYNC_INTERVAL_MS = 30 * 1000; // Check every 30 seconds
const LOOKBACK_DAYS = 7;
|
||||
const REQUIRED_SCOPES = [
|
||||
'https://www.googleapis.com/auth/calendar.events.readonly',
|
||||
|
|
|
|||
|
|
@ -8,19 +8,114 @@ import { GoogleClientFactory } from './google-client-factory.js';
|
|||
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
|
||||
import { limitEventItems } from './limit_event_items.js';
|
||||
import { createEvent } from '../events/producer.js';
|
||||
import { classifyThread, getUserEmail } from './classify_thread.js';
|
||||
|
||||
// Configuration
/** Root directory of the Gmail sync output. */
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
/** Former location of the snapshot cache; moved to CACHE_DIR by the migration below. */
const LEGACY_CACHE_DIR = path.join(SYNC_DIR, 'cache');
/** Directory holding one cached snapshot JSON file per thread for the inbox view. */
const CACHE_DIR = path.join(WorkDir, 'inbox_lists');
|
||||
|
||||
// One-time migration, run when this module is loaded: move the old cache
// directory to its new location unless the new one already exists.
(function migrateLegacyCacheDir() {
  try {
    if (!fs.existsSync(LEGACY_CACHE_DIR)) return;
    if (fs.existsSync(CACHE_DIR)) return;
    fs.renameSync(LEGACY_CACHE_DIR, CACHE_DIR);
    console.log(`[Gmail] Migrated cache from ${LEGACY_CACHE_DIR} → ${CACHE_DIR}`);
  } catch (err) {
    // Best effort: a failed migration only logs a warning.
    console.warn('[Gmail] Cache directory migration failed:', err);
  }
})();
|
||||
/** Delay between sync checks. */
const SYNC_INTERVAL_MS = 30 * 1000; // Check every 30 seconds

/** OAuth scope for read-only Gmail access. */
const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly';

/** Cap on threads in a digest. NOTE(review): presumably used by summarizeGmailSync — confirm. */
const MAX_THREADS_IN_DIGEST = 10;

/** Shared HTML-to-markdown converter used by getBody. */
const nhm = new NodeHtmlMarkdown();
|
||||
|
||||
/** On-disk wrapper stored in each cache file under CACHE_DIR. */
interface SnapshotCacheEntry {
  /** Thread historyId at the time the snapshot was built; compared to detect unchanged threads. */
  historyId: string;
  /** ISO timestamp of when the entry was written. */
  fetchedAt: string;
  /** The cached thread snapshot itself. */
  snapshot: GmailThreadSnapshot;
}
|
||||
|
||||
/** Returns the cache file path for a thread; the id is URI-encoded to form the file name. */
function cachePath(threadId: string): string {
  const fileName = `${encodeURIComponent(threadId)}.json`;
  return path.join(CACHE_DIR, fileName);
}
|
||||
|
||||
/**
 * Reads the cached entry for a thread.
 *
 * @returns The parsed entry, or `null` when the file is missing, unreadable,
 *          or not valid JSON. The parsed value is cast, not validated.
 */
function readCachedSnapshot(threadId: string): SnapshotCacheEntry | null {
  try {
    const text = fs.readFileSync(cachePath(threadId), 'utf-8');
    const entry = JSON.parse(text) as SnapshotCacheEntry;
    return entry;
  } catch {
    return null;
  }
}
|
||||
|
||||
/**
 * Persists a thread snapshot to its cache file, creating CACHE_DIR if needed.
 * The entry is stamped with the current time. Failures are logged and swallowed.
 *
 * @param threadId  Thread the snapshot belongs to.
 * @param historyId Thread historyId the snapshot was built from.
 * @param snapshot  Snapshot to store.
 */
function writeCachedSnapshot(threadId: string, historyId: string, snapshot: GmailThreadSnapshot): void {
  try {
    if (!fs.existsSync(CACHE_DIR)) {
      fs.mkdirSync(CACHE_DIR, { recursive: true });
    }
    const fetchedAt = new Date().toISOString();
    const entry: SnapshotCacheEntry = { historyId, fetchedAt, snapshot };
    const serialized = JSON.stringify(entry);
    fs.writeFileSync(cachePath(threadId), serialized, 'utf-8');
  } catch (err) {
    console.warn(`[Gmail cache] write failed for ${threadId}:`, err);
  }
}
|
||||
|
||||
/**
 * Records a message's `bodyHeight` in the thread's cached snapshot.
 *
 * Does nothing when the thread is not cached, the message id is not found, or
 * the stored height already equals `height`. Write failures are logged only.
 *
 * @param threadId  Thread whose cache file is updated.
 * @param messageId Id of the message inside that thread.
 * @param height    Height value to store.
 */
export function saveMessageBodyHeight(threadId: string, messageId: string, height: number): void {
  const entry = readCachedSnapshot(threadId);
  const target = entry?.snapshot.messages.find((candidate) => candidate.id === messageId);
  if (!entry || !target) return;
  if (target.bodyHeight === height) return; // unchanged: skip the disk write

  target.bodyHeight = height;
  try {
    fs.writeFileSync(cachePath(threadId), JSON.stringify(entry), 'utf-8');
  } catch (err) {
    console.warn(`[Gmail cache] height write failed for ${threadId}/${messageId}:`, err);
  }
}
|
||||
|
||||
/** A thread written by the markdown sync, as returned by processThread. */
interface SyncedThread {
  /** Gmail thread id. */
  threadId: string;
  /** Markdown content written for the thread. */
  markdown: string;
}
|
||||
|
||||
/**
 * Snapshot of a Gmail thread as cached for the inbox view.
 *
 * The top-level `subject`/`from`/`to`/`date` mirror the latest visible
 * (non-draft) message; `messages` holds every visible message in thread order.
 */
export interface GmailThreadSnapshot {
  threadId: string;
  /** Link to the thread in the Gmail web UI. */
  threadUrl: string;
  /** Classifier-written summary (important threads only). */
  summary?: string;
  subject?: string;
  from?: string;
  to?: string;
  date?: string;
  /** Body of the latest visible message. */
  latest_email?: string;
  /** Condensed text of the earlier messages in the thread. */
  past_summary?: string;
  /** True when any visible message is unread. */
  unread?: boolean;
  /** Inbox section chosen by the classifier; absent on unclassified snapshots. */
  importance?: 'important' | 'other';
  /** Reply drafted by the classifier. */
  draft_response?: string;
  /** Body of the latest Gmail draft found in the thread, if any. */
  gmail_draft?: string;
  messages: Array<{
    id?: string;
    from?: string;
    to?: string;
    cc?: string;
    date?: string;
    subject?: string;
    /** Text body with quoted (`>`) lines removed. */
    body?: string;
    /** HTML body; inline `cid:` images are replaced by data URLs where the fetch succeeded. */
    bodyHtml?: string;
    unread?: boolean;
    /** Height stored by saveMessageBodyHeight and carried over when the snapshot is rebuilt. */
    bodyHeight?: number;
    /** Value of the message's Message-ID header, as written by buildAndCacheSnapshot. */
    messageIdHeader?: string;
    attachments?: Array<{
      filename: string;
      mimeType?: string;
      sizeBytes?: number;
      /** Workspace-relative path under `gmail_sync/attachments/`. */
      savedPath: string;
    }>;
  }>;
}
|
||||
|
||||
function summarizeGmailSync(threads: SyncedThread[]): string {
|
||||
const lines: string[] = [
|
||||
`# Gmail sync update`,
|
||||
|
|
@ -93,35 +188,416 @@ function decodeBase64(data: string): string {
|
|||
return Buffer.from(data, 'base64').toString('utf-8');
|
||||
}
|
||||
|
||||
/**
 * Walks a message's MIME tree depth-first and returns the first decoded
 * `text/plain` and the first decoded `text/html` body found.
 *
 * A part that is itself a text/plain or text/html body is not descended into.
 * Either field is the empty string when no such part exists.
 */
function extractBodyParts(payload: gmail.Schema$MessagePart): { text: string; html: string } {
  const found = { text: '', html: '' };

  function visit(node: gmail.Schema$MessagePart): void {
    const data = node.body?.data;
    if (data) {
      switch (node.mimeType) {
        case 'text/html':
          if (!found.html) found.html = decodeBase64(data);
          return;
        case 'text/plain':
          if (!found.text) found.text = decodeBase64(data);
          return;
        default:
          break;
      }
    }
    if (node.parts) {
      for (const child of node.parts) visit(child);
    }
  }

  visit(payload);
  return found;
}
|
||||
|
||||
/**
 * Returns the message body as text with quoted lines removed.
 *
 * Prefers the HTML part (converted to markdown via `nhm`) and falls back to
 * the plain-text part; returns '' when the message has neither. Lines whose
 * trimmed form starts with `>` are dropped as quoted reply text.
 */
function getBody(payload: gmail.Schema$MessagePart): string {
  const { text, html } = extractBodyParts(payload);
  const source = html ? nhm.translate(html) : text;
  if (!source) return '';
  return source
    .split('\n')
    .filter((line: string) => !line.trim().startsWith('>'))
    .join('\n');
}
|
||||
|
||||
/** Description of one attachment found by extractAttachments. */
interface ExtractedAttachment {
  /** File name as declared on the MIME part. */
  filename: string;
  /** MIME type of the part, when declared. */
  mimeType?: string;
  /** Size reported on the part body, when it is a number. */
  sizeBytes?: number;
  /** Workspace-relative path of the form `gmail_sync/attachments/<msgId>_<cleaned name>`. */
  savedPath: string;
}
|
||||
|
||||
/**
 * Walk a message MIME tree and collect "real" attachments — parts with a
 * filename + attachmentId, excluding cid-referenced inline images (those
 * already get baked into bodyHtml as data URLs).
 *
 * Returns workspace-relative paths matching the convention used by
 * saveAttachment / processThread, so the renderer can hand them to
 * shell.openPath via the existing IPC.
 *
 * @param msgId   Gmail message id, used as the prefix of the saved file name.
 * @param payload Root MIME part of the message.
 */
function extractAttachments(msgId: string, payload: gmail.Schema$MessagePart): ExtractedAttachment[] {
  const out: ExtractedAttachment[] = [];
  const walk = (part: gmail.Schema$MessagePart): void => {
    const filename = part.filename;
    const attId = part.body?.attachmentId;
    if (filename && attId) {
      // Exclude only true inline images (image/* with a Content-ID, which
      // get baked into bodyHtml as data URLs by inlineCidImages). Other
      // parts with Content-ID — PDFs, .log files, .ics, etc. — are real
      // attachments; Gmail just stamps Content-ID on most parts.
      const cid = part.headers?.find(h => h.name?.toLowerCase() === 'content-id')?.value;
      const mime = part.mimeType || '';
      const isInlineImage = !!cid && mime.startsWith('image/');
      if (!isInlineImage) {
        const safeName = `${msgId}_${cleanFilename(filename)}`;
        out.push({
          filename,
          mimeType: part.mimeType ?? undefined,
          sizeBytes: typeof part.body?.size === 'number' ? part.body.size : undefined,
          savedPath: `gmail_sync/attachments/${safeName}`,
        });
      }
    }
    if (part.parts) for (const sub of part.parts) walk(sub);
  };
  walk(payload);
  return out;
}
|
||||
|
||||
/**
 * Replaces `cid:` image references in an HTML body with inline data URLs.
 *
 * Image parts carrying a Content-ID and an attachmentId are fetched through
 * the Gmail attachments API in parallel. A fetch that fails is logged and its
 * reference is left untouched.
 *
 * References are rewritten in one pass over the HTML, matching each whole
 * `cid:<id>` token. Replacing per Content-ID instead would corrupt the markup
 * when one id is a prefix of another (e.g. `img1` and `img10`).
 *
 * @param gmailClient Gmail API client used to download the attachments.
 * @param messageId   Id of the message that owns the parts.
 * @param payload     Root MIME part of the message.
 * @param html        HTML body to rewrite.
 * @returns The rewritten HTML, or `html` unchanged when nothing needs inlining.
 */
async function inlineCidImages(
  gmailClient: gmail.Gmail,
  messageId: string,
  payload: gmail.Schema$MessagePart,
  html: string,
): Promise<string> {
  if (!/src\s*=\s*["']?cid:/i.test(html)) return html;

  const inlineParts: Array<{ contentId: string; mimeType: string; attachmentId: string }> = [];
  const collect = (part: gmail.Schema$MessagePart): void => {
    const cidHeader = part.headers?.find(h => h.name?.toLowerCase() === 'content-id')?.value;
    const attachmentId = part.body?.attachmentId;
    const mime = part.mimeType || '';
    if (cidHeader && attachmentId && mime.startsWith('image/')) {
      inlineParts.push({
        // Trim first so surrounding whitespace cannot hide the <...> wrapper.
        contentId: cidHeader.trim().replace(/^<|>$/g, '').trim(),
        mimeType: mime,
        attachmentId,
      });
    }
    if (part.parts) for (const sub of part.parts) collect(sub);
  };
  collect(payload);
  if (inlineParts.length === 0) return html;

  // Keyed by lower-cased Content-ID: references are matched case-insensitively.
  const dataUrls = new Map<string, string>();
  await Promise.all(inlineParts.map(async (part) => {
    try {
      const res = await gmailClient.users.messages.attachments.get({
        userId: 'me',
        messageId,
        id: part.attachmentId,
      });
      const b64 = res.data.data;
      if (!b64) return;
      // Gmail returns base64url; data URLs need standard base64
      const normalized = b64.replace(/-/g, '+').replace(/_/g, '/');
      dataUrls.set(part.contentId.toLowerCase(), `data:${part.mimeType};base64,${normalized}`);
    } catch (err) {
      console.warn(`[Gmail] inline image fetch failed for ${part.contentId}:`, err);
    }
  }));
  if (dataUrls.size === 0) return html;

  // A function replacer keeps `$` sequences in the data URL literal.
  return html.replace(/cid:([^"'\s<>)]+)/gi, (match: string, ref: string) => {
    const direct = dataUrls.get(ref.toLowerCase());
    if (direct) return direct;
    // cid: URLs may percent-encode characters of the Content-ID.
    try {
      return dataUrls.get(decodeURIComponent(ref).toLowerCase()) ?? match;
    } catch {
      return match;
    }
  });
}
|
||||
|
||||
/**
 * Normalizes body text: converts CRLF to LF, collapses runs of three or more
 * newlines to a single blank line, and trims both ends.
 */
function normalizeBody(body: string): string {
  const unixNewlines = body.replace(/\r\n/g, '\n');
  const collapsed = unixNewlines.replace(/\n{3,}/g, '\n\n');
  return collapsed.trim();
}
|
||||
|
||||
/**
 * Looks up a header by name, case-insensitively.
 *
 * @returns The value of the first matching header, or `undefined` when there
 *          is no match or the matched value is empty.
 */
function headerValue(headers: gmail.Schema$MessagePartHeader[] | undefined, name: string): string | undefined {
  if (!headers) return undefined;
  const wanted = name.toLowerCase();
  const match = headers.find((header) => header.name?.toLowerCase() === wanted);
  return match?.value || undefined;
}
|
||||
|
||||
/** One thread as returned by listRecentThreadIds. */
export interface RecentThreadInfo {
  threadId: string;
  /** Thread historyId reported by `threads.list`. */
  historyId: string;
  /** Snippet from the list response, when present. */
  snippet?: string;
}

/** The two inbox sections a thread can be listed under. */
export type InboxSection = 'important' | 'other';

/** Arguments for listInboxPage. */
export interface InboxPageOptions {
  /** Section to list. */
  section: InboxSection;
  /** Opaque cursor from a previous page's `nextCursor`; omit for the first page. */
  cursor?: string;
  /** Page size; listInboxPage clamps it to 1..100 and defaults to 25. */
  limit?: number;
}

/** One page of inbox results. */
export interface InboxPageResult {
  /** Snapshots on this page, newest first. */
  threads: GmailThreadSnapshot[];
  /** Cursor for the next page, or `null` when this is the last page. */
  nextCursor: string | null;
}

/** Sort record built by listInboxPage for each cached snapshot. */
interface IndexedEntry {
  threadId: string;
  /** Sort timestamp from snapshotDateMs (0 when the thread has no parseable date). */
  dateMs: number;
  snapshot: GmailThreadSnapshot;
}
|
||||
|
||||
/**
 * Maps a snapshot to its inbox section. Only an explicit 'other' lands in the
 * "other" section; unclassified snapshots are treated as important.
 */
function snapshotImportance(s: GmailThreadSnapshot): InboxSection {
  if (s.importance === 'other') {
    return 'other';
  }
  return 'important';
}
|
||||
|
||||
/**
 * Returns the sort timestamp of a snapshot in epoch milliseconds: the date of
 * its last message, falling back to the snapshot-level `date`.
 * Yields 0 when no date is available or it cannot be parsed.
 */
function snapshotDateMs(s: GmailThreadSnapshot): number {
  const lastMessage = s.messages.slice(-1)[0];
  const dateText = lastMessage?.date || s.date;
  if (!dateText) return 0;

  const parsed = Date.parse(dateText);
  if (!Number.isFinite(parsed)) return 0;
  return parsed;
}
|
||||
|
||||
/**
 * Decodes a pagination cursor of the form `<dateMs>|<threadId>`.
 * The split happens at the first `|`, so the thread id may itself contain one.
 *
 * @returns The decoded pair, or `null` when the cursor is missing, has no `|`,
 *          has a non-numeric date part, or has an empty thread id.
 */
function parseCursor(cursor: string | undefined): { dateMs: number; threadId: string } | null {
  if (!cursor) return null;

  const separatorAt = cursor.indexOf('|');
  if (separatorAt < 0) return null;

  const dateMs = Number(cursor.substring(0, separatorAt));
  const threadId = cursor.substring(separatorAt + 1);
  const valid = Number.isFinite(dateMs) && threadId.length > 0;
  return valid ? { dateMs, threadId } : null;
}
|
||||
|
||||
/** Encodes a pagination position as `<dateMs>|<threadId>`, the format read by parseCursor. */
function encodeCursor(entry: { dateMs: number; threadId: string }): string {
  const { dateMs, threadId } = entry;
  return [dateMs, threadId].join('|');
}
|
||||
|
||||
/** Lists one page of the "important" inbox section; see listInboxPage for paging rules. */
export function listImportantThreads(opts: { cursor?: string; limit?: number } = {}): InboxPageResult {
  const request: InboxPageOptions = { section: 'important', ...opts };
  return listInboxPage(request);
}
|
||||
|
||||
/** Lists one page of the "other" inbox section; see listInboxPage for paging rules. */
export function listEverythingElseThreads(opts: { cursor?: string; limit?: number } = {}): InboxPageResult {
  const request: InboxPageOptions = { section: 'other', ...opts };
  return listInboxPage(request);
}
|
||||
|
||||
/**
 * Returns one page of cached thread snapshots for an inbox section.
 *
 * Every `.json` file in CACHE_DIR is read on each call; files that fail to
 * read or parse are logged and skipped. Threads are ordered newest first, with
 * thread id ascending as the tiebreak. The page starts after the position
 * encoded in `opts.cursor`.
 *
 * @param opts Section, optional cursor, and optional limit (clamped to 1..100, default 25).
 * @returns The page and a cursor for the next one (`null` on the last page).
 *          An empty page is returned when the cache directory is missing or unreadable.
 */
export function listInboxPage(opts: InboxPageOptions): InboxPageResult {
  const limit = Math.max(1, Math.min(100, opts.limit ?? 25));
  const cursor = parseCursor(opts.cursor);

  if (!fs.existsSync(CACHE_DIR)) return { threads: [], nextCursor: null };

  let names: string[];
  try {
    names = fs.readdirSync(CACHE_DIR);
  } catch {
    return { threads: [], nextCursor: null };
  }

  const entries: IndexedEntry[] = [];
  for (const name of names) {
    if (!name.endsWith('.json')) continue;
    const filePath = path.join(CACHE_DIR, name);
    try {
      const raw = fs.readFileSync(filePath, 'utf-8');
      const wrapper = JSON.parse(raw) as SnapshotCacheEntry;
      const snapshot = wrapper.snapshot;
      if (!snapshot) continue;
      if (snapshotImportance(snapshot) !== opts.section) continue;
      entries.push({
        threadId: snapshot.threadId,
        dateMs: snapshotDateMs(snapshot),
        snapshot,
      });
    } catch (err) {
      console.warn(`[Inbox lists] read failed for ${name}:`, err);
    }
  }

  // Newest first, threadId asc as tiebreak.
  entries.sort((a, b) => {
    if (b.dateMs !== a.dateMs) return b.dateMs - a.dateMs;
    if (a.threadId === b.threadId) return 0;
    return a.threadId < b.threadId ? -1 : 1;
  });

  let startIdx = 0;
  if (cursor) {
    // First entry strictly after the cursor position in sort order.
    startIdx = entries.findIndex((e) => {
      if (e.dateMs < cursor.dateMs) return true;
      if (e.dateMs === cursor.dateMs && e.threadId > cursor.threadId) return true;
      return false;
    });
    if (startIdx < 0) startIdx = entries.length;
  }

  const slice = entries.slice(startIdx, startIdx + limit);
  const hasMore = startIdx + slice.length < entries.length;
  const last = slice[slice.length - 1];

  return {
    threads: slice.map((e) => e.snapshot),
    nextCursor: hasMore && last ? encodeCursor({ dateMs: last.dateMs, threadId: last.threadId }) : null,
  };
}
|
||||
|
||||
/**
 * Lists the ids of Gmail threads matched by an `after:<date>` search, where
 * the date is `daysAgo` days before now (formatted `YYYY/MM/DD` from the UTC
 * ISO date). Follows `nextPageToken` until every page has been read.
 *
 * @param daysAgo How many days back the search date lies (default 2).
 * @returns One entry per listed thread that has both an id and a historyId.
 * @throws Error when no Google client is available ("Gmail is not connected.").
 */
export async function listRecentThreadIds(daysAgo: number = 2): Promise<RecentThreadInfo[]> {
  const auth = await GoogleClientFactory.getClient();
  if (!auth) {
    throw new Error('Gmail is not connected.');
  }

  const gmailClient = google.gmail({ version: 'v1', auth });

  const since = new Date();
  since.setDate(since.getDate() - daysAgo);
  const dateQuery = since.toISOString().split('T')[0].replace(/-/g, '/');

  const collected: RecentThreadInfo[] = [];
  let pageToken: string | undefined;
  for (;;) {
    const page = await gmailClient.users.threads.list({
      userId: 'me',
      q: `after:${dateQuery}`,
      pageToken,
    });

    for (const thread of page.data.threads || []) {
      if (!thread.id || !thread.historyId) continue;
      collected.push({
        threadId: thread.id,
        historyId: thread.historyId,
        snippet: thread.snippet || undefined,
      });
    }

    pageToken = page.data.nextPageToken ?? undefined;
    if (!pageToken) break;
  }

  return collected;
}
|
||||
|
||||
/**
 * Turns an already-fetched threads.get response into a GmailThreadSnapshot,
 * classifies it, and stores it through writeCachedSnapshot. Called by the
 * background sync (processThread).
 *
 * When the cached entry has the same historyId and already carries an
 * `importance`, the cached snapshot is returned as-is: no rebuild, no inline
 * image fetches, no LLM call.
 *
 * Draft messages are kept out of `messages`; the body of the last draft is
 * exposed as `gmail_draft` and makes the classifier skip its own draft.
 *
 * @returns The snapshot, or null when the thread has no visible (non-draft)
 *          messages — those shouldn't show up in the inbox.
 */
async function buildAndCacheSnapshot(
  threadId: string,
  threadData: gmail.Schema$Thread,
  gmailClient: gmail.Gmail,
  auth: OAuth2Client,
): Promise<GmailThreadSnapshot | null> {
  const rawMessages = threadData.messages;
  if (!rawMessages || rawMessages.length === 0) return null;

  const previous = readCachedSnapshot(threadId);
  // `importance` is required too, so cache files written before the classifier
  // existed get rebuilt instead of staying uncategorised.
  if (
    previous &&
    threadData.historyId &&
    previous.historyId === threadData.historyId &&
    previous.snapshot.importance
  ) {
    return previous.snapshot;
  }

  // Preserve heights stored for messages we have seen before.
  const knownHeights = new Map<string, number>();
  for (const old of previous?.snapshot.messages ?? []) {
    if (old.id && typeof old.bodyHeight === 'number') knownHeights.set(old.id, old.bodyHeight);
  }

  const parsed = await Promise.all(rawMessages.map(async (raw) => {
    const payload = raw.payload;
    const headers = payload?.headers || [];
    const parts = payload ? extractBodyParts(payload) : { text: '', html: '' };
    const body = payload ? normalizeBody(getBody(payload)) : '';

    let bodyHtml: string | undefined;
    if (parts.html && payload && raw.id) {
      try {
        bodyHtml = await inlineCidImages(gmailClient, raw.id, payload, parts.html);
      } catch (err) {
        console.warn(`[Gmail] inline image embed failed for message ${raw.id}:`, err);
        bodyHtml = parts.html;
      }
    }

    const labels = raw.labelIds;
    const attachments = payload && raw.id ? extractAttachments(raw.id, payload) : [];
    return {
      id: raw.id || undefined,
      from: headerValue(headers, 'From') || 'Unknown',
      to: headerValue(headers, 'To'),
      cc: headerValue(headers, 'Cc'),
      date: headerValue(headers, 'Date'),
      subject: headerValue(headers, 'Subject') || '(No Subject)',
      body,
      bodyHtml,
      unread: labels?.includes('UNREAD') ?? false,
      bodyHeight: raw.id ? knownHeights.get(raw.id) : undefined,
      // headerValue matches names case-insensitively, so one lookup covers every spelling.
      messageIdHeader: headerValue(headers, 'Message-ID'),
      attachments: attachments.length > 0 ? attachments : undefined,
      isDraft: labels?.includes('DRAFT') ?? false,
    };
  }));

  const visibleMessages = parsed
    .filter((m) => !m.isDraft)
    .map(({ isDraft: _isDraft, ...rest }) => rest);
  const drafts = parsed.filter((m) => m.isDraft);
  const lastDraft = drafts[drafts.length - 1];
  const latestDraftBody = lastDraft ? lastDraft.body.trim() : '';

  const latest = visibleMessages[visibleMessages.length - 1];
  if (!latest) return null;

  const earlierSummary = visibleMessages
    .slice(0, -1)
    .map((msg) => {
      const when = msg.date ? ` (${msg.date})` : '';
      const excerpt = msg.body.replace(/\s+/g, ' ').slice(0, 500).trim();
      return `${msg.from}${when}: ${excerpt}`;
    })
    .join('\n\n');

  const snapshot: GmailThreadSnapshot = {
    threadId,
    threadUrl: `https://mail.google.com/mail/u/0/#all/${threadId}`,
    subject: latest.subject || visibleMessages[0]?.subject,
    from: latest.from,
    to: latest.to,
    date: latest.date,
    latest_email: latest.body,
    past_summary: earlierSummary || undefined,
    unread: visibleMessages.some((m) => m.unread),
    messages: visibleMessages,
    gmail_draft: latestDraftBody || undefined,
  };

  try {
    const userEmail = await getUserEmail(auth);
    const classification = await classifyThread(snapshot, userEmail, {
      skipDraft: latestDraftBody.length > 0,
    });
    snapshot.importance = classification.importance;
    if (classification.summary) snapshot.summary = classification.summary;
    if (classification.draftResponse) snapshot.draft_response = classification.draftResponse;
  } catch (err) {
    console.warn(`[Gmail] classify failed for ${threadId}:`, err);
  }

  // Without a historyId the entry could never be validated later, so it is not cached.
  if (threadData.historyId) {
    writeCachedSnapshot(threadId, threadData.historyId, snapshot);
  }

  return snapshot;
}
|
||||
|
||||
async function saveAttachment(gmail: gmail.Gmail, userId: string, msgId: string, part: gmail.Schema$MessagePart, attachmentsDir: string): Promise<string | null> {
|
||||
|
|
@ -225,6 +701,14 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri
|
|||
fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent);
|
||||
console.log(`Synced Thread: ${subject} (${threadId})`);
|
||||
|
||||
// Also build + cache the rich snapshot for the inbox view.
|
||||
// Reuses the threads.get response — no extra API call.
|
||||
try {
|
||||
await buildAndCacheSnapshot(threadId, thread, gmail, auth);
|
||||
} catch (err) {
|
||||
console.warn(`[Gmail] Inbox snapshot build failed for ${threadId}:`, err);
|
||||
}
|
||||
|
||||
return { threadId, markdown: mdContent };
|
||||
|
||||
} catch (error) {
|
||||
|
|
@ -233,6 +717,46 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * After a sync cycle, prune inbox_lists/ cache entries (the `.json` files in
 * CACHE_DIR) for threadIds that are no longer in INBOX (archived/trashed
 * elsewhere), keeping the cache in lock-step with Gmail's INBOX label.
 *
 * Collects every INBOX thread id with paginated `threads.list` calls (up to
 * 500 ids per page, following `nextPageToken` until exhausted), then removes
 * each cache file whose decoded name is not in that set.
 *
 * Best-effort: every failure is logged with `console.warn` and never thrown,
 * so a prune problem cannot fail the surrounding sync.
 *
 * @param auth OAuth2 client used to construct the Gmail API client.
 */
async function pruneInboxCache(auth: OAuth2Client): Promise<void> {
    if (!fs.existsSync(CACHE_DIR)) return;
    try {
        const gmailClient = google.gmail({ version: 'v1', auth });
        const inInbox = new Set<string>();
        let pageToken: string | undefined;
        do {
            const res = await gmailClient.users.threads.list({
                userId: 'me',
                labelIds: ['INBOX'],
                maxResults: 500,
                pageToken,
            });
            for (const t of res.data.threads ?? []) {
                if (t.id) inInbox.add(t.id);
            }
            pageToken = res.data.nextPageToken ?? undefined;
        } while (pageToken);

        for (const name of fs.readdirSync(CACHE_DIR)) {
            if (!name.endsWith('.json')) continue;

            // decodeURIComponent throws URIError on a malformed percent-escape.
            // Handle that per file so one stray/corrupt file name does not abort
            // pruning of every remaining entry; the undecodable file is left alone
            // because it cannot be mapped to a thread id.
            let threadId: string;
            try {
                threadId = decodeURIComponent(name.replace(/\.json$/, ''));
            } catch (err) {
                console.warn(`[Gmail] prune skipped undecodable cache file ${name}:`, err);
                continue;
            }

            if (inInbox.has(threadId)) continue;

            try {
                // force: true — a file that already vanished is not an error.
                fs.rmSync(path.join(CACHE_DIR, name), { force: true });
            } catch (err) {
                console.warn(`[Gmail] prune failed for ${threadId}:`, err);
            }
        }
    } catch (err) {
        console.warn('[Gmail] pruneInboxCache failed:', err);
    }
}
|
||||
|
||||
function loadState(stateFile: string): { historyId?: string; last_sync?: string } {
|
||||
if (fs.existsSync(stateFile)) {
|
||||
return JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
|
||||
|
|
@ -515,6 +1039,10 @@ async function performSync() {
|
|||
await partialSync(auth, state.historyId, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS);
|
||||
}
|
||||
|
||||
// Keep inbox_lists/ in lock-step with Gmail's INBOX label —
|
||||
// remove cache files for threads that were archived/trashed elsewhere.
|
||||
await pruneInboxCache(auth);
|
||||
|
||||
console.log("Sync completed.");
|
||||
} catch (error) {
|
||||
console.error("Error during sync:", error);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue