From 67b1fe97f77ec6d90df17fb985ee28d2226fca36 Mon Sep 17 00:00:00 2001
From: Arjun <6592213+arkml@users.noreply.github.com>
Date: Wed, 13 May 2026 21:41:23 +0530
Subject: [PATCH] email processing agent

---
 .../renderer/src/components/email-view.tsx    |   6 +-
 .../core/src/knowledge/classify_thread.ts     | 159 ++++++++++++++++++
 .../packages/core/src/knowledge/sync_gmail.ts |  11 ++
 apps/x/packages/shared/src/blocks.ts          |   1 +
 4 files changed, 175 insertions(+), 2 deletions(-)
 create mode 100644 apps/x/packages/core/src/knowledge/classify_thread.ts

diff --git a/apps/x/apps/renderer/src/components/email-view.tsx b/apps/x/apps/renderer/src/components/email-view.tsx
index 3edcbc92..0c421dfa 100644
--- a/apps/x/apps/renderer/src/components/email-view.tsx
+++ b/apps/x/apps/renderer/src/components/email-view.tsx
@@ -562,8 +562,10 @@ export function EmailView() {
     const important: GmailThread[] = []
     const other: GmailThread[] = []
     for (const thread of filteredThreads) {
-      if (thread.messages.length > 1) important.push(thread)
-      else other.push(thread)
+      // Default unclassified threads to Important so we don't hide anything
+      // before the classifier has run on them.
+      if (thread.importance === 'other') other.push(thread)
+      else important.push(thread)
     }
     return { importantThreads: important, otherThreads: other }
   }, [filteredThreads])
diff --git a/apps/x/packages/core/src/knowledge/classify_thread.ts b/apps/x/packages/core/src/knowledge/classify_thread.ts
new file mode 100644
index 00000000..c60b821a
--- /dev/null
+++ b/apps/x/packages/core/src/knowledge/classify_thread.ts
@@ -0,0 +1,159 @@
+import { z } from 'zod';
+import { generateObject } from 'ai';
+import { google } from 'googleapis';
+import type { OAuth2Client } from 'google-auth-library';
+import { createProvider } from '../models/models.js';
+import {
+    getDefaultModelAndProvider,
+    getKgModel,
+    resolveProviderConfig,
+} from '../models/defaults.js';
+import { captureLlmUsage } from '../analytics/usage.js';
+import type { GmailThreadSnapshot } from './sync_gmail.js';
+
+// Lower-cased address of the signed-in Gmail user, memoised after the first
+// successful profile lookup. NOTE(review): the cache is module-wide and not
+// keyed by `auth`; confirm only one account is ever synced per process.
+let cachedUserEmail: string | null = null;
+
+/**
+ * Resolves the signed-in user's email address via the Gmail profile endpoint.
+ *
+ * @param auth OAuth2 client used to build the Gmail API client.
+ * @returns The lower-cased address, or `null` when the profile has no address
+ * or the lookup fails (the failure is logged, never thrown).
+ */
+export async function getUserEmail(auth: OAuth2Client): Promise<string | null> {
+    if (cachedUserEmail) return cachedUserEmail;
+    try {
+        const gmailClient = google.gmail({ version: 'v1', auth });
+        const res = await gmailClient.users.getProfile({ userId: 'me' });
+        if (res.data.emailAddress) {
+            cachedUserEmail = res.data.emailAddress.toLowerCase();
+            return cachedUserEmail;
+        }
+    } catch (err) {
+        console.warn('[Email classifier] getProfile failed:', err);
+    }
+    return null;
+}
+
+/** Result of classifying one thread for the inbox view. */
+export interface Classification {
+    importance: 'important' | 'other';
+    /** Short gist of the thread; only set for model-classified important threads. */
+    summary?: string;
+}
+
+// Structured-output schema handed to `generateObject`.
+const ClassificationSchema = z.object({
+    importance: z.enum(['important', 'other']).describe('important = real correspondence, action-required, or content worth referencing later. other = newsletters, marketing, automated notifications, transactional receipts, cold outreach.'),
+    summary: z.string().optional().describe('One or two sentences capturing what the thread is about and any implied action. Required when importance is important. Omit when other.'),
+});
+
+const SYSTEM_PROMPT = `You classify a Gmail thread for a personal inbox view.
+
+Decide if the thread is "important" or "other":
+
+- important: real human correspondence the user is part of (customer, investor, team, vendor, candidate); a time-sensitive notification; a message that needs a response from the user; anything worth referencing later (contracts, pricing, deadlines, decisions).
+- other: newsletters, industry digests, marketing or promotional, product tips from vendors, automated notifications (verifications, recording uploads, platform policy updates), transactional confirmations (payment receipts, GST/tax filings, salary disbursements), unsolicited cold outreach.
+
+When the thread is important, write a 1-2 sentence summary that captures the gist and any action implied (e.g. "Customer requesting a demo next Tuesday; needs a calendar link." or "Investor following up on Q3 metrics; reply expected."). Omit the summary when the thread is "other".
+
+Be decisive — pick exactly one label. Do not hedge.`;
+
+/**
+ * Extracts the bare, lower-cased address from a `From` header value such as
+ * `Jane Doe <jane@example.com>`; a value without angle brackets is used whole.
+ */
+function senderAddress(from: string): string {
+    const bracketed = /<([^<>]+)>\s*$/.exec(from);
+    return (bracketed?.[1] ?? from).trim().toLowerCase();
+}
+
+/**
+ * Reports whether any message in the thread was sent by the user.
+ *
+ * Addresses are compared for equality rather than by substring so that
+ * `al@x.com` is not mistaken for the sender of a `paypal@x.com` message.
+ */
+function userReplied(snapshot: GmailThreadSnapshot, userEmail: string | null): boolean {
+    if (!userEmail) return false;
+    const needle = userEmail.trim().toLowerCase();
+    return snapshot.messages.some(m => senderAddress(m.from ?? '') === needle);
+}
+
+/**
+ * Builds the user prompt: subject, message count, and the headers plus a
+ * whitespace-collapsed excerpt (at most 1200 characters) of the latest message.
+ */
+function buildPrompt(snapshot: GmailThreadSnapshot): string {
+    const lines: string[] = [];
+    lines.push(`Subject: ${snapshot.subject || '(no subject)'}`);
+    lines.push(`Message count: ${snapshot.messages.length}`);
+    lines.push('');
+    const latest = snapshot.messages[snapshot.messages.length - 1];
+    if (latest) {
+        lines.push(`Latest message:`);
+        lines.push(` From: ${latest.from || 'unknown'}`);
+        if (latest.to) lines.push(` To: ${latest.to}`);
+        if (latest.date) lines.push(` Date: ${latest.date}`);
+        lines.push('');
+        const snippet = (latest.body || '').replace(/\s+/g, ' ').slice(0, 1200).trim();
+        lines.push(` Body:`);
+        lines.push(` ${snippet}`);
+    }
+    return lines.join('\n');
+}
+
+/**
+ * Labels a thread as `important` or `other` for the inbox view.
+ *
+ * Threads the user has written in are important without a model call.
+ * Everything else is sent to the model; if model setup or the call fails,
+ * the failure is logged and the thread falls back to `important`.
+ *
+ * @param snapshot Thread to classify.
+ * @param userEmail Address of the signed-in user, or `null` if unknown.
+ * @returns The label, plus a summary when the model supplied one for an
+ * important thread.
+ */
+export async function classifyThread(
+    snapshot: GmailThreadSnapshot,
+    userEmail: string | null,
+): Promise<Classification> {
+    if (userReplied(snapshot, userEmail)) {
+        return { importance: 'important' };
+    }
+
+    try {
+        const modelId = await getKgModel();
+        const { provider } = await getDefaultModelAndProvider();
+        const config = await resolveProviderConfig(provider);
+        const model = createProvider(config).languageModel(modelId);
+
+        const result = await generateObject({
+            model,
+            system: SYSTEM_PROMPT,
+            prompt: buildPrompt(snapshot),
+            schema: ClassificationSchema,
+        });
+
+        captureLlmUsage({
+            useCase: 'knowledge_sync',
+            subUseCase: 'email_classifier',
+            model: modelId,
+            provider,
+            usage: result.usage,
+        });
+
+        const out: Classification = { importance: result.object.importance };
+        if (result.object.importance === 'important' && result.object.summary) {
+            out.summary = result.object.summary;
+        }
+        return out;
+    } catch (err) {
+        console.warn(`[Email classifier] LLM call failed for thread ${snapshot.threadId}:`, err);
+        return { importance: 'important' };
+    }
+}
diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts
index 3125c2d3..6113de4b 100644
--- a/apps/x/packages/core/src/knowledge/sync_gmail.ts
+++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts
@@ -8,6 +8,7 @@ import { GoogleClientFactory } from './google-client-factory.js';
 import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
 import { limitEventItems } from './limit_event_items.js';
 import { createEvent } from '../events/producer.js';
+import { classifyThread, getUserEmail } from './classify_thread.js';
 
 // Configuration
 const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
@@ -80,6 +81,7 @@ export interface GmailThreadSnapshot {
     latest_email?: string;
     past_summary?: string;
     unread?: boolean;
+    importance?: 'important' | 'other';
     messages: Array<{
         id?: string;
         from?: string;
@@ -398,6 +400,15 @@ export async function fetchThreadSnapshot(threadId: string, expectedHistoryId?:
         messages: parsed,
     };
 
+    try {
+        const userEmail = await getUserEmail(auth);
+        const classification = await classifyThread(snapshot, userEmail);
+        snapshot.importance = classification.importance;
+        if (classification.summary) snapshot.summary = classification.summary;
+    } catch (err) {
+        console.warn(`[Gmail] classify failed for ${threadId}:`, err);
+    }
+
     if (res.data.historyId) {
         writeCachedSnapshot(threadId, res.data.historyId, snapshot);
     }
diff --git a/apps/x/packages/shared/src/blocks.ts b/apps/x/packages/shared/src/blocks.ts
index f4c79b20..c7b48011 100644
--- a/apps/x/packages/shared/src/blocks.ts
+++ b/apps/x/packages/shared/src/blocks.ts
@@ -121,6 +121,7 @@ export const GmailThreadSchema = EmailBlockSchema.extend({
     threadId: z.string(),
     threadUrl: z.string().url(),
     unread: z.boolean().optional(),
+    importance: z.enum(['important', 'other']).optional(),
     messages: z.array(GmailThreadMessageSchema),
 });
 