mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-06-15 20:05:16 +02:00
email processing agent
This commit is contained in:
parent
375d8bf2e0
commit
67b1fe97f7
4 changed files with 131 additions and 2 deletions
|
|
@ -562,8 +562,10 @@ export function EmailView() {
|
||||||
const important: GmailThread[] = []
|
const important: GmailThread[] = []
|
||||||
const other: GmailThread[] = []
|
const other: GmailThread[] = []
|
||||||
for (const thread of filteredThreads) {
|
for (const thread of filteredThreads) {
|
||||||
if (thread.messages.length > 1) important.push(thread)
|
// Default unclassified threads to Important so we don't hide anything
|
||||||
else other.push(thread)
|
// before the classifier has run on them.
|
||||||
|
if (thread.importance === 'other') other.push(thread)
|
||||||
|
else important.push(thread)
|
||||||
}
|
}
|
||||||
return { importantThreads: important, otherThreads: other }
|
return { importantThreads: important, otherThreads: other }
|
||||||
}, [filteredThreads])
|
}, [filteredThreads])
|
||||||
|
|
|
||||||
115
apps/x/packages/core/src/knowledge/classify_thread.ts
Normal file
115
apps/x/packages/core/src/knowledge/classify_thread.ts
Normal file
|
|
@ -0,0 +1,115 @@
|
||||||
|
import { z } from 'zod';
|
||||||
|
import { generateObject } from 'ai';
|
||||||
|
import { google } from 'googleapis';
|
||||||
|
import type { OAuth2Client } from 'google-auth-library';
|
||||||
|
import { createProvider } from '../models/models.js';
|
||||||
|
import {
|
||||||
|
getDefaultModelAndProvider,
|
||||||
|
getKgModel,
|
||||||
|
resolveProviderConfig,
|
||||||
|
} from '../models/defaults.js';
|
||||||
|
import { captureLlmUsage } from '../analytics/usage.js';
|
||||||
|
import type { GmailThreadSnapshot } from './sync_gmail.js';
|
||||||
|
|
||||||
|
/**
 * Module-level memo of the authenticated user's Gmail address, stored lowercased.
 * NOTE(review): the memo is not keyed by the `auth` client, so it presumably
 * assumes one Google account per process — confirm against callers.
 */
let cachedUserEmail: string | null = null;

/**
 * Resolves the email address of the authenticated Gmail user.
 *
 * The first successful lookup is memoised in `cachedUserEmail`; subsequent
 * calls return the memoised value without contacting the Gmail API.
 *
 * @param auth - OAuth2 client handed to the Gmail API client.
 * @returns The lowercased address, or `null` when the profile request fails
 *          or the profile carries no address.
 */
export async function getUserEmail(auth: OAuth2Client): Promise<string | null> {
  if (cachedUserEmail) {
    return cachedUserEmail;
  }

  let address: string | null | undefined;
  try {
    const profile = await google
      .gmail({ version: 'v1', auth })
      .users.getProfile({ userId: 'me' });
    address = profile.data.emailAddress;
  } catch (err) {
    // Best effort: a failed lookup is logged and reported as "unknown" (null).
    console.warn('[Email classifier] getProfile failed:', err);
    return null;
  }

  if (!address) {
    return null;
  }

  cachedUserEmail = address.toLowerCase();
  return cachedUserEmail;
}
|
||||||
|
|
||||||
|
/** Result of classifying a single Gmail thread. */
export interface Classification {
  /** Bucket the thread is assigned to. */
  importance: 'important' | 'other';
  /**
   * Short description of the thread. `classifyThread` only sets this for
   * threads labelled `'important'`, and only when the model returned one.
   */
  summary?: string;
}
|
||||||
|
|
||||||
|
/**
 * Zod schema passed to `generateObject` as the expected shape of the model's
 * answer. The `describe` texts are part of what is handed to the schema and
 * must be treated as runtime strings, not comments.
 */
const ClassificationSchema = z.object({
  importance: z.enum(['important', 'other']).describe('important = real correspondence, action-required, or content worth referencing later. other = newsletters, marketing, automated notifications, transactional receipts, cold outreach.'),
  // Optional at the schema level even though the description asks for it on important threads.
  summary: z.string().optional().describe('One or two sentences capturing what the thread is about and any implied action. Required when importance is important. Omit when other.'),
});
|
||||||
|
|
||||||
|
/**
 * System prompt sent with every classification request. It defines the two
 * labels ("important" / "other") and when a summary should be written.
 */
const SYSTEM_PROMPT = `You classify a Gmail thread for a personal inbox view.

Decide if the thread is "important" or "other":

- important: real human correspondence the user is part of (customer, investor, team, vendor, candidate); a time-sensitive notification; a message that needs a response from the user; anything worth referencing later (contracts, pricing, deadlines, decisions).
- other: newsletters, industry digests, marketing or promotional, product tips from vendors, automated notifications (verifications, recording uploads, platform policy updates), transactional confirmations (payment receipts, GST/tax filings, salary disbursements), unsolicited cold outreach.

When the thread is important, write a 1-2 sentence summary that captures the gist and any action implied (e.g. "Customer requesting a demo next Tuesday; needs a calendar link." or "Investor following up on Q3 metrics; reply expected."). Omit the summary when the thread is "other".

Be decisive — pick exactly one label. Do not hedge.`;
|
||||||
|
|
||||||
|
/**
 * Reports whether any message in the thread was sent by the user.
 *
 * The sender address is extracted from each `From` header and compared for
 * exact (case-insensitive) equality. A plain substring test is not enough:
 * `bob@example.com` is a substring of `jimbob@example.com` and of
 * `bob@example.com.evil.test`, and a display name can contain any text.
 *
 * @param snapshot - Thread whose messages are inspected.
 * @param userEmail - The user's address, or `null` when it is unknown.
 * @returns `true` if at least one message's sender address equals `userEmail`;
 *          `false` otherwise, including when `userEmail` is null or blank.
 */
function userReplied(snapshot: GmailThreadSnapshot, userEmail: string | null): boolean {
  if (!userEmail) return false;
  const target = userEmail.trim().toLowerCase();
  if (!target) return false;
  return snapshot.messages.some(m => senderAddress(m.from ?? '') === target);
}

/**
 * Extracts the bare, lowercased address from a `From` header value.
 * Handles `Name <addr>`, a bare `addr`, and `addr (comment)` forms.
 */
function senderAddress(fromHeader: string): string {
  // Prefer the last angle-bracketed part; the display name before it is free text.
  const bracketed = /<([^<>]*)>[^<>]*$/.exec(fromHeader);
  const raw = bracketed?.[1] ?? fromHeader.replace(/\([^)]*\)/g, '');
  return raw.trim().toLowerCase();
}
|
||||||
|
|
||||||
|
/**
 * Renders the user prompt for the classifier from a thread snapshot.
 *
 * The prompt always carries the subject and message count. When the thread
 * has at least one message, the last message's sender, optional recipient and
 * date, and a whitespace-collapsed body excerpt (at most 1200 characters) are
 * appended.
 *
 * @param snapshot - Thread to describe.
 * @returns Newline-joined prompt text.
 */
function buildPrompt(snapshot: GmailThreadSnapshot): string {
  const { subject, messages } = snapshot;
  const header = [
    `Subject: ${subject || '(no subject)'}`,
    `Message count: ${messages.length}`,
    '',
  ];

  const newest = messages.slice(-1)[0];
  if (!newest) {
    return header.join('\n');
  }

  // Collapse whitespace runs first so the 1200-character budget is spent on content.
  const collapsed = (newest.body || '').replace(/\s+/g, ' ');
  const excerpt = collapsed.slice(0, 1200).trim();

  const details = [
    `Latest message:`,
    ` From: ${newest.from || 'unknown'}`,
    ...(newest.to ? [` To: ${newest.to}`] : []),
    ...(newest.date ? [` Date: ${newest.date}`] : []),
    '',
    ` Body:`,
    ` ${excerpt}`,
  ];

  return [...header, ...details].join('\n');
}
|
||||||
|
|
||||||
|
/**
 * Classifies a Gmail thread as `'important'` or `'other'`.
 *
 * Threads the user has already written in are labelled important without
 * calling the model. Otherwise the thread is sent to the language model and
 * its structured answer is returned; the summary is kept only for important
 * threads. Any failure in that path is logged and the thread is labelled
 * important.
 *
 * @param snapshot - Thread to classify.
 * @param userEmail - The user's address, or `null` when it is unknown.
 * @returns The classification; this function does not reject on model errors.
 */
export async function classifyThread(
  snapshot: GmailThreadSnapshot,
  userEmail: string | null,
): Promise<Classification> {
  // Short-circuit: the user's own participation settles the label.
  if (userReplied(snapshot, userEmail)) {
    return { importance: 'important' };
  }

  try {
    const modelId = await getKgModel();
    const { provider } = await getDefaultModelAndProvider();
    const providerConfig = await resolveProviderConfig(provider);

    const { object: verdict, usage } = await generateObject({
      model: createProvider(providerConfig).languageModel(modelId),
      system: SYSTEM_PROMPT,
      prompt: buildPrompt(snapshot),
      schema: ClassificationSchema,
    });

    captureLlmUsage({
      useCase: 'knowledge_sync',
      subUseCase: 'email_classifier',
      model: modelId,
      provider,
      usage,
    });

    // A summary is only carried over for important threads that actually have one.
    if (verdict.importance === 'important' && verdict.summary) {
      return { importance: verdict.importance, summary: verdict.summary };
    }
    return { importance: verdict.importance };
  } catch (err) {
    // Fall back to 'important' on failure.
    console.warn(`[Email classifier] LLM call failed for thread ${snapshot.threadId}:`, err);
    return { importance: 'important' };
  }
}
|
||||||
|
|
@ -8,6 +8,7 @@ import { GoogleClientFactory } from './google-client-factory.js';
|
||||||
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
|
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
|
||||||
import { limitEventItems } from './limit_event_items.js';
|
import { limitEventItems } from './limit_event_items.js';
|
||||||
import { createEvent } from '../events/producer.js';
|
import { createEvent } from '../events/producer.js';
|
||||||
|
import { classifyThread, getUserEmail } from './classify_thread.js';
|
||||||
|
|
||||||
// Configuration
|
// Configuration
|
||||||
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
|
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
|
||||||
|
|
@ -80,6 +81,7 @@ export interface GmailThreadSnapshot {
|
||||||
latest_email?: string;
|
latest_email?: string;
|
||||||
past_summary?: string;
|
past_summary?: string;
|
||||||
unread?: boolean;
|
unread?: boolean;
|
||||||
|
importance?: 'important' | 'other';
|
||||||
messages: Array<{
|
messages: Array<{
|
||||||
id?: string;
|
id?: string;
|
||||||
from?: string;
|
from?: string;
|
||||||
|
|
@ -398,6 +400,15 @@ export async function fetchThreadSnapshot(threadId: string, expectedHistoryId?:
|
||||||
messages: parsed,
|
messages: parsed,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
const userEmail = await getUserEmail(auth);
|
||||||
|
const classification = await classifyThread(snapshot, userEmail);
|
||||||
|
snapshot.importance = classification.importance;
|
||||||
|
if (classification.summary) snapshot.summary = classification.summary;
|
||||||
|
} catch (err) {
|
||||||
|
console.warn(`[Gmail] classify failed for ${threadId}:`, err);
|
||||||
|
}
|
||||||
|
|
||||||
if (res.data.historyId) {
|
if (res.data.historyId) {
|
||||||
writeCachedSnapshot(threadId, res.data.historyId, snapshot);
|
writeCachedSnapshot(threadId, res.data.historyId, snapshot);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -121,6 +121,7 @@ export const GmailThreadSchema = EmailBlockSchema.extend({
|
||||||
threadId: z.string(),
|
threadId: z.string(),
|
||||||
threadUrl: z.string().url(),
|
threadUrl: z.string().url(),
|
||||||
unread: z.boolean().optional(),
|
unread: z.boolean().optional(),
|
||||||
|
importance: z.enum(['important', 'other']).optional(),
|
||||||
messages: z.array(GmailThreadMessageSchema),
|
messages: z.array(GmailThreadMessageSchema),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue