mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-05-01 03:16:29 +02:00
cal sync digest
This commit is contained in:
parent
570a315b36
commit
e9a073e2e4
1 changed files with 138 additions and 0 deletions
|
|
@ -9,6 +9,130 @@ import { serviceLogger, type ServiceRunContext } from '../services/service_logge
|
|||
import { limitEventItems } from './limit_event_items.js';
|
||||
import { executeAction, useComposioForGoogleCalendar } from '../composio/client.js';
|
||||
import { composioAccountsRepo } from '../composio/repo.js';
|
||||
import { createEvent } from './track/events.js';
|
||||
|
||||
const MAX_EVENTS_IN_DIGEST = 50;
|
||||
const MAX_DESCRIPTION_CHARS = 500;
|
||||
|
||||
type AnyEvent = Record<string, unknown> | cal.Schema$Event;
|
||||
|
||||
function getStr(obj: unknown, key: string): string | undefined {
|
||||
if (obj && typeof obj === 'object' && key in obj) {
|
||||
const v = (obj as Record<string, unknown>)[key];
|
||||
return typeof v === 'string' ? v : undefined;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function formatEventTime(event: AnyEvent): string {
|
||||
const start = (event as Record<string, unknown>).start as Record<string, unknown> | undefined;
|
||||
const end = (event as Record<string, unknown>).end as Record<string, unknown> | undefined;
|
||||
const startStr = getStr(start, 'dateTime') ?? getStr(start, 'date') ?? 'unknown';
|
||||
const endStr = getStr(end, 'dateTime') ?? getStr(end, 'date') ?? 'unknown';
|
||||
return `${startStr} → ${endStr}`;
|
||||
}
|
||||
|
||||
function formatEventBlock(event: AnyEvent, label: 'NEW' | 'UPDATED'): string {
|
||||
const id = getStr(event, 'id') ?? '(unknown id)';
|
||||
const title = getStr(event, 'summary') ?? '(no title)';
|
||||
const time = formatEventTime(event);
|
||||
const organizer = getStr((event as Record<string, unknown>).organizer, 'email') ?? 'unknown';
|
||||
const location = getStr(event, 'location') ?? '';
|
||||
const rawDescription = getStr(event, 'description') ?? '';
|
||||
const description = rawDescription.length > MAX_DESCRIPTION_CHARS
|
||||
? rawDescription.slice(0, MAX_DESCRIPTION_CHARS) + '…(truncated)'
|
||||
: rawDescription;
|
||||
|
||||
const attendeesRaw = (event as Record<string, unknown>).attendees;
|
||||
let attendeesLine = '';
|
||||
if (Array.isArray(attendeesRaw) && attendeesRaw.length > 0) {
|
||||
const emails = attendeesRaw
|
||||
.map(a => getStr(a, 'email'))
|
||||
.filter((e): e is string => !!e);
|
||||
if (emails.length > 0) {
|
||||
attendeesLine = `**Attendees:** ${emails.join(', ')}\n`;
|
||||
}
|
||||
}
|
||||
|
||||
return [
|
||||
`### [${label}] ${title}`,
|
||||
`**ID:** ${id}`,
|
||||
`**Time:** ${time}`,
|
||||
`**Organizer:** ${organizer}`,
|
||||
location ? `**Location:** ${location}` : '',
|
||||
attendeesLine.trimEnd(),
|
||||
description ? `\n${description}` : '',
|
||||
].filter(Boolean).join('\n');
|
||||
}
|
||||
|
||||
function summarizeCalendarSync(
|
||||
newEvents: AnyEvent[],
|
||||
updatedEvents: AnyEvent[],
|
||||
deletedEventIds: string[],
|
||||
): string {
|
||||
const totalChanges = newEvents.length + updatedEvents.length + deletedEventIds.length;
|
||||
const lines: string[] = [
|
||||
`# Calendar sync update`,
|
||||
``,
|
||||
`${newEvents.length} new, ${updatedEvents.length} updated, ${deletedEventIds.length} deleted.`,
|
||||
``,
|
||||
];
|
||||
|
||||
const allChanges: Array<{ event: AnyEvent; label: 'NEW' | 'UPDATED' }> = [
|
||||
...newEvents.map(e => ({ event: e, label: 'NEW' as const })),
|
||||
...updatedEvents.map(e => ({ event: e, label: 'UPDATED' as const })),
|
||||
];
|
||||
|
||||
const shown = allChanges.slice(0, MAX_EVENTS_IN_DIGEST);
|
||||
const hidden = allChanges.length - shown.length;
|
||||
|
||||
if (shown.length > 0) {
|
||||
lines.push(`## Changed events`, ``);
|
||||
for (const { event, label } of shown) {
|
||||
lines.push(formatEventBlock(event, label), ``);
|
||||
}
|
||||
if (hidden > 0) {
|
||||
lines.push(`_…and ${hidden} more change(s) omitted from digest._`, ``);
|
||||
}
|
||||
}
|
||||
|
||||
if (deletedEventIds.length > 0) {
|
||||
lines.push(`## Deleted event IDs`, ``);
|
||||
for (const id of deletedEventIds.slice(0, MAX_EVENTS_IN_DIGEST)) {
|
||||
lines.push(`- ${id}`);
|
||||
}
|
||||
if (deletedEventIds.length > MAX_EVENTS_IN_DIGEST) {
|
||||
lines.push(`- _…and ${deletedEventIds.length - MAX_EVENTS_IN_DIGEST} more_`);
|
||||
}
|
||||
lines.push(``);
|
||||
}
|
||||
|
||||
if (totalChanges === 0) {
|
||||
lines.push(`(no changes — should not be emitted)`);
|
||||
}
|
||||
|
||||
return lines.join('\n');
|
||||
}
|
||||
|
||||
async function publishCalendarSyncEvent(
|
||||
newEvents: AnyEvent[],
|
||||
updatedEvents: AnyEvent[],
|
||||
deletedEventIds: string[],
|
||||
): Promise<void> {
|
||||
if (newEvents.length === 0 && updatedEvents.length === 0 && deletedEventIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await createEvent({
|
||||
source: 'calendar',
|
||||
type: 'calendar.synced',
|
||||
createdAt: new Date().toISOString(),
|
||||
payload: summarizeCalendarSync(newEvents, updatedEvents, deletedEventIds),
|
||||
});
|
||||
} catch (err) {
|
||||
console.error('[Calendar] Failed to publish sync event:', err);
|
||||
}
|
||||
}
|
||||
|
||||
// Configuration
|
||||
const SYNC_DIR = path.join(WorkDir, 'calendar_sync');
|
||||
|
|
@ -194,6 +318,8 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD
|
|||
let deletedCount = 0;
|
||||
let attachmentCount = 0;
|
||||
const changedTitles: string[] = [];
|
||||
const newEvents: AnyEvent[] = [];
|
||||
const updatedEvents: AnyEvent[] = [];
|
||||
|
||||
const ensureRun = async () => {
|
||||
if (!runId) {
|
||||
|
|
@ -234,8 +360,10 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD
|
|||
changedTitles.push(result.title);
|
||||
if (result.isNew) {
|
||||
newCount++;
|
||||
newEvents.push(event);
|
||||
} else {
|
||||
updatedCount++;
|
||||
updatedEvents.push(event);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -253,6 +381,9 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD
|
|||
deletedCount = deletedFiles.length;
|
||||
}
|
||||
|
||||
// Publish a single bundled event capturing all changes from this sync.
|
||||
await publishCalendarSyncEvent(newEvents, updatedEvents, deletedFiles);
|
||||
|
||||
if (runId) {
|
||||
const totalChanges = newCount + updatedCount + deletedCount + attachmentCount;
|
||||
const limitedTitles = limitEventItems(changedTitles);
|
||||
|
|
@ -438,6 +569,8 @@ async function performSyncComposio() {
|
|||
let newCount = 0;
|
||||
let updatedCount = 0;
|
||||
const changedTitles: string[] = [];
|
||||
const newEvents: AnyEvent[] = [];
|
||||
const updatedEvents: AnyEvent[] = [];
|
||||
let pageToken: string | null = null;
|
||||
const MAX_PAGES = 20;
|
||||
|
||||
|
|
@ -508,8 +641,10 @@ async function performSyncComposio() {
|
|||
changedTitles.push(saveResult.title);
|
||||
if (saveResult.isNew) {
|
||||
newCount++;
|
||||
newEvents.push(event);
|
||||
} else {
|
||||
updatedCount++;
|
||||
updatedEvents.push(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -534,6 +669,9 @@ async function performSyncComposio() {
|
|||
deletedCount = deletedFiles.length;
|
||||
}
|
||||
|
||||
// Publish a single bundled event capturing all changes from this sync.
|
||||
await publishCalendarSyncEvent(newEvents, updatedEvents, deletedFiles);
|
||||
|
||||
// Log results if any changes were detected (run was started by ensureRun)
|
||||
if (run) {
|
||||
const r = run as ServiceRunContext;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue