diff --git a/apps/x/apps/main/src/main.ts b/apps/x/apps/main/src/main.ts index 94ed9d53..804c4f93 100644 --- a/apps/x/apps/main/src/main.ts +++ b/apps/x/apps/main/src/main.ts @@ -23,6 +23,7 @@ import { init as initNoteTagging } from "@x/core/dist/knowledge/tag_notes.js"; import { init as initInlineTasks } from "@x/core/dist/knowledge/inline_tasks.js"; import { init as initAgentRunner } from "@x/core/dist/agent-schedule/runner.js"; import { init as initAgentNotes } from "@x/core/dist/knowledge/agent_notes.js"; +import { init as initTrackScheduler } from "@x/core/dist/knowledge/track/scheduler.js"; import { initConfigs } from "@x/core/dist/config/initConfigs.js"; import started from "electron-squirrel-startup"; @@ -233,6 +234,9 @@ app.whenReady().then(async () => { // start tracks watcher startTracksWatcher(); + // start track scheduler (cron/window/once) + initTrackScheduler(); + // start gmail sync initGmailSync(); diff --git a/apps/x/apps/renderer/src/extensions/track-block.tsx b/apps/x/apps/renderer/src/extensions/track-block.tsx index 84973710..864ceb17 100644 --- a/apps/x/apps/renderer/src/extensions/track-block.tsx +++ b/apps/x/apps/renderer/src/extensions/track-block.tsx @@ -57,7 +57,7 @@ function TrackBlockView({ node, deleteNode, updateAttributes, extension }: { // Track run status from the global hook const allTrackStatus = useTrackStatus() - const runState = allTrackStatus.get(`${track.trackId}:${trackFilePath}`) ?? { status: 'idle' as const } + const runState = allTrackStatus.get(`${track?.trackId}:${trackFilePath}`) ?? { status: 'idle' as const } const runStatus = runState.status const runSummary = runState.summary ?? runState.error ?? null diff --git a/apps/x/packages/core/src/knowledge/track/runner.ts b/apps/x/packages/core/src/knowledge/track/runner.ts index 3250f275..eb06ef56 100644 --- a/apps/x/packages/core/src/knowledge/track/runner.ts +++ b/apps/x/packages/core/src/knowledge/track/runner.ts @@ -42,6 +42,12 @@ Use \`update-track-content\` with filePath=\`${filePath}\` and trackId=\`${track return msg; } +// --------------------------------------------------------------------------- +// Concurrency guard +// --------------------------------------------------------------------------- + +const runningTracks = new Set(); + // --------------------------------------------------------------------------- // Public API // --------------------------------------------------------------------------- @@ -54,69 +60,85 @@ export async function triggerTrackUpdate( trackId: string, filePath: string, context?: string, + trigger: 'manual' | 'timed' | 'event' = 'manual', ): Promise { - console.log('triggerTrackUpdate', trackId, filePath, context); - const tracks = await fetchAll(filePath); - const track = tracks.find(t => t.track.trackId === trackId); - if (!track) { - return { trackId, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Track not found' }; + const key = `${trackId}:${filePath}`; + if (runningTracks.has(key)) { + return { trackId, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Already running' }; } - - const contentBefore = track.content; - - // Emit start event — runId is set after agent run is created - const agentRun = await createRun({ agentId: 'track-run' }); - - await trackBus.publish({ - type: 'track_run_start', - trackId, - filePath, - trigger: 'manual', - runId: agentRun.id, - }); + runningTracks.add(key); try { - await createMessage(agentRun.id, buildMessage(filePath, track, context)); - await waitForRunCompletion(agentRun.id); - const summary = await extractAgentResponse(agentRun.id); + console.log('triggerTrackUpdate', trackId, filePath, trigger, context); + const tracks = await fetchAll(filePath); + const track = tracks.find(t => t.track.trackId === trackId); + if (!track) { + return { trackId, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Track not found' }; + } - const updatedTracks = await fetchAll(filePath); - const contentAfter = updatedTracks.find(t => t.track.trackId === trackId)?.content; - const didUpdate = contentAfter !== contentBefore; + const contentBefore = track.content; - // Update track block metadata + // Emit start event — runId is set after agent run is created + const agentRun = await createRun({ agentId: 'track-run' }); + + // Set lastRunAt and lastRunId immediately (before agent executes) so + // the scheduler's next poll won't re-trigger this track. await updateTrackBlock(filePath, trackId, { lastRunAt: new Date().toISOString(), lastRunId: agentRun.id, - lastRunSummary: summary ?? undefined, }); await trackBus.publish({ - type: 'track_run_complete', + type: 'track_run_start', trackId, filePath, + trigger, runId: agentRun.id, - summary: summary ?? undefined, }); - return { - trackId, - action: didUpdate ? 'replace' : 'no_update', - contentBefore: contentBefore ?? null, - contentAfter: contentAfter ?? null, - summary, - }; - } catch (err) { - const msg = err instanceof Error ? err.message : String(err); + try { + await createMessage(agentRun.id, buildMessage(filePath, track, context)); + await waitForRunCompletion(agentRun.id); + const summary = await extractAgentResponse(agentRun.id); - await trackBus.publish({ - type: 'track_run_complete', - trackId, - filePath, - runId: agentRun.id, - error: msg, - }); + const updatedTracks = await fetchAll(filePath); + const contentAfter = updatedTracks.find(t => t.track.trackId === trackId)?.content; + const didUpdate = contentAfter !== contentBefore; - return { trackId, action: 'no_update', contentBefore: contentBefore ?? null, contentAfter: null, summary: null, error: msg }; + // Update summary on completion + await updateTrackBlock(filePath, trackId, { + lastRunSummary: summary ?? undefined, + }); + + await trackBus.publish({ + type: 'track_run_complete', + trackId, + filePath, + runId: agentRun.id, + summary: summary ?? undefined, + }); + + return { + trackId, + action: didUpdate ? 'replace' : 'no_update', + contentBefore: contentBefore ?? null, + contentAfter: contentAfter ?? null, + summary, + }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + + await trackBus.publish({ + type: 'track_run_complete', + trackId, + filePath, + runId: agentRun.id, + error: msg, + }); + + return { trackId, action: 'no_update', contentBefore: contentBefore ?? null, contentAfter: null, summary: null, error: msg }; + } + } finally { + runningTracks.delete(key); } } diff --git a/apps/x/packages/core/src/knowledge/track/schedule-utils.ts b/apps/x/packages/core/src/knowledge/track/schedule-utils.ts new file mode 100644 index 00000000..d62524ae --- /dev/null +++ b/apps/x/packages/core/src/knowledge/track/schedule-utils.ts @@ -0,0 +1,64 @@ +import { CronExpressionParser } from 'cron-parser'; +import type { TrackSchedule } from '@x/shared/dist/track-block.js'; + +const GRACE_MS = 2 * 60 * 1000; // 2 minutes + +/** + * Determine if a scheduled track is due to run. + * All schedule types enforce a 2-minute grace period — if the scheduled time + * was more than 2 minutes ago, it's considered a miss and skipped. + */ +export function isTrackScheduleDue(schedule: TrackSchedule, lastRunAt: string | null): boolean { + const now = new Date(); + + switch (schedule.type) { + case 'cron': { + if (!lastRunAt) return true; // Never ran — immediately due + try { + const interval = CronExpressionParser.parse(schedule.expression, { + currentDate: new Date(lastRunAt), + }); + const nextRun = interval.next().toDate(); + return now >= nextRun && now.getTime() <= nextRun.getTime() + GRACE_MS; + } catch { + return false; + } + } + case 'window': { + if (!lastRunAt) { + // Never ran — due if within the time window now + const [startHour, startMin] = schedule.startTime.split(':').map(Number); + const [endHour, endMin] = schedule.endTime.split(':').map(Number); + const startMinutes = startHour * 60 + startMin; + const endMinutes = endHour * 60 + endMin; + const nowMinutes = now.getHours() * 60 + now.getMinutes(); + return nowMinutes >= startMinutes && nowMinutes <= endMinutes; + } + try { + const interval = CronExpressionParser.parse(schedule.cron, { + currentDate: new Date(lastRunAt), + }); + const nextRun = interval.next().toDate(); + if (!(now >= nextRun && now.getTime() <= nextRun.getTime() + GRACE_MS)) { + return false; + } + + // Check if current time is within the time window + const [startHour, startMin] = schedule.startTime.split(':').map(Number); + const [endHour, endMin] = schedule.endTime.split(':').map(Number); + const startMinutes = startHour * 60 + startMin; + const endMinutes = endHour * 60 + endMin; + const nowMinutes = now.getHours() * 60 + now.getMinutes(); + + return nowMinutes >= startMinutes && nowMinutes <= endMinutes; + } catch { + return false; + } + } + case 'once': { + if (lastRunAt) return false; // Already ran + const runAt = new Date(schedule.runAt); + return now >= runAt && now.getTime() <= runAt.getTime() + GRACE_MS; + } + } +} diff --git a/apps/x/packages/core/src/knowledge/track/scheduler.ts b/apps/x/packages/core/src/knowledge/track/scheduler.ts new file mode 100644 index 00000000..e86a8ffc --- /dev/null +++ b/apps/x/packages/core/src/knowledge/track/scheduler.ts @@ -0,0 +1,81 @@ +import fs from 'fs'; +import path from 'path'; +import { PrefixLogger } from '@x/shared'; +import { WorkDir } from '../../config/config.js'; +import { fetchAll } from './fileops.js'; +import { triggerTrackUpdate } from './runner.js'; +import { isTrackScheduleDue } from './schedule-utils.js'; + +const log = new PrefixLogger('TrackScheduler'); +const KNOWLEDGE_DIR = path.join(WorkDir, 'knowledge'); +const POLL_INTERVAL_MS = 15_000; // 15 seconds + +function scanMarkdownFiles(dir: string): string[] { + if (!fs.existsSync(dir)) return []; + const files: string[] = []; + const entries = fs.readdirSync(dir, { withFileTypes: true }); + for (const entry of entries) { + if (entry.name.startsWith('.')) continue; + const fullPath = path.join(dir, entry.name); + if (entry.isDirectory()) { + files.push(...scanMarkdownFiles(fullPath)); + } else if (entry.isFile() && entry.name.endsWith('.md')) { + files.push(fullPath); + } + } + return files; +} + +async function processScheduledTracks(): Promise { + if (!fs.existsSync(KNOWLEDGE_DIR)) { + log.log('Knowledge directory not found'); + return; + } + + const allFiles = scanMarkdownFiles(KNOWLEDGE_DIR); + log.log(`Scanning ${allFiles.length} markdown files`); + + for (const fullPath of allFiles) { + const relativePath = path.relative(KNOWLEDGE_DIR, fullPath); + + let tracks; + try { + tracks = await fetchAll(relativePath); + } catch { + continue; + } + + for (const trackState of tracks) { + const { track } = trackState; + if (!track.active) continue; + if (!track.schedule) continue; + + const due = isTrackScheduleDue(track.schedule, track.lastRunAt ?? null); + log.log(`Track "${track.trackId}" in ${relativePath}: schedule=${track.schedule.type}, lastRunAt=${track.lastRunAt ?? 'never'}, due=${due}`); + + if (due) { + log.log(`Triggering "${track.trackId}" in ${relativePath}`); + triggerTrackUpdate(track.trackId, relativePath, undefined, 'timed').catch(err => { + log.log(`Error running ${track.trackId}:`, err); + }); + } + } + } +} + +export async function init(): Promise { + log.log(`Starting, polling every ${POLL_INTERVAL_MS / 1000}s`); + + // Initial run + await processScheduledTracks(); + + // Periodic polling + while (true) { + await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS)); + try { + await processScheduledTracks(); + } catch (error) { + log.log('Error in main loop:', error); + } + } +} diff --git a/apps/x/packages/shared/src/track-block.ts b/apps/x/packages/shared/src/track-block.ts index 32f09c60..fb05003c 100644 --- a/apps/x/packages/shared/src/track-block.ts +++ b/apps/x/packages/shared/src/track-block.ts @@ -1,10 +1,30 @@ import z from 'zod'; +export const TrackScheduleSchema = z.discriminatedUnion('type', [ + z.object({ + type: z.literal('cron'), + expression: z.string(), + }), + z.object({ + type: z.literal('window'), + cron: z.string(), + startTime: z.string(), + endTime: z.string(), + }), + z.object({ + type: z.literal('once'), + runAt: z.string(), + }), +]); + +export type TrackSchedule = z.infer; + export const TrackBlockSchema = z.object({ trackId: z.string(), instruction: z.string(), matchCriteria: z.string().optional(), active: z.boolean().default(true), + schedule: TrackScheduleSchema.optional(), lastRunAt: z.string().optional(), lastRunId: z.string().optional(), lastRunSummary: z.string().optional(), @@ -31,5 +51,4 @@ export const TrackRunCompleteEvent = z.object({ export const TrackEvent = z.union([TrackRunStartEvent, TrackRunCompleteEvent]); export type TrackBlock = z.infer; -export type TrackResult = z.infer; export type TrackEventType = z.infer;