convert Today.md track blocks to event-driven and batch Gmail sync events

Removes polling schedules from the up-next and calendar track blocks on
Today.md so they refresh only on calendar.synced events, and rewrites
the emails track instruction to consume a multi-thread digest payload.
Batches Gmail sync so one email.synced event covers a whole sync run
(capped at 10 threads per digest) instead of one event per thread,
which collapses Pass 1 routing calls for multi-thread syncs.
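The batching shape described above can be sketched in isolation. This is a simplified, illustrative model of the digest builder — `buildDigest` and the sample data are placeholders for this sketch, not exports of the actual module — showing how one sync run's threads collapse into a single capped payload:

```typescript
// Illustrative sketch of the batched digest (names are placeholders).
interface SyncedThread {
  threadId: string;
  markdown: string;
}

const MAX_THREADS_IN_DIGEST = 10;

function buildDigest(threads: SyncedThread[]): string {
  const lines: string[] = [
    `# Gmail sync update`,
    ``,
    `${threads.length} new/updated thread${threads.length === 1 ? '' : 's'}.`,
    ``,
  ];
  // Cap the digest so one oversized sync run can't blow up the payload.
  const shown = threads.slice(0, MAX_THREADS_IN_DIGEST);
  const hidden = threads.length - shown.length;
  for (const { markdown } of shown) {
    lines.push(markdown.trimEnd(), ``, `---`, ``);
  }
  if (hidden > 0) {
    lines.push(`_…and ${hidden} more thread(s) omitted from digest._`, ``);
  }
  return lines.join('\n');
}

// One email.synced event now covers the whole run:
const threads: SyncedThread[] = Array.from({ length: 12 }, (_, i) => ({
  threadId: `t${i}`,
  markdown: `# Thread ${i}`,
}));
const digest = buildDigest(threads);
console.log(digest.startsWith('# Gmail sync update')); // true
console.log(digest.includes('…and 2 more thread(s)')); // true
```

The point of the cap plus the "…and N more" trailer is that the consuming track still learns the true thread count even when the digest body is truncated.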

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ramnique Singh 2026-04-24 11:15:24 +05:30
parent 0bb256879c
commit bdf270b7a1
2 changed files with 96 additions and 55 deletions


@@ -21,14 +21,14 @@ const SECTIONS: Section[] = [
       instruction:
         `Write 1-3 sentences of plain markdown giving the user a shoulder-tap about what's next on their calendar today.
-Data: read today's events from calendar_sync/ (workspace-readdir, then workspace-readFile each .json file). Filter to events whose start datetime is today and hasn't started yet.
+This section refreshes on calendar changes, not on a clock tick — do NOT promise live minute countdowns. Frame urgency in buckets based on the event's start time relative to now:
+- Start time is in the past or within roughly half an hour → imminent: name the meeting and say it's starting soon (e.g. "Standup is starting — join link in the Calendar section below.").
+- Start time is later this morning or this afternoon → upcoming: name the meeting and roughly when (e.g. "Design review later this morning." / "1:1 with Sam this afternoon.").
+- Start time is several hours out or nothing before then → focus block: frame the gap (e.g. "Next up is the all-hands at 3pm — good long focus block until then.").
-Lead based on how soon the next event is:
+Use the event's start time of day ("at 3pm", "this afternoon") rather than a countdown ("in 40 minutes"). Countdowns go stale between syncs.
-- Under 15 minutes → urgent ("Standup starts in 10 minutes — join link in the Calendar section below.")
-- Under 2 hours → lead with the event ("Design review in 40 minutes.")
-- 2+ hours → frame the gap as focus time ("Next up is standup at noon — you've got a solid 3-hour focus block.")
-Always compute minutes-to-start against the actual current local time — never say "nothing in the next X hours" if an event is in that window.
+Data: read today's events from calendar_sync/ (workspace-readdir, then workspace-readFile each .json file). Filter to events whose start datetime is today and hasn't ended yet — for finding the next event, pick the earliest upcoming one; if all have passed, treat as clear.
 If you find quick context in knowledge/ that's genuinely useful, add one short clause ("Ramnique pushed the OAuth PR yesterday — might come up"). Use workspace-grep / workspace-readFile conservatively; don't stall on deep research.
@@ -38,10 +38,6 @@ Plain markdown prose only — no calendar block, no email block, no headings.`,
       eventMatchCriteria:
         `Calendar event changes affecting today — new meetings, reschedules, cancellations, meetings starting soon. Skip changes to events on other days.`,
       active: true,
-      schedule: {
-        type: 'cron',
-        expression: '*/15 * * * *',
-      },
     },
   },
   {
@@ -53,16 +49,14 @@ Plain markdown prose only — no calendar block, no email block, no headings.`,
 Data: read calendar_sync/ via workspace-readdir, then workspace-readFile each .json event file. Filter to events occurring today. After 10am local time, drop meetings that have already ended — only include meetings that haven't ended yet.
+This section refreshes on calendar changes, not on a clock tick — the "drop ended meetings" rule applies on each refresh, so an ended meeting disappears the next time any calendar event changes (not exactly on the clock hour). That's fine.
 Always emit the calendar block, even when there are no remaining events (in that case use events: [] and showJoinButton: false). Set showJoinButton: true whenever any event has a conferenceLink.
 After the block, you MAY add one short markdown line per event giving useful prep context pulled from knowledge/ ("Design review: last week we agreed to revisit the type-picker UX."). Keep it tight — one line each, only when meaningful. Skip routine/recurring meetings.`,
       eventMatchCriteria:
         `Calendar event changes affecting today — additions, updates, cancellations, reschedules.`,
       active: true,
-      schedule: {
-        type: 'cron',
-        expression: '0 * * * *',
-      },
     },
   },
   {
@@ -72,7 +66,7 @@ After the block, you MAY add one short markdown line per event giving useful pre
       instruction:
         `Maintain a digest of email threads worth the user's attention today, rendered as zero or more email blocks (one per thread).
-Event-driven path (primary): the agent message will include a freshly-synced thread's markdown as the event payload. Decide whether THIS thread warrants surfacing. If it's marketing, an auto-notification, a thread already closed out, or otherwise low-signal, skip the update — do NOT call update-track-content. If it's attention-worthy, integrate it into the digest: add a new email block, or update the existing one if the same threadId is already shown.
+Event-driven path (primary): the agent message will include a "Gmail sync update" digest payload describing one or more freshly-synced threads from a single sync run. The digest lists each thread with its subject, sender, date, threadId, and body. Iterate over every thread in the payload and decide per thread whether it warrants surfacing. Skip marketing, auto-notifications, closed-out threads, and other low-signal mail. For threads that are attention-worthy, integrate them into the existing digest: add a new email block for a new threadId, or update the existing block if the threadId is already shown. If NONE of the threads in the payload are attention-worthy, skip the update — do NOT call update-track-content. Emit at most one update-track-content call that covers the full set of changes from this event.
 Manual path (fallback): with no event payload, scan gmail_sync/ via workspace-readdir (skip sync_state.json and attachments/). Read threads with workspace-readFile. Prioritize threads whose frontmatter action field is "reply" or "respond", plus other high-signal recent threads.
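As a consumer-side illustration of the digest format the emails track now receives, a sketch of pulling threadIds back out of a "Gmail sync update" payload. The `**Thread ID:** <id>` line format mirrors the synced-thread markdown; `extractThreadIds` and the sample digest are hypothetical helpers for this sketch, not part of the commit:

```typescript
// Hypothetical helper: collect every thread id mentioned in a digest payload.
function extractThreadIds(digest: string): string[] {
  const ids: string[] = [];
  const re = /\*\*Thread ID:\*\* (\S+)/g;
  let match: RegExpExecArray | null;
  while ((match = re.exec(digest)) !== null) {
    ids.push(match[1]); // capture group 1 holds the bare thread id
  }
  return ids;
}

// A minimal digest in the shape summarizeGmailSync emits.
const sampleDigest = [
  '# Gmail sync update',
  '',
  '2 new/updated threads.',
  '',
  '## Threads',
  '',
  '# Quarterly planning',
  '**Thread ID:** abc123',
  '---',
  '# Invoice question',
  '**Thread ID:** def456',
  '---',
].join('\n');

console.log(extractThreadIds(sampleDigest)); // [ 'abc123', 'def456' ]
```

Because every thread block carries its threadId, the track can dedupe against email blocks it has already rendered before deciding to add or update.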


@@ -15,8 +15,52 @@ import { createEvent } from './track/events.js';
 const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
 const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes
 const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly';
+const MAX_THREADS_IN_DIGEST = 10;
 
 const nhm = new NodeHtmlMarkdown();
+interface SyncedThread {
+  threadId: string;
+  markdown: string;
+}
+
+function summarizeGmailSync(threads: SyncedThread[]): string {
+  const lines: string[] = [
+    `# Gmail sync update`,
+    ``,
+    `${threads.length} new/updated thread${threads.length === 1 ? '' : 's'}.`,
+    ``,
+  ];
+  const shown = threads.slice(0, MAX_THREADS_IN_DIGEST);
+  const hidden = threads.length - shown.length;
+  if (shown.length > 0) {
+    lines.push(`## Threads`, ``);
+    for (const { markdown } of shown) {
+      lines.push(markdown.trimEnd(), ``, `---`, ``);
+    }
+    if (hidden > 0) {
+      lines.push(`_…and ${hidden} more thread(s) omitted from digest._`, ``);
+    }
+  }
+  return lines.join('\n');
+}
+
+async function publishGmailSyncEvent(threads: SyncedThread[]): Promise<void> {
+  if (threads.length === 0) return;
+  try {
+    await createEvent({
+      source: 'gmail',
+      type: 'email.synced',
+      createdAt: new Date().toISOString(),
+      payload: summarizeGmailSync(threads),
+    });
+  } catch (err) {
+    console.error('[Gmail] Failed to publish sync event:', err);
+  }
+}
 // --- Wake Signal for Immediate Sync Trigger ---
 let wakeResolve: (() => void) | null = null;
@@ -113,14 +157,14 @@ async function saveAttachment(gmail: gmail.Gmail, userId: string, msgId: string,
 // --- Sync Logic ---
 
-async function processThread(auth: OAuth2Client, threadId: string, syncDir: string, attachmentsDir: string) {
+async function processThread(auth: OAuth2Client, threadId: string, syncDir: string, attachmentsDir: string): Promise<SyncedThread | null> {
   const gmail = google.gmail({ version: 'v1', auth });
 
   try {
     const res = await gmail.users.threads.get({ userId: 'me', id: threadId });
     const thread = res.data;
     const messages = thread.messages;
-    if (!messages || messages.length === 0) return;
+    if (!messages || messages.length === 0) return null;
 
     // Subject from first message
     const firstHeader = messages[0].payload?.headers;
@@ -173,15 +217,11 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri
     fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent);
     console.log(`Synced Thread: ${subject} (${threadId})`);
 
-    await createEvent({
-      source: 'gmail',
-      type: 'email.synced',
-      createdAt: new Date().toISOString(),
-      payload: mdContent,
-    });
+    return { threadId, markdown: mdContent };
   } catch (error) {
     console.error(`Error processing thread ${threadId}:`, error);
+    return null;
   }
 }
@@ -262,10 +302,14 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str
     truncated: limitedThreads.truncated,
   });
 
+  const synced: SyncedThread[] = [];
   for (const threadId of threadIds) {
-    await processThread(auth, threadId, syncDir, attachmentsDir);
+    const result = await processThread(auth, threadId, syncDir, attachmentsDir);
+    if (result) synced.push(result);
   }
+  await publishGmailSyncEvent(synced);
 
   saveState(currentHistoryId, stateFile);
   await serviceLogger.log({
     type: 'run_complete',
@@ -365,10 +409,14 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir:
     truncated: limitedThreads.truncated,
   });
 
+  const synced: SyncedThread[] = [];
   for (const tid of threadIdList) {
-    await processThread(auth, tid, syncDir, attachmentsDir);
+    const result = await processThread(auth, tid, syncDir, attachmentsDir);
+    if (result) synced.push(result);
   }
+  await publishGmailSyncEvent(synced);
 
   const profile = await gmail.users.getProfile({ userId: 'me' });
   saveState(profile.data.historyId!, stateFile);
   await serviceLogger.log({
@@ -565,7 +613,12 @@ function extractBodyFromPayload(payload: Record<string, unknown>): string {
   return '';
 }
 
-async function processThreadComposio(connectedAccountId: string, threadId: string, syncDir: string): Promise<string | null> {
+interface ComposioThreadResult {
+  synced: SyncedThread | null;
+  newestIsoPlusOne: string | null;
+}
+
+async function processThreadComposio(connectedAccountId: string, threadId: string, syncDir: string): Promise<ComposioThreadResult> {
   let threadResult;
   try {
     threadResult = await executeAction(
@@ -579,40 +632,34 @@ async function processThreadComposio(connectedAccountId: string, threadId: strin
     );
   } catch (error) {
     console.warn(`[Gmail] Skipping thread ${threadId} (fetch failed):`, error instanceof Error ? error.message : error);
-    return null;
+    return { synced: null, newestIsoPlusOne: null };
   }
 
   if (!threadResult.successful || !threadResult.data) {
     console.error(`[Gmail] Failed to fetch thread ${threadId}:`, threadResult.error);
-    return null;
+    return { synced: null, newestIsoPlusOne: null };
   }
 
   const data = threadResult.data as Record<string, unknown>;
   const messages = data.messages as Array<Record<string, unknown>> | undefined;
 
   let newestDate: Date | null = null;
+  let mdContent: string;
+  let subjectForLog: string;
 
   if (!messages || messages.length === 0) {
     const parsed = parseMessageData(data);
-    const mdContent = `# ${parsed.subject}\n\n` +
+    mdContent = `# ${parsed.subject}\n\n` +
       `**Thread ID:** ${threadId}\n` +
       `**Message Count:** 1\n\n---\n\n` +
       `### From: ${parsed.from}\n` +
       `**Date:** ${parsed.date}\n\n` +
       `${parsed.body}\n\n---\n\n`;
+    subjectForLog = parsed.subject;
-    fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
-    console.log(`[Gmail] Synced Thread: ${parsed.subject} (${threadId})`);
-    await createEvent({
-      source: 'gmail',
-      type: 'email.synced',
-      createdAt: new Date().toISOString(),
-      payload: mdContent,
-    });
     newestDate = tryParseDate(parsed.date);
   } else {
     const firstParsed = parseMessageData(messages[0]);
-    let mdContent = `# ${firstParsed.subject}\n\n`;
+    mdContent = `# ${firstParsed.subject}\n\n`;
     mdContent += `**Thread ID:** ${threadId}\n`;
     mdContent += `**Message Count:** ${messages.length}\n\n---\n\n`;
@@ -628,19 +675,14 @@ async function processThreadComposio(connectedAccountId: string, threadId: strin
         newestDate = msgDate;
       }
     }
+    subjectForLog = firstParsed.subject;
-    fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
-    console.log(`[Gmail] Synced Thread: ${firstParsed.subject} (${threadId})`);
-    await createEvent({
-      source: 'gmail',
-      type: 'email.synced',
-      createdAt: new Date().toISOString(),
-      payload: mdContent,
-    });
   }
 
-  if (!newestDate) return null;
-  return new Date(newestDate.getTime() + 1000).toISOString();
+  fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
+  console.log(`[Gmail] Synced Thread: ${subjectForLog} (${threadId})`);
+  const newestIsoPlusOne = newestDate ? new Date(newestDate.getTime() + 1000).toISOString() : null;
+  return { synced: { threadId, markdown: mdContent }, newestIsoPlusOne };
 }
 
 async function performSyncComposio() {
@@ -751,19 +793,22 @@ async function performSyncComposio() {
   let highWaterMark: string | null = state?.last_sync ?? null;
   let processedCount = 0;
+  const synced: SyncedThread[] = [];
 
   for (const threadId of allThreadIds) {
     // Re-check connection in case user disconnected mid-sync
     if (!composioAccountsRepo.isConnected('gmail')) {
       console.log('[Gmail] Account disconnected during sync. Stopping.');
-      return;
+      break;
     }
 
     try {
-      const newestInThread = await processThreadComposio(connectedAccountId, threadId, SYNC_DIR);
+      const result = await processThreadComposio(connectedAccountId, threadId, SYNC_DIR);
       processedCount++;
-      if (newestInThread) {
-        if (!highWaterMark || new Date(newestInThread) > new Date(highWaterMark)) {
-          highWaterMark = newestInThread;
+      if (result.synced) synced.push(result.synced);
+
+      if (result.newestIsoPlusOne) {
+        if (!highWaterMark || new Date(result.newestIsoPlusOne) > new Date(highWaterMark)) {
+          highWaterMark = result.newestIsoPlusOne;
        }
         saveComposioState(STATE_FILE, highWaterMark);
       }
@@ -772,6 +817,8 @@ async function performSyncComposio() {
     }
   }
 
+  await publishGmailSyncEvent(synced);
+
   await serviceLogger.log({
     type: 'run_complete',
     service: run!.service,