From e9a073e2e4668274c4395f8744afe207b3ba9b40 Mon Sep 17 00:00:00 2001 From: Ramnique Singh <30795890+ramnique@users.noreply.github.com> Date: Tue, 14 Apr 2026 00:59:02 +0530 Subject: [PATCH] cal sync digest --- .../core/src/knowledge/sync_calendar.ts | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/apps/x/packages/core/src/knowledge/sync_calendar.ts b/apps/x/packages/core/src/knowledge/sync_calendar.ts index c6a10f8e..b6258975 100644 --- a/apps/x/packages/core/src/knowledge/sync_calendar.ts +++ b/apps/x/packages/core/src/knowledge/sync_calendar.ts @@ -9,6 +9,130 @@ import { serviceLogger, type ServiceRunContext } from '../services/service_logge import { limitEventItems } from './limit_event_items.js'; import { executeAction, useComposioForGoogleCalendar } from '../composio/client.js'; import { composioAccountsRepo } from '../composio/repo.js'; +import { createEvent } from './track/events.js'; + +const MAX_EVENTS_IN_DIGEST = 50; +const MAX_DESCRIPTION_CHARS = 500; + +type AnyEvent = Record | cal.Schema$Event; + +function getStr(obj: unknown, key: string): string | undefined { + if (obj && typeof obj === 'object' && key in obj) { + const v = (obj as Record)[key]; + return typeof v === 'string' ? v : undefined; + } + return undefined; +} + +function formatEventTime(event: AnyEvent): string { + const start = (event as Record).start as Record | undefined; + const end = (event as Record).end as Record | undefined; + const startStr = getStr(start, 'dateTime') ?? getStr(start, 'date') ?? 'unknown'; + const endStr = getStr(end, 'dateTime') ?? getStr(end, 'date') ?? 'unknown'; + return `${startStr} → ${endStr}`; +} + +function formatEventBlock(event: AnyEvent, label: 'NEW' | 'UPDATED'): string { + const id = getStr(event, 'id') ?? '(unknown id)'; + const title = getStr(event, 'summary') ?? '(no title)'; + const time = formatEventTime(event); + const organizer = getStr((event as Record).organizer, 'email') ?? 'unknown'; + const location = getStr(event, 'location') ?? ''; + const rawDescription = getStr(event, 'description') ?? ''; + const description = rawDescription.length > MAX_DESCRIPTION_CHARS + ? rawDescription.slice(0, MAX_DESCRIPTION_CHARS) + '…(truncated)' + : rawDescription; + + const attendeesRaw = (event as Record).attendees; + let attendeesLine = ''; + if (Array.isArray(attendeesRaw) && attendeesRaw.length > 0) { + const emails = attendeesRaw + .map(a => getStr(a, 'email')) + .filter((e): e is string => !!e); + if (emails.length > 0) { + attendeesLine = `**Attendees:** ${emails.join(', ')}\n`; + } + } + + return [ + `### [${label}] ${title}`, + `**ID:** ${id}`, + `**Time:** ${time}`, + `**Organizer:** ${organizer}`, + location ? `**Location:** ${location}` : '', + attendeesLine.trimEnd(), + description ? `\n${description}` : '', + ].filter(Boolean).join('\n'); +} + +function summarizeCalendarSync( + newEvents: AnyEvent[], + updatedEvents: AnyEvent[], + deletedEventIds: string[], +): string { + const totalChanges = newEvents.length + updatedEvents.length + deletedEventIds.length; + const lines: string[] = [ + `# Calendar sync update`, + ``, + `${newEvents.length} new, ${updatedEvents.length} updated, ${deletedEventIds.length} deleted.`, + ``, + ]; + + const allChanges: Array<{ event: AnyEvent; label: 'NEW' | 'UPDATED' }> = [ + ...newEvents.map(e => ({ event: e, label: 'NEW' as const })), + ...updatedEvents.map(e => ({ event: e, label: 'UPDATED' as const })), + ]; + + const shown = allChanges.slice(0, MAX_EVENTS_IN_DIGEST); + const hidden = allChanges.length - shown.length; + + if (shown.length > 0) { + lines.push(`## Changed events`, ``); + for (const { event, label } of shown) { + lines.push(formatEventBlock(event, label), ``); + } + if (hidden > 0) { + lines.push(`_…and ${hidden} more change(s) omitted from digest._`, ``); + } + } + + if (deletedEventIds.length > 0) { + lines.push(`## Deleted event IDs`, ``); + for (const id of deletedEventIds.slice(0, MAX_EVENTS_IN_DIGEST)) { + lines.push(`- ${id}`); + } + if (deletedEventIds.length > MAX_EVENTS_IN_DIGEST) { + lines.push(`- _…and ${deletedEventIds.length - MAX_EVENTS_IN_DIGEST} more_`); + } + lines.push(``); + } + + if (totalChanges === 0) { + lines.push(`(no changes — should not be emitted)`); + } + + return lines.join('\n'); +} + +async function publishCalendarSyncEvent( + newEvents: AnyEvent[], + updatedEvents: AnyEvent[], + deletedEventIds: string[], +): Promise { + if (newEvents.length === 0 && updatedEvents.length === 0 && deletedEventIds.length === 0) { + return; + } + try { + await createEvent({ + source: 'calendar', + type: 'calendar.synced', + createdAt: new Date().toISOString(), + payload: summarizeCalendarSync(newEvents, updatedEvents, deletedEventIds), + }); + } catch (err) { + console.error('[Calendar] Failed to publish sync event:', err); + } +} // Configuration const SYNC_DIR = path.join(WorkDir, 'calendar_sync'); @@ -194,6 +318,8 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD let deletedCount = 0; let attachmentCount = 0; const changedTitles: string[] = []; + const newEvents: AnyEvent[] = []; + const updatedEvents: AnyEvent[] = []; const ensureRun = async () => { if (!runId) { @@ -234,8 +360,10 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD changedTitles.push(result.title); if (result.isNew) { newCount++; + newEvents.push(event); } else { updatedCount++; + updatedEvents.push(event); } } @@ -253,6 +381,9 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD deletedCount = deletedFiles.length; } + // Publish a single bundled event capturing all changes from this sync. + await publishCalendarSyncEvent(newEvents, updatedEvents, deletedFiles); + if (runId) { const totalChanges = newCount + updatedCount + deletedCount + attachmentCount; const limitedTitles = limitEventItems(changedTitles); @@ -438,6 +569,8 @@ async function performSyncComposio() { let newCount = 0; let updatedCount = 0; const changedTitles: string[] = []; + const newEvents: AnyEvent[] = []; + const updatedEvents: AnyEvent[] = []; let pageToken: string | null = null; const MAX_PAGES = 20; @@ -508,8 +641,10 @@ async function performSyncComposio() { changedTitles.push(saveResult.title); if (saveResult.isNew) { newCount++; + newEvents.push(event); } else { updatedCount++; + updatedEvents.push(event); } } } @@ -534,6 +669,9 @@ async function performSyncComposio() { deletedCount = deletedFiles.length; } + // Publish a single bundled event capturing all changes from this sync. + await publishCalendarSyncEvent(newEvents, updatedEvents, deletedFiles); + // Log results if any changes were detected (run was started by ensureRun) if (run) { const r = run as ServiceRunContext;