From 570a315b36be7f28d4bb928d1714ed647d2e5f12 Mon Sep 17 00:00:00 2001 From: Ramnique Singh <30795890+ramnique@users.noreply.github.com> Date: Tue, 14 Apr 2026 00:53:58 +0530 Subject: [PATCH] evented works --- apps/x/apps/main/forge.config.cjs | 24 +-- apps/x/apps/main/src/main.ts | 4 + .../renderer/src/extensions/track-block.tsx | 8 +- .../assistant/skills/tracks/skill.ts | 27 +++ .../packages/core/src/knowledge/sync_gmail.ts | 20 ++ .../core/src/knowledge/track/events.ts | 189 ++++++++++++++++++ .../core/src/knowledge/track/routing.ts | 118 +++++++++++ .../core/src/knowledge/track/runner.ts | 31 ++- .../core/src/knowledge/track/scheduler.ts | 39 ++-- apps/x/packages/shared/src/track-block.ts | 35 +++- 10 files changed, 445 insertions(+), 50 deletions(-) create mode 100644 apps/x/packages/core/src/knowledge/track/events.ts create mode 100644 apps/x/packages/core/src/knowledge/track/routing.ts diff --git a/apps/x/apps/main/forge.config.cjs b/apps/x/apps/main/forge.config.cjs index 178cb7e1..68f4624d 100644 --- a/apps/x/apps/main/forge.config.cjs +++ b/apps/x/apps/main/forge.config.cjs @@ -14,18 +14,18 @@ module.exports = { extendInfo: { NSAudioCaptureUsageDescription: 'Rowboat needs access to system audio to transcribe meetings from other apps (Zoom, Meet, etc.)', }, - osxSign: { - batchCodesignCalls: true, - optionsForFile: () => ({ - entitlements: path.join(__dirname, 'entitlements.plist'), - 'entitlements-inherit': path.join(__dirname, 'entitlements.plist'), - }), - }, - osxNotarize: { - appleId: process.env.APPLE_ID, - appleIdPassword: process.env.APPLE_PASSWORD, - teamId: process.env.APPLE_TEAM_ID - }, + // osxSign: { + // batchCodesignCalls: true, + // optionsForFile: () => ({ + // entitlements: path.join(__dirname, 'entitlements.plist'), + // 'entitlements-inherit': path.join(__dirname, 'entitlements.plist'), + // }), + // }, + // osxNotarize: { + // appleId: process.env.APPLE_ID, + // appleIdPassword: process.env.APPLE_PASSWORD, + // teamId: process.env.APPLE_TEAM_ID + // }, // Since we bundle everything with esbuild, we don't need node_modules at all. // These settings prevent Forge's dependency walker (flora-colossus) from trying // to analyze/copy node_modules, which fails with pnpm's symlinked workspaces. diff --git a/apps/x/apps/main/src/main.ts b/apps/x/apps/main/src/main.ts index 804c4f93..e8c6ee53 100644 --- a/apps/x/apps/main/src/main.ts +++ b/apps/x/apps/main/src/main.ts @@ -24,6 +24,7 @@ import { init as initInlineTasks } from "@x/core/dist/knowledge/inline_tasks.js" import { init as initAgentRunner } from "@x/core/dist/agent-schedule/runner.js"; import { init as initAgentNotes } from "@x/core/dist/knowledge/agent_notes.js"; import { init as initTrackScheduler } from "@x/core/dist/knowledge/track/scheduler.js"; +import { init as initTrackEventProcessor } from "@x/core/dist/knowledge/track/events.js"; import { initConfigs } from "@x/core/dist/config/initConfigs.js"; import started from "electron-squirrel-startup"; @@ -237,6 +238,9 @@ app.whenReady().then(async () => { // start track scheduler (cron/window/once) initTrackScheduler(); + // start track event processor (consumes events/pending/, triggers matching tracks) + initTrackEventProcessor(); + // start gmail sync initGmailSync(); diff --git a/apps/x/apps/renderer/src/extensions/track-block.tsx b/apps/x/apps/renderer/src/extensions/track-block.tsx index 864ceb17..ff12c9fc 100644 --- a/apps/x/apps/renderer/src/extensions/track-block.tsx +++ b/apps/x/apps/renderer/src/extensions/track-block.tsx @@ -47,7 +47,7 @@ function TrackBlockView({ node, deleteNode, updateAttributes, extension }: { const trackId = track?.trackId ?? '' const instruction = track?.instruction ?? '' - const matchCriteria = track?.matchCriteria ?? '' + const eventMatchCriteria = track?.eventMatchCriteria ?? '' const active = track?.active ?? true const lastRunAt = track?.lastRunAt ?? '' const lastRunId = track?.lastRunId ?? '' @@ -225,9 +225,9 @@ function TrackBlockView({ node, deleteNode, updateAttributes, extension }: { )} {activeTab === 'criteria' && (
- {matchCriteria - ? {matchCriteria} - : No match criteria set} + {eventMatchCriteria + ? {eventMatchCriteria} + : No event match criteria set}
)} {activeTab === 'metadata' && ( diff --git a/apps/x/packages/core/src/application/assistant/skills/tracks/skill.ts b/apps/x/packages/core/src/application/assistant/skills/tracks/skill.ts index e5cd1792..fa431f29 100644 --- a/apps/x/packages/core/src/application/assistant/skills/tracks/skill.ts +++ b/apps/x/packages/core/src/application/assistant/skills/tracks/skill.ts @@ -126,6 +126,33 @@ Fires once at ` + "`" + `runAt` + "`" + ` and never again. Local time, no ` + "` **Omit ` + "`" + `schedule` + "`" + ` entirely for a manual-only track** — the user triggers it via the Play button in the UI. +## Event Triggers (third trigger type) + +In addition to manual and scheduled, a track can be triggered by **events** — incoming signals from the user's data sources (currently: gmail emails). Set ` + "`" + `eventMatchCriteria` + "`" + ` to a description of what kinds of events should consider this track for an update: + +` + "```" + `track +trackId: q3-planning-emails +instruction: Maintain a running summary of decisions and open questions about Q3 planning, drawn from emails on the topic. +active: true +eventMatchCriteria: Emails about Q3 planning, roadmap decisions, or quarterly OKRs +` + "```" + ` + +How it works: +1. When a new event arrives (e.g. an email syncs), a fast LLM classifier checks ` + "`" + `eventMatchCriteria` + "`" + ` against the event content. +2. If it might match, the track-run agent receives both the event payload and the existing track content, and decides whether to actually update. +3. If the event isn't truly relevant on closer inspection, the agent skips the update — no fabricated content. + +When to suggest event triggers: +- The user wants to **maintain a living summary** of a topic ("keep notes on everything related to project X"). +- The content depends on **incoming signals** rather than periodic refresh ("update this whenever a relevant email arrives"). +- Mention to the user: scheduled (cron) is for time-driven updates; event is for signal-driven updates. They can be combined — a track can have both a ` + "`" + `schedule` + "`" + ` and ` + "`" + `eventMatchCriteria` + "`" + ` (it'll run on schedule AND on relevant events). + +Writing good ` + "`" + `eventMatchCriteria` + "`" + `: +- Be descriptive but not overly narrow — Pass 1 routing is liberal by design. +- Examples: ` + "`" + `"Emails from John about the migration project"` + "`" + `, ` + "`" + `"Calendar events related to customer interviews"` + "`" + `, ` + "`" + `"Meeting notes that mention pricing changes"` + "`" + `. + +Tracks **without** ` + "`" + `eventMatchCriteria` + "`" + ` opt out of events entirely — they'll only run on schedule or manually. + ## Insertion Workflow ### Cmd+K with cursor context diff --git a/apps/x/packages/core/src/knowledge/sync_gmail.ts b/apps/x/packages/core/src/knowledge/sync_gmail.ts index 599e75ac..d00557a0 100644 --- a/apps/x/packages/core/src/knowledge/sync_gmail.ts +++ b/apps/x/packages/core/src/knowledge/sync_gmail.ts @@ -9,6 +9,7 @@ import { serviceLogger, type ServiceRunContext } from '../services/service_logge import { limitEventItems } from './limit_event_items.js'; import { executeAction, useComposioForGoogle } from '../composio/client.js'; import { composioAccountsRepo } from '../composio/repo.js'; +import { createEvent } from './track/events.js'; // Configuration const SYNC_DIR = path.join(WorkDir, 'gmail_sync'); @@ -172,6 +173,13 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent); console.log(`Synced Thread: ${subject} (${threadId})`); + await createEvent({ + source: 'gmail', + type: 'email.synced', + createdAt: new Date().toISOString(), + payload: mdContent, + }); + } catch (error) { console.error(`Error processing thread ${threadId}:`, error); } @@ -595,6 +603,12 @@ async function processThreadComposio(connectedAccountId: string, threadId: strin fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); console.log(`[Gmail] Synced Thread: ${parsed.subject} (${threadId})`); + await createEvent({ + source: 'gmail', + type: 'email.synced', + createdAt: new Date().toISOString(), + payload: mdContent, + }); newestDate = tryParseDate(parsed.date); } else { const firstParsed = parseMessageData(messages[0]); @@ -617,6 +631,12 @@ async function processThreadComposio(connectedAccountId: string, threadId: strin fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent); console.log(`[Gmail] Synced Thread: ${firstParsed.subject} (${threadId})`); + await createEvent({ + source: 'gmail', + type: 'email.synced', + createdAt: new Date().toISOString(), + payload: mdContent, + }); } if (!newestDate) return null; diff --git a/apps/x/packages/core/src/knowledge/track/events.ts b/apps/x/packages/core/src/knowledge/track/events.ts new file mode 100644 index 00000000..d54ad215 --- /dev/null +++ b/apps/x/packages/core/src/knowledge/track/events.ts @@ -0,0 +1,189 @@ +import fs from 'fs'; +import path from 'path'; +import { PrefixLogger, trackBlock } from '@x/shared'; +import type { KnowledgeEvent } from '@x/shared/dist/track-block.js'; +import { WorkDir } from '../../config/config.js'; +import * as workspace from '../../workspace/workspace.js'; +import { fetchAll } from './fileops.js'; +import { triggerTrackUpdate } from './runner.js'; +import { findCandidates, type ParsedTrack } from './routing.js'; +import type { IMonotonicallyIncreasingIdGenerator } from '../../application/lib/id-gen.js'; +import container from '../../di/container.js'; + +const POLL_INTERVAL_MS = 5_000; // 5 seconds — events should feel responsive +const EVENTS_DIR = path.join(WorkDir, 'events'); +const PENDING_DIR = path.join(EVENTS_DIR, 'pending'); +const DONE_DIR = path.join(EVENTS_DIR, 'done'); + +const log = new PrefixLogger('EventProcessor'); + +/** + * Write a KnowledgeEvent to the events/pending/ directory. + * Filename is a monotonically increasing ID so events sort by creation order. + * Call this function in chronological order (oldest event first) within a sync batch + * to ensure correct ordering. + */ +export async function createEvent(event: Omit): Promise { + fs.mkdirSync(PENDING_DIR, { recursive: true }); + + const idGen = container.resolve('idGenerator'); + const id = await idGen.next(); + + const fullEvent: KnowledgeEvent = { id, ...event }; + const filePath = path.join(PENDING_DIR, `${id}.json`); + fs.writeFileSync(filePath, JSON.stringify(fullEvent, null, 2), 'utf-8'); +} + +function ensureDirs(): void { + fs.mkdirSync(PENDING_DIR, { recursive: true }); + fs.mkdirSync(DONE_DIR, { recursive: true }); +} + +async function listAllTracks(): Promise { + const tracks: ParsedTrack[] = []; + let entries; + try { + entries = await workspace.readdir('knowledge', { recursive: true }); + } catch { + return tracks; + } + const mdFiles = entries + .filter(e => e.kind === 'file' && e.name.endsWith('.md')) + .map(e => e.path.replace(/^knowledge\//, '')); + + for (const filePath of mdFiles) { + let parsedTracks; + try { + parsedTracks = await fetchAll(filePath); + } catch { + continue; + } + for (const t of parsedTracks) { + tracks.push({ + trackId: t.track.trackId, + filePath, + eventMatchCriteria: t.track.eventMatchCriteria ?? '', + instruction: t.track.instruction, + active: t.track.active, + }); + } + } + return tracks; +} + +function moveEventToDone(filename: string, enriched: KnowledgeEvent): void { + const donePath = path.join(DONE_DIR, filename); + const pendingPath = path.join(PENDING_DIR, filename); + fs.writeFileSync(donePath, JSON.stringify(enriched, null, 2), 'utf-8'); + try { + fs.unlinkSync(pendingPath); + } catch (err) { + log.log(`Failed to remove pending event ${filename}:`, err); + } +} + +async function processOneEvent(filename: string): Promise { + const pendingPath = path.join(PENDING_DIR, filename); + + let event: KnowledgeEvent; + try { + const raw = fs.readFileSync(pendingPath, 'utf-8'); + const parsed = JSON.parse(raw); + event = trackBlock.KnowledgeEventSchema.parse(parsed); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + log.log(`Malformed event ${filename}, moving to done with error:`, msg); + const stub: KnowledgeEvent = { + id: filename.replace(/\.json$/, ''), + source: 'unknown', + type: 'unknown', + createdAt: new Date().toISOString(), + payload: '', + processedAt: new Date().toISOString(), + error: `Failed to parse: ${msg}`, + }; + moveEventToDone(filename, stub); + return; + } + + log.log(`Processing event ${event.id} (source=${event.source}, type=${event.type})`); + + const allTracks = await listAllTracks(); + const candidates = await findCandidates(event, allTracks); + + const runIds: string[] = []; + let processingError: string | undefined; + + // Sequential — preserves total ordering + for (const candidate of candidates) { + try { + const result = await triggerTrackUpdate( + candidate.trackId, + candidate.filePath, + event.payload, + 'event', + ); + if (result.runId) runIds.push(result.runId); + log.log(`Candidate ${candidate.trackId}: ${result.action}${result.error ? ` (${result.error})` : ''}`); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + log.log(`Error triggering candidate ${candidate.trackId}:`, msg); + processingError = (processingError ? processingError + '; ' : '') + `${candidate.trackId}: ${msg}`; + } + } + + const enriched: KnowledgeEvent = { + ...event, + processedAt: new Date().toISOString(), + candidates: candidates.map(c => ({ trackId: c.trackId, filePath: c.filePath })), + runIds, + ...(processingError ? { error: processingError } : {}), + }; + + moveEventToDone(filename, enriched); +} + +async function processPendingEvents(): Promise { + ensureDirs(); + + let filenames: string[]; + try { + filenames = fs.readdirSync(PENDING_DIR).filter(f => f.endsWith('.json')); + } catch (err) { + log.log('Failed to read pending dir:', err); + return; + } + + if (filenames.length === 0) return; + + // FIFO: monotonic IDs are lexicographically sortable + filenames.sort(); + + log.log(`Processing ${filenames.length} pending event(s)`); + + for (const filename of filenames) { + try { + await processOneEvent(filename); + } catch (err) { + log.log(`Unhandled error processing ${filename}:`, err); + // Keep the loop alive — don't move file, will retry on next tick + } + } +} + +export async function init(): Promise { + log.log(`Starting, polling every ${POLL_INTERVAL_MS / 1000}s`); + ensureDirs(); + + // Initial run + await processPendingEvents(); + + while (true) { + await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS)); + try { + await processPendingEvents(); + } catch (err) { + log.log('Error in main loop:', err); + } + } +} diff --git a/apps/x/packages/core/src/knowledge/track/routing.ts b/apps/x/packages/core/src/knowledge/track/routing.ts new file mode 100644 index 00000000..f876106e --- /dev/null +++ b/apps/x/packages/core/src/knowledge/track/routing.ts @@ -0,0 +1,118 @@ +import { generateObject } from 'ai'; +import { trackBlock, PrefixLogger } from '@x/shared'; +import type { KnowledgeEvent } from '@x/shared/dist/track-block.js'; +import container from '../../di/container.js'; +import type { IModelConfigRepo } from '../../models/repo.js'; +import { createProvider } from '../../models/models.js'; +import { isSignedIn } from '../../account/account.js'; +import { getGatewayProvider } from '../../models/gateway.js'; + +const log = new PrefixLogger('TrackRouting'); + +const BATCH_SIZE = 20; + +export interface ParsedTrack { + trackId: string; + filePath: string; + eventMatchCriteria: string; + instruction: string; + active: boolean; +} + +const ROUTING_SYSTEM_PROMPT = `You are a routing classifier for a knowledge management system. + +You will receive an event (something that happened — an email, meeting, message, etc.) and a list of track blocks. Each track block has: +- trackId: an identifier (only unique within its file) +- filePath: the note file the track lives in +- eventMatchCriteria: a description of what kinds of signals are relevant to this track + +Your job is to identify which track blocks MIGHT be relevant to this event. + +Rules: +- Be LIBERAL in your selections. Include any track that is even moderately relevant. +- Prefer false positives over false negatives. It is much better to include a track that turns out to be irrelevant than to miss one that was relevant. +- Only exclude tracks that are CLEARLY and OBVIOUSLY irrelevant to the event. +- Do not attempt to judge whether the event contains enough information to update the track. That is handled by a later stage. +- Return an empty list only if no tracks are relevant at all. +- For each candidate, return BOTH trackId and filePath exactly as given. trackIds are not globally unique.`; + +async function resolveModel() { + const repo = container.resolve('modelConfigRepo'); + const config = await repo.getConfig(); + const signedIn = await isSignedIn(); + const provider = signedIn + ? await getGatewayProvider() + : createProvider(config.provider); + const modelId = config.knowledgeGraphModel + || (signedIn ? 'gpt-5.4' : config.model); + return provider.languageModel(modelId); +} + +function buildRoutingPrompt(event: KnowledgeEvent, batch: ParsedTrack[]): string { + const trackList = batch + .map((t, i) => `${i + 1}. trackId: ${t.trackId}\n filePath: ${t.filePath}\n eventMatchCriteria: ${t.eventMatchCriteria}`) + .join('\n\n'); + + return `## Event + +Source: ${event.source} +Type: ${event.type} +Time: ${event.createdAt} + +${event.payload} + +## Track Blocks + +${trackList}`; +} + +function trackKey(trackId: string, filePath: string): string { + return `${filePath}::${trackId}`; +} + +export async function findCandidates( + event: KnowledgeEvent, + allTracks: ParsedTrack[], +): Promise { + // Short-circuit for targeted re-runs — skip LLM routing entirely + if (event.targetTrackId && event.targetFilePath) { + const target = allTracks.find(t => + t.trackId === event.targetTrackId && t.filePath === event.targetFilePath + ); + return target ? [target] : []; + } + + const filtered = allTracks.filter(t => + t.active && t.instruction && t.eventMatchCriteria + ); + if (filtered.length === 0) { + log.log(`No event-eligible tracks (none with eventMatchCriteria)`); + return []; + } + + log.log(`Routing event ${event.id} against ${filtered.length} track(s)`); + + const model = await resolveModel(); + const candidateKeys = new Set(); + + for (let i = 0; i < filtered.length; i += BATCH_SIZE) { + const batch = filtered.slice(i, i + BATCH_SIZE); + try { + const { object } = await generateObject({ + model, + system: ROUTING_SYSTEM_PROMPT, + prompt: buildRoutingPrompt(event, batch), + schema: trackBlock.Pass1OutputSchema, + }); + for (const c of object.candidates) { + candidateKeys.add(trackKey(c.trackId, c.filePath)); + } + } catch (err) { + log.log(`Routing batch ${i / BATCH_SIZE} failed:`, err); + } + } + + const candidates = filtered.filter(t => candidateKeys.has(trackKey(t.trackId, t.filePath))); + log.log(`Event ${event.id}: ${candidates.length} candidate(s) — ${candidates.map(c => `${c.trackId}@${c.filePath}`).join(', ') || '(none)'}`); + return candidates; +} diff --git a/apps/x/packages/core/src/knowledge/track/runner.ts b/apps/x/packages/core/src/knowledge/track/runner.ts index d483353b..5ee90024 100644 --- a/apps/x/packages/core/src/knowledge/track/runner.ts +++ b/apps/x/packages/core/src/knowledge/track/runner.ts @@ -8,6 +8,7 @@ import { PrefixLogger } from '@x/shared/dist/prefix-logger.js'; export interface TrackUpdateResult { trackId: string; + runId: string | null; action: 'replace' | 'no_update'; contentBefore: string | null; contentAfter: string | null; @@ -19,7 +20,12 @@ export interface TrackUpdateResult { // Agent run // --------------------------------------------------------------------------- -function buildMessage(filePath: string, track: z.infer, context?: string): string { +function buildMessage( + filePath: string, + track: z.infer, + trigger: 'manual' | 'timed' | 'event', + context?: string, +): string { const now = new Date(); const localNow = now.toLocaleString('en-US', { dateStyle: 'full', timeStyle: 'long' }); const tz = Intl.DateTimeFormat().resolvedOptions().timeZone; @@ -36,7 +42,19 @@ ${track.content || '(empty — first run)'} Use \`update-track-content\` with filePath=\`${filePath}\` and trackId=\`${track.track.trackId}\`.`; - if (context) { + if (trigger === 'event') { + msg += ` + +**Trigger:** Event match (a Pass 1 routing classifier flagged this track as potentially relevant to the event below) + +**Event match criteria for this track:** +${track.track.eventMatchCriteria ?? '(none — should not happen for event-triggered runs)'} + +**Event payload:** +${context ?? '(no payload)'} + +**Decision:** Determine whether this event genuinely warrants updating the track content. If the event is not meaningfully relevant on closer inspection, skip the update — do NOT call \`update-track-content\`. Only call the tool if the event provides new or changed information that should be reflected in the track.`; + } else if (context) { msg += `\n\n**Context:**\n${context}`; } @@ -68,7 +86,7 @@ export async function triggerTrackUpdate( logger.log('triggering track update', trackId, filePath, trigger, context); if (runningTracks.has(key)) { logger.log('skipping, already running'); - return { trackId, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Already running' }; + return { trackId, runId: null, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Already running' }; } runningTracks.add(key); @@ -78,7 +96,7 @@ export async function triggerTrackUpdate( const track = tracks.find(t => t.track.trackId === trackId); if (!track) { logger.log('track not found', trackId, filePath, trigger, context); - return { trackId, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Track not found' }; + return { trackId, runId: null, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Track not found' }; } const contentBefore = track.content; @@ -102,7 +120,7 @@ export async function triggerTrackUpdate( }); try { - await createMessage(agentRun.id, buildMessage(filePath, track, context)); + await createMessage(agentRun.id, buildMessage(filePath, track, trigger, context)); await waitForRunCompletion(agentRun.id); const summary = await extractAgentResponse(agentRun.id); @@ -125,6 +143,7 @@ export async function triggerTrackUpdate( return { trackId, + runId: agentRun.id, action: didUpdate ? 'replace' : 'no_update', contentBefore: contentBefore ?? null, contentAfter: contentAfter ?? null, @@ -141,7 +160,7 @@ export async function triggerTrackUpdate( error: msg, }); - return { trackId, action: 'no_update', contentBefore: contentBefore ?? null, contentAfter: null, summary: null, error: msg }; + return { trackId, runId: agentRun.id, action: 'no_update', contentBefore: contentBefore ?? null, contentAfter: null, summary: null, error: msg }; } } finally { runningTracks.delete(key); diff --git a/apps/x/packages/core/src/knowledge/track/scheduler.ts b/apps/x/packages/core/src/knowledge/track/scheduler.ts index e86a8ffc..19327dc7 100644 --- a/apps/x/packages/core/src/knowledge/track/scheduler.ts +++ b/apps/x/packages/core/src/knowledge/track/scheduler.ts @@ -1,43 +1,28 @@ -import fs from 'fs'; -import path from 'path'; import { PrefixLogger } from '@x/shared'; -import { WorkDir } from '../../config/config.js'; +import * as workspace from '../../workspace/workspace.js'; import { fetchAll } from './fileops.js'; import { triggerTrackUpdate } from './runner.js'; import { isTrackScheduleDue } from './schedule-utils.js'; const log = new PrefixLogger('TrackScheduler'); -const KNOWLEDGE_DIR = path.join(WorkDir, 'knowledge'); const POLL_INTERVAL_MS = 15_000; // 15 seconds -function scanMarkdownFiles(dir: string): string[] { - if (!fs.existsSync(dir)) return []; - const files: string[] = []; - const entries = fs.readdirSync(dir, { withFileTypes: true }); - for (const entry of entries) { - if (entry.name.startsWith('.')) continue; - const fullPath = path.join(dir, entry.name); - if (entry.isDirectory()) { - files.push(...scanMarkdownFiles(fullPath)); - } else if (entry.isFile() && entry.name.endsWith('.md')) { - files.push(fullPath); - } +async function listKnowledgeMarkdownFiles(): Promise { + try { + const entries = await workspace.readdir('knowledge', { recursive: true }); + return entries + .filter(e => e.kind === 'file' && e.name.endsWith('.md')) + .map(e => e.path.replace(/^knowledge\//, '')); + } catch { + return []; } - return files; } async function processScheduledTracks(): Promise { - if (!fs.existsSync(KNOWLEDGE_DIR)) { - log.log('Knowledge directory not found'); - return; - } - - const allFiles = scanMarkdownFiles(KNOWLEDGE_DIR); - log.log(`Scanning ${allFiles.length} markdown files`); - - for (const fullPath of allFiles) { - const relativePath = path.relative(KNOWLEDGE_DIR, fullPath); + const relativePaths = await listKnowledgeMarkdownFiles(); + log.log(`Scanning ${relativePaths.length} markdown files`); + for (const relativePath of relativePaths) { let tracks; try { tracks = await fetchAll(relativePath); diff --git a/apps/x/packages/shared/src/track-block.ts b/apps/x/packages/shared/src/track-block.ts index a1f992bc..c9e738b7 100644 --- a/apps/x/packages/shared/src/track-block.ts +++ b/apps/x/packages/shared/src/track-block.ts @@ -22,7 +22,7 @@ export type TrackSchedule = z.infer; export const TrackBlockSchema = z.object({ trackId: z.string().regex(/^[a-z0-9]+(-[a-z0-9]+)*$/).describe('Kebab-case identifier, unique within the note file'), instruction: z.string().min(1).describe('What the agent should produce each run — specific, single-focus, imperative'), - matchCriteria: z.string().optional().describe('Optional filter for event-driven tracks'), + eventMatchCriteria: z.string().optional().describe('When set, this track participates in event-based triggering. Describe what kinds of events should consider this track for an update (e.g. "Emails about Q3 planning"). Omit to disable event triggers — the track will only run on schedule or manually.'), active: z.boolean().default(true).describe('Set false to pause without deleting'), schedule: TrackScheduleSchema.optional(), lastRunAt: z.string().optional().describe('Runtime-managed — never write this yourself'), @@ -30,6 +30,39 @@ export const TrackBlockSchema = z.object({ lastRunSummary: z.string().optional().describe('Runtime-managed — never write this yourself'), }); +// --------------------------------------------------------------------------- +// Knowledge events (event-driven track triggering pipeline) +// --------------------------------------------------------------------------- + +export const KnowledgeEventSchema = z.object({ + id: z.string().describe('Monotonically increasing ID; also the filename in events/pending/'), + source: z.string().describe('Producer of the event (e.g. "gmail", "calendar")'), + type: z.string().describe('Event type (e.g. "email.synced")'), + createdAt: z.string().describe('ISO timestamp when the event was produced'), + payload: z.string().describe('Human-readable event body, usually markdown'), + targetTrackId: z.string().optional().describe('If set, skip routing and target this track directly (used for re-runs)'), + targetFilePath: z.string().optional(), + // Enriched on move from pending/ to done/ + processedAt: z.string().optional(), + candidates: z.array(z.object({ + trackId: z.string(), + filePath: z.string(), + })).optional(), + runIds: z.array(z.string()).optional(), + error: z.string().optional(), +}); + +export type KnowledgeEvent = z.infer; + +export const Pass1OutputSchema = z.object({ + candidates: z.array(z.object({ + trackId: z.string().describe('The track block identifier'), + filePath: z.string().describe('The note file path the track lives in'), + })).describe('Tracks that may be relevant to this event. trackIds are only unique within a file, so always return both fields.'), +}); + +export type Pass1Output = z.infer; + // Track bus events export const TrackRunStartEvent = z.object({ type: z.literal('track_run_start'),