mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-05-12 00:32:38 +02:00
scheduled works
This commit is contained in:
parent
66dc065996
commit
9587368acf
6 changed files with 237 additions and 47 deletions
|
|
@ -42,6 +42,12 @@ Use \`update-track-content\` with filePath=\`${filePath}\` and trackId=\`${track
|
|||
return msg;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Concurrency guard
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const runningTracks = new Set<string>();
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Public API
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -54,69 +60,85 @@ export async function triggerTrackUpdate(
|
|||
trackId: string,
|
||||
filePath: string,
|
||||
context?: string,
|
||||
trigger: 'manual' | 'timed' | 'event' = 'manual',
|
||||
): Promise<TrackUpdateResult> {
|
||||
console.log('triggerTrackUpdate', trackId, filePath, context);
|
||||
const tracks = await fetchAll(filePath);
|
||||
const track = tracks.find(t => t.track.trackId === trackId);
|
||||
if (!track) {
|
||||
return { trackId, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Track not found' };
|
||||
const key = `${trackId}:${filePath}`;
|
||||
if (runningTracks.has(key)) {
|
||||
return { trackId, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Already running' };
|
||||
}
|
||||
|
||||
const contentBefore = track.content;
|
||||
|
||||
// Emit start event — runId is set after agent run is created
|
||||
const agentRun = await createRun({ agentId: 'track-run' });
|
||||
|
||||
await trackBus.publish({
|
||||
type: 'track_run_start',
|
||||
trackId,
|
||||
filePath,
|
||||
trigger: 'manual',
|
||||
runId: agentRun.id,
|
||||
});
|
||||
runningTracks.add(key);
|
||||
|
||||
try {
|
||||
await createMessage(agentRun.id, buildMessage(filePath, track, context));
|
||||
await waitForRunCompletion(agentRun.id);
|
||||
const summary = await extractAgentResponse(agentRun.id);
|
||||
console.log('triggerTrackUpdate', trackId, filePath, trigger, context);
|
||||
const tracks = await fetchAll(filePath);
|
||||
const track = tracks.find(t => t.track.trackId === trackId);
|
||||
if (!track) {
|
||||
return { trackId, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Track not found' };
|
||||
}
|
||||
|
||||
const updatedTracks = await fetchAll(filePath);
|
||||
const contentAfter = updatedTracks.find(t => t.track.trackId === trackId)?.content;
|
||||
const didUpdate = contentAfter !== contentBefore;
|
||||
const contentBefore = track.content;
|
||||
|
||||
// Update track block metadata
|
||||
// Emit start event — runId is set after agent run is created
|
||||
const agentRun = await createRun({ agentId: 'track-run' });
|
||||
|
||||
// Set lastRunAt and lastRunId immediately (before agent executes) so
|
||||
// the scheduler's next poll won't re-trigger this track.
|
||||
await updateTrackBlock(filePath, trackId, {
|
||||
lastRunAt: new Date().toISOString(),
|
||||
lastRunId: agentRun.id,
|
||||
lastRunSummary: summary ?? undefined,
|
||||
});
|
||||
|
||||
await trackBus.publish({
|
||||
type: 'track_run_complete',
|
||||
type: 'track_run_start',
|
||||
trackId,
|
||||
filePath,
|
||||
trigger,
|
||||
runId: agentRun.id,
|
||||
summary: summary ?? undefined,
|
||||
});
|
||||
|
||||
return {
|
||||
trackId,
|
||||
action: didUpdate ? 'replace' : 'no_update',
|
||||
contentBefore: contentBefore ?? null,
|
||||
contentAfter: contentAfter ?? null,
|
||||
summary,
|
||||
};
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
try {
|
||||
await createMessage(agentRun.id, buildMessage(filePath, track, context));
|
||||
await waitForRunCompletion(agentRun.id);
|
||||
const summary = await extractAgentResponse(agentRun.id);
|
||||
|
||||
await trackBus.publish({
|
||||
type: 'track_run_complete',
|
||||
trackId,
|
||||
filePath,
|
||||
runId: agentRun.id,
|
||||
error: msg,
|
||||
});
|
||||
const updatedTracks = await fetchAll(filePath);
|
||||
const contentAfter = updatedTracks.find(t => t.track.trackId === trackId)?.content;
|
||||
const didUpdate = contentAfter !== contentBefore;
|
||||
|
||||
return { trackId, action: 'no_update', contentBefore: contentBefore ?? null, contentAfter: null, summary: null, error: msg };
|
||||
// Update summary on completion
|
||||
await updateTrackBlock(filePath, trackId, {
|
||||
lastRunSummary: summary ?? undefined,
|
||||
});
|
||||
|
||||
await trackBus.publish({
|
||||
type: 'track_run_complete',
|
||||
trackId,
|
||||
filePath,
|
||||
runId: agentRun.id,
|
||||
summary: summary ?? undefined,
|
||||
});
|
||||
|
||||
return {
|
||||
trackId,
|
||||
action: didUpdate ? 'replace' : 'no_update',
|
||||
contentBefore: contentBefore ?? null,
|
||||
contentAfter: contentAfter ?? null,
|
||||
summary,
|
||||
};
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
|
||||
await trackBus.publish({
|
||||
type: 'track_run_complete',
|
||||
trackId,
|
||||
filePath,
|
||||
runId: agentRun.id,
|
||||
error: msg,
|
||||
});
|
||||
|
||||
return { trackId, action: 'no_update', contentBefore: contentBefore ?? null, contentAfter: null, summary: null, error: msg };
|
||||
}
|
||||
} finally {
|
||||
runningTracks.delete(key);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
64
apps/x/packages/core/src/knowledge/track/schedule-utils.ts
Normal file
64
apps/x/packages/core/src/knowledge/track/schedule-utils.ts
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
import { CronExpressionParser } from 'cron-parser';
|
||||
import type { TrackSchedule } from '@x/shared/dist/track-block.js';
|
||||
|
||||
const GRACE_MS = 2 * 60 * 1000; // 2 minutes
|
||||
|
||||
/**
|
||||
* Determine if a scheduled track is due to run.
|
||||
* All schedule types enforce a 2-minute grace period — if the scheduled time
|
||||
* was more than 2 minutes ago, it's considered a miss and skipped.
|
||||
*/
|
||||
export function isTrackScheduleDue(schedule: TrackSchedule, lastRunAt: string | null): boolean {
|
||||
const now = new Date();
|
||||
|
||||
switch (schedule.type) {
|
||||
case 'cron': {
|
||||
if (!lastRunAt) return true; // Never ran — immediately due
|
||||
try {
|
||||
const interval = CronExpressionParser.parse(schedule.expression, {
|
||||
currentDate: new Date(lastRunAt),
|
||||
});
|
||||
const nextRun = interval.next().toDate();
|
||||
return now >= nextRun && now.getTime() <= nextRun.getTime() + GRACE_MS;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
case 'window': {
|
||||
if (!lastRunAt) {
|
||||
// Never ran — due if within the time window now
|
||||
const [startHour, startMin] = schedule.startTime.split(':').map(Number);
|
||||
const [endHour, endMin] = schedule.endTime.split(':').map(Number);
|
||||
const startMinutes = startHour * 60 + startMin;
|
||||
const endMinutes = endHour * 60 + endMin;
|
||||
const nowMinutes = now.getHours() * 60 + now.getMinutes();
|
||||
return nowMinutes >= startMinutes && nowMinutes <= endMinutes;
|
||||
}
|
||||
try {
|
||||
const interval = CronExpressionParser.parse(schedule.cron, {
|
||||
currentDate: new Date(lastRunAt),
|
||||
});
|
||||
const nextRun = interval.next().toDate();
|
||||
if (!(now >= nextRun && now.getTime() <= nextRun.getTime() + GRACE_MS)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if current time is within the time window
|
||||
const [startHour, startMin] = schedule.startTime.split(':').map(Number);
|
||||
const [endHour, endMin] = schedule.endTime.split(':').map(Number);
|
||||
const startMinutes = startHour * 60 + startMin;
|
||||
const endMinutes = endHour * 60 + endMin;
|
||||
const nowMinutes = now.getHours() * 60 + now.getMinutes();
|
||||
|
||||
return nowMinutes >= startMinutes && nowMinutes <= endMinutes;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
case 'once': {
|
||||
if (lastRunAt) return false; // Already ran
|
||||
const runAt = new Date(schedule.runAt);
|
||||
return now >= runAt && now.getTime() <= runAt.getTime() + GRACE_MS;
|
||||
}
|
||||
}
|
||||
}
|
||||
81
apps/x/packages/core/src/knowledge/track/scheduler.ts
Normal file
81
apps/x/packages/core/src/knowledge/track/scheduler.ts
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { PrefixLogger } from '@x/shared';
|
||||
import { WorkDir } from '../../config/config.js';
|
||||
import { fetchAll } from './fileops.js';
|
||||
import { triggerTrackUpdate } from './runner.js';
|
||||
import { isTrackScheduleDue } from './schedule-utils.js';
|
||||
|
||||
const log = new PrefixLogger('TrackScheduler');
|
||||
const KNOWLEDGE_DIR = path.join(WorkDir, 'knowledge');
|
||||
const POLL_INTERVAL_MS = 15_000; // 15 seconds
|
||||
|
||||
function scanMarkdownFiles(dir: string): string[] {
|
||||
if (!fs.existsSync(dir)) return [];
|
||||
const files: string[] = [];
|
||||
const entries = fs.readdirSync(dir, { withFileTypes: true });
|
||||
for (const entry of entries) {
|
||||
if (entry.name.startsWith('.')) continue;
|
||||
const fullPath = path.join(dir, entry.name);
|
||||
if (entry.isDirectory()) {
|
||||
files.push(...scanMarkdownFiles(fullPath));
|
||||
} else if (entry.isFile() && entry.name.endsWith('.md')) {
|
||||
files.push(fullPath);
|
||||
}
|
||||
}
|
||||
return files;
|
||||
}
|
||||
|
||||
async function processScheduledTracks(): Promise<void> {
|
||||
if (!fs.existsSync(KNOWLEDGE_DIR)) {
|
||||
log.log('Knowledge directory not found');
|
||||
return;
|
||||
}
|
||||
|
||||
const allFiles = scanMarkdownFiles(KNOWLEDGE_DIR);
|
||||
log.log(`Scanning ${allFiles.length} markdown files`);
|
||||
|
||||
for (const fullPath of allFiles) {
|
||||
const relativePath = path.relative(KNOWLEDGE_DIR, fullPath);
|
||||
|
||||
let tracks;
|
||||
try {
|
||||
tracks = await fetchAll(relativePath);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (const trackState of tracks) {
|
||||
const { track } = trackState;
|
||||
if (!track.active) continue;
|
||||
if (!track.schedule) continue;
|
||||
|
||||
const due = isTrackScheduleDue(track.schedule, track.lastRunAt ?? null);
|
||||
log.log(`Track "${track.trackId}" in ${relativePath}: schedule=${track.schedule.type}, lastRunAt=${track.lastRunAt ?? 'never'}, due=${due}`);
|
||||
|
||||
if (due) {
|
||||
log.log(`Triggering "${track.trackId}" in ${relativePath}`);
|
||||
triggerTrackUpdate(track.trackId, relativePath, undefined, 'timed').catch(err => {
|
||||
log.log(`Error running ${track.trackId}:`, err);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function init(): Promise<void> {
|
||||
log.log(`Starting, polling every ${POLL_INTERVAL_MS / 1000}s`);
|
||||
|
||||
// Initial run
|
||||
await processScheduledTracks();
|
||||
|
||||
// Periodic polling
|
||||
while (true) {
|
||||
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||
try {
|
||||
await processScheduledTracks();
|
||||
} catch (error) {
|
||||
log.log('Error in main loop:', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue