evented works

This commit is contained in:
Ramnique Singh 2026-04-14 00:53:58 +05:30
parent 5b16a0a42e
commit 570a315b36
10 changed files with 445 additions and 50 deletions

View file

@@ -126,6 +126,33 @@ Fires once at `runAt` and never again. Local time, no `
**Omit `schedule` entirely for a manual-only track**: the user triggers it via the Play button in the UI.
## Event Triggers (third trigger type)
In addition to manual and scheduled, a track can be triggered by **events**: incoming signals from the user's data sources (currently: Gmail emails). Set `eventMatchCriteria` to a description of the kinds of events that should cause this track to be considered for an update:
```track
trackId: q3-planning-emails
instruction: Maintain a running summary of decisions and open questions about Q3 planning, drawn from emails on the topic.
active: true
eventMatchCriteria: Emails about Q3 planning, roadmap decisions, or quarterly OKRs
```
How it works:
1. When a new event arrives (e.g. an email syncs), a fast LLM classifier checks `eventMatchCriteria` against the event content.
2. If it might match, the track-run agent receives both the event payload and the existing track content, and decides whether to actually update.
3. If the event isn't truly relevant on closer inspection, the agent skips the update; no content is fabricated.
When to suggest event triggers:
- The user wants to **maintain a living summary** of a topic ("keep notes on everything related to project X").
- The content depends on **incoming signals** rather than periodic refresh ("update this whenever a relevant email arrives").
- Mention to the user: scheduled (cron) is for time-driven updates; event is for signal-driven updates. They can be combined: a track can have both a `schedule` and `eventMatchCriteria` (it'll run on schedule AND on relevant events); see the sketch below.
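For illustration, a combined track might look like the sketch below. The trackId, instruction, and `schedule` value are made up, and `schedule` is shown as a cron expression per the scheduled-trigger notes above:

```track
trackId: q3-planning-combined
instruction: Keep a weekly-refreshed summary of Q3 planning decisions, and fold in new information as relevant emails arrive.
active: true
schedule: 0 9 * * 1
eventMatchCriteria: Emails about Q3 planning, roadmap decisions, or quarterly OKRs
```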
Writing good `eventMatchCriteria`:
- Be descriptive but not overly narrow; Pass 1 routing is liberal by design.
- Examples: `"Emails from John about the migration project"`, `"Calendar events related to customer interviews"`, `"Meeting notes that mention pricing changes"`.
Tracks **without** `eventMatchCriteria` opt out of events entirely; they'll only run on schedule or manually.
## Insertion Workflow
### Cmd+K with cursor context

View file

@@ -9,6 +9,7 @@ import { serviceLogger, type ServiceRunContext } from '../services/service_logge
import { limitEventItems } from './limit_event_items.js';
import { executeAction, useComposioForGoogle } from '../composio/client.js';
import { composioAccountsRepo } from '../composio/repo.js';
import { createEvent } from './track/events.js';
// Configuration
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
@@ -172,6 +173,13 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri
fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent);
console.log(`Synced Thread: ${subject} (${threadId})`);
await createEvent({
source: 'gmail',
type: 'email.synced',
createdAt: new Date().toISOString(),
payload: mdContent,
});
} catch (error) {
console.error(`Error processing thread ${threadId}:`, error);
}
@@ -595,6 +603,12 @@ async function processThreadComposio(connectedAccountId: string, threadId: strin
fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
console.log(`[Gmail] Synced Thread: ${parsed.subject} (${threadId})`);
await createEvent({
source: 'gmail',
type: 'email.synced',
createdAt: new Date().toISOString(),
payload: mdContent,
});
newestDate = tryParseDate(parsed.date);
} else {
const firstParsed = parseMessageData(messages[0]);
@@ -617,6 +631,12 @@ async function processThreadComposio(connectedAccountId: string, threadId: strin
fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
console.log(`[Gmail] Synced Thread: ${firstParsed.subject} (${threadId})`);
await createEvent({
source: 'gmail',
type: 'email.synced',
createdAt: new Date().toISOString(),
payload: mdContent,
});
}
if (!newestDate) return null;

View file

@@ -0,0 +1,189 @@
import fs from 'fs';
import path from 'path';
import { PrefixLogger, trackBlock } from '@x/shared';
import type { KnowledgeEvent } from '@x/shared/dist/track-block.js';
import { WorkDir } from '../../config/config.js';
import * as workspace from '../../workspace/workspace.js';
import { fetchAll } from './fileops.js';
import { triggerTrackUpdate } from './runner.js';
import { findCandidates, type ParsedTrack } from './routing.js';
import type { IMonotonicallyIncreasingIdGenerator } from '../../application/lib/id-gen.js';
import container from '../../di/container.js';
const POLL_INTERVAL_MS = 5_000; // 5 seconds — events should feel responsive
const EVENTS_DIR = path.join(WorkDir, 'events');
const PENDING_DIR = path.join(EVENTS_DIR, 'pending');
const DONE_DIR = path.join(EVENTS_DIR, 'done');
const log = new PrefixLogger('EventProcessor');
/**
* Write a KnowledgeEvent to the events/pending/ directory.
* Filename is a monotonically increasing ID so events sort by creation order.
* Call this function in chronological order (oldest event first) within a sync batch
* to ensure correct ordering.
*/
export async function createEvent(event: Omit<KnowledgeEvent, 'id'>): Promise<void> {
fs.mkdirSync(PENDING_DIR, { recursive: true });
const idGen = container.resolve<IMonotonicallyIncreasingIdGenerator>('idGenerator');
const id = await idGen.next();
const fullEvent: KnowledgeEvent = { id, ...event };
const filePath = path.join(PENDING_DIR, `${id}.json`);
fs.writeFileSync(filePath, JSON.stringify(fullEvent, null, 2), 'utf-8');
}
function ensureDirs(): void {
fs.mkdirSync(PENDING_DIR, { recursive: true });
fs.mkdirSync(DONE_DIR, { recursive: true });
}
async function listAllTracks(): Promise<ParsedTrack[]> {
const tracks: ParsedTrack[] = [];
let entries;
try {
entries = await workspace.readdir('knowledge', { recursive: true });
} catch {
return tracks;
}
const mdFiles = entries
.filter(e => e.kind === 'file' && e.name.endsWith('.md'))
.map(e => e.path.replace(/^knowledge\//, ''));
for (const filePath of mdFiles) {
let parsedTracks;
try {
parsedTracks = await fetchAll(filePath);
} catch {
continue;
}
for (const t of parsedTracks) {
tracks.push({
trackId: t.track.trackId,
filePath,
eventMatchCriteria: t.track.eventMatchCriteria ?? '',
instruction: t.track.instruction,
active: t.track.active,
});
}
}
return tracks;
}
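// Write the enriched (processed) event to done/ and remove it from pending/.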
function moveEventToDone(filename: string, enriched: KnowledgeEvent): void {
const donePath = path.join(DONE_DIR, filename);
const pendingPath = path.join(PENDING_DIR, filename);
fs.writeFileSync(donePath, JSON.stringify(enriched, null, 2), 'utf-8');
try {
fs.unlinkSync(pendingPath);
} catch (err) {
log.log(`Failed to remove pending event ${filename}:`, err);
}
}
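// Handle a single pending event: parse and validate it, route it to candidate
// tracks, trigger their updates sequentially, then record the outcome in done/.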
async function processOneEvent(filename: string): Promise<void> {
const pendingPath = path.join(PENDING_DIR, filename);
let event: KnowledgeEvent;
try {
const raw = fs.readFileSync(pendingPath, 'utf-8');
const parsed = JSON.parse(raw);
event = trackBlock.KnowledgeEventSchema.parse(parsed);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
log.log(`Malformed event ${filename}, moving to done with error:`, msg);
const stub: KnowledgeEvent = {
id: filename.replace(/\.json$/, ''),
source: 'unknown',
type: 'unknown',
createdAt: new Date().toISOString(),
payload: '',
processedAt: new Date().toISOString(),
error: `Failed to parse: ${msg}`,
};
moveEventToDone(filename, stub);
return;
}
log.log(`Processing event ${event.id} (source=${event.source}, type=${event.type})`);
const allTracks = await listAllTracks();
const candidates = await findCandidates(event, allTracks);
const runIds: string[] = [];
let processingError: string | undefined;
// Sequential — preserves total ordering
for (const candidate of candidates) {
try {
const result = await triggerTrackUpdate(
candidate.trackId,
candidate.filePath,
event.payload,
'event',
);
if (result.runId) runIds.push(result.runId);
log.log(`Candidate ${candidate.trackId}: ${result.action}${result.error ? ` (${result.error})` : ''}`);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
log.log(`Error triggering candidate ${candidate.trackId}:`, msg);
processingError = (processingError ? processingError + '; ' : '') + `${candidate.trackId}: ${msg}`;
}
}
const enriched: KnowledgeEvent = {
...event,
processedAt: new Date().toISOString(),
candidates: candidates.map(c => ({ trackId: c.trackId, filePath: c.filePath })),
runIds,
...(processingError ? { error: processingError } : {}),
};
moveEventToDone(filename, enriched);
}
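// Drain pending/ oldest-first; files that fail unexpectedly stay in place and are retried on the next poll.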
async function processPendingEvents(): Promise<void> {
ensureDirs();
let filenames: string[];
try {
filenames = fs.readdirSync(PENDING_DIR).filter(f => f.endsWith('.json'));
} catch (err) {
log.log('Failed to read pending dir:', err);
return;
}
if (filenames.length === 0) return;
// FIFO: monotonic IDs are lexicographically sortable
filenames.sort();
log.log(`Processing ${filenames.length} pending event(s)`);
for (const filename of filenames) {
try {
await processOneEvent(filename);
} catch (err) {
log.log(`Unhandled error processing ${filename}:`, err);
// Keep the loop alive — don't move file, will retry on next tick
}
}
}
export async function init(): Promise<void> {
log.log(`Starting, polling every ${POLL_INTERVAL_MS / 1000}s`);
ensureDirs();
// Initial run
await processPendingEvents();
while (true) {
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
try {
await processPendingEvents();
} catch (err) {
log.log('Error in main loop:', err);
}
}
}

View file

@@ -0,0 +1,118 @@
import { generateObject } from 'ai';
import { trackBlock, PrefixLogger } from '@x/shared';
import type { KnowledgeEvent } from '@x/shared/dist/track-block.js';
import container from '../../di/container.js';
import type { IModelConfigRepo } from '../../models/repo.js';
import { createProvider } from '../../models/models.js';
import { isSignedIn } from '../../account/account.js';
import { getGatewayProvider } from '../../models/gateway.js';
const log = new PrefixLogger('TrackRouting');
const BATCH_SIZE = 20;
export interface ParsedTrack {
trackId: string;
filePath: string;
eventMatchCriteria: string;
instruction: string;
active: boolean;
}
const ROUTING_SYSTEM_PROMPT = `You are a routing classifier for a knowledge management system.
You will receive an event (something that happened: an email, meeting, message, etc.) and a list of track blocks. Each track block has:
- trackId: an identifier (only unique within its file)
- filePath: the note file the track lives in
- eventMatchCriteria: a description of what kinds of signals are relevant to this track
Your job is to identify which track blocks MIGHT be relevant to this event.
Rules:
- Be LIBERAL in your selections. Include any track that is even moderately relevant.
- Prefer false positives over false negatives. It is much better to include a track that turns out to be irrelevant than to miss one that was relevant.
- Only exclude tracks that are CLEARLY and OBVIOUSLY irrelevant to the event.
- Do not attempt to judge whether the event contains enough information to update the track. That is handled by a later stage.
- Return an empty list only if no tracks are relevant at all.
- For each candidate, return BOTH trackId and filePath exactly as given. trackIds are not globally unique.`;
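// Resolve the model used for Pass 1 routing: prefer the configured
// knowledgeGraphModel, otherwise fall back to the gateway default when signed in,
// or the locally configured model.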
async function resolveModel() {
const repo = container.resolve<IModelConfigRepo>('modelConfigRepo');
const config = await repo.getConfig();
const signedIn = await isSignedIn();
const provider = signedIn
? await getGatewayProvider()
: createProvider(config.provider);
const modelId = config.knowledgeGraphModel
|| (signedIn ? 'gpt-5.4' : config.model);
return provider.languageModel(modelId);
}
function buildRoutingPrompt(event: KnowledgeEvent, batch: ParsedTrack[]): string {
const trackList = batch
.map((t, i) => `${i + 1}. trackId: ${t.trackId}\n filePath: ${t.filePath}\n eventMatchCriteria: ${t.eventMatchCriteria}`)
.join('\n\n');
return `## Event
Source: ${event.source}
Type: ${event.type}
Time: ${event.createdAt}
${event.payload}
## Track Blocks
${trackList}`;
}
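// trackIds are only unique within a file, so candidate keys combine filePath and trackId.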
function trackKey(trackId: string, filePath: string): string {
return `${filePath}::${trackId}`;
}
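// Pass 1 routing: ask the classifier, in batches, which tracks MIGHT be relevant
// to the event. Deliberately liberal; the track-run agent (Pass 2) makes the
// final relevance call.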
export async function findCandidates(
event: KnowledgeEvent,
allTracks: ParsedTrack[],
): Promise<ParsedTrack[]> {
// Short-circuit for targeted re-runs — skip LLM routing entirely
if (event.targetTrackId && event.targetFilePath) {
const target = allTracks.find(t =>
t.trackId === event.targetTrackId && t.filePath === event.targetFilePath
);
return target ? [target] : [];
}
const filtered = allTracks.filter(t =>
t.active && t.instruction && t.eventMatchCriteria
);
if (filtered.length === 0) {
log.log(`No event-eligible tracks (none with eventMatchCriteria)`);
return [];
}
log.log(`Routing event ${event.id} against ${filtered.length} track(s)`);
const model = await resolveModel();
const candidateKeys = new Set<string>();
for (let i = 0; i < filtered.length; i += BATCH_SIZE) {
const batch = filtered.slice(i, i + BATCH_SIZE);
try {
const { object } = await generateObject({
model,
system: ROUTING_SYSTEM_PROMPT,
prompt: buildRoutingPrompt(event, batch),
schema: trackBlock.Pass1OutputSchema,
});
for (const c of object.candidates) {
candidateKeys.add(trackKey(c.trackId, c.filePath));
}
} catch (err) {
log.log(`Routing batch ${i / BATCH_SIZE} failed:`, err);
}
}
const candidates = filtered.filter(t => candidateKeys.has(trackKey(t.trackId, t.filePath)));
log.log(`Event ${event.id}: ${candidates.length} candidate(s) — ${candidates.map(c => `${c.trackId}@${c.filePath}`).join(', ') || '(none)'}`);
return candidates;
}

View file

@@ -8,6 +8,7 @@ import { PrefixLogger } from '@x/shared/dist/prefix-logger.js';
export interface TrackUpdateResult {
trackId: string;
runId: string | null;
action: 'replace' | 'no_update';
contentBefore: string | null;
contentAfter: string | null;
@@ -19,7 +20,12 @@ export interface TrackUpdateResult {
// Agent run
// ---------------------------------------------------------------------------
function buildMessage(filePath: string, track: z.infer<typeof TrackStateSchema>, context?: string): string {
function buildMessage(
filePath: string,
track: z.infer<typeof TrackStateSchema>,
trigger: 'manual' | 'timed' | 'event',
context?: string,
): string {
const now = new Date();
const localNow = now.toLocaleString('en-US', { dateStyle: 'full', timeStyle: 'long' });
const tz = Intl.DateTimeFormat().resolvedOptions().timeZone;
@@ -36,7 +42,19 @@ ${track.content || '(empty — first run)'}
Use \`update-track-content\` with filePath=\`${filePath}\` and trackId=\`${track.track.trackId}\`.`;
if (context) {
if (trigger === 'event') {
msg += `
**Trigger:** Event match (a Pass 1 routing classifier flagged this track as potentially relevant to the event below)
**Event match criteria for this track:**
${track.track.eventMatchCriteria ?? '(none — should not happen for event-triggered runs)'}
**Event payload:**
${context ?? '(no payload)'}
**Decision:** Determine whether this event genuinely warrants updating the track content. If the event is not meaningfully relevant on closer inspection, skip the update; do NOT call \`update-track-content\`. Only call the tool if the event provides new or changed information that should be reflected in the track.`;
} else if (context) {
msg += `\n\n**Context:**\n${context}`;
}
@@ -68,7 +86,7 @@ export async function triggerTrackUpdate(
logger.log('triggering track update', trackId, filePath, trigger, context);
if (runningTracks.has(key)) {
logger.log('skipping, already running');
return { trackId, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Already running' };
return { trackId, runId: null, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Already running' };
}
runningTracks.add(key);
@@ -78,7 +96,7 @@ export async function triggerTrackUpdate(
const track = tracks.find(t => t.track.trackId === trackId);
if (!track) {
logger.log('track not found', trackId, filePath, trigger, context);
return { trackId, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Track not found' };
return { trackId, runId: null, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Track not found' };
}
const contentBefore = track.content;
@@ -102,7 +120,7 @@ });
});
try {
await createMessage(agentRun.id, buildMessage(filePath, track, context));
await createMessage(agentRun.id, buildMessage(filePath, track, trigger, context));
await waitForRunCompletion(agentRun.id);
const summary = await extractAgentResponse(agentRun.id);
@@ -125,6 +143,7 @@ export async function triggerTrackUpdate(
return {
trackId,
runId: agentRun.id,
action: didUpdate ? 'replace' : 'no_update',
contentBefore: contentBefore ?? null,
contentAfter: contentAfter ?? null,
@@ -141,7 +160,7 @@ error: msg,
error: msg,
});
return { trackId, action: 'no_update', contentBefore: contentBefore ?? null, contentAfter: null, summary: null, error: msg };
return { trackId, runId: agentRun.id, action: 'no_update', contentBefore: contentBefore ?? null, contentAfter: null, summary: null, error: msg };
}
} finally {
runningTracks.delete(key);

View file

@@ -1,43 +1,28 @@
import fs from 'fs';
import path from 'path';
import { PrefixLogger } from '@x/shared';
import { WorkDir } from '../../config/config.js';
import * as workspace from '../../workspace/workspace.js';
import { fetchAll } from './fileops.js';
import { triggerTrackUpdate } from './runner.js';
import { isTrackScheduleDue } from './schedule-utils.js';
const log = new PrefixLogger('TrackScheduler');
const KNOWLEDGE_DIR = path.join(WorkDir, 'knowledge');
const POLL_INTERVAL_MS = 15_000; // 15 seconds
function scanMarkdownFiles(dir: string): string[] {
if (!fs.existsSync(dir)) return [];
const files: string[] = [];
const entries = fs.readdirSync(dir, { withFileTypes: true });
for (const entry of entries) {
if (entry.name.startsWith('.')) continue;
const fullPath = path.join(dir, entry.name);
if (entry.isDirectory()) {
files.push(...scanMarkdownFiles(fullPath));
} else if (entry.isFile() && entry.name.endsWith('.md')) {
files.push(fullPath);
}
async function listKnowledgeMarkdownFiles(): Promise<string[]> {
try {
const entries = await workspace.readdir('knowledge', { recursive: true });
return entries
.filter(e => e.kind === 'file' && e.name.endsWith('.md'))
.map(e => e.path.replace(/^knowledge\//, ''));
} catch {
return [];
}
return files;
}
async function processScheduledTracks(): Promise<void> {
if (!fs.existsSync(KNOWLEDGE_DIR)) {
log.log('Knowledge directory not found');
return;
}
const allFiles = scanMarkdownFiles(KNOWLEDGE_DIR);
log.log(`Scanning ${allFiles.length} markdown files`);
for (const fullPath of allFiles) {
const relativePath = path.relative(KNOWLEDGE_DIR, fullPath);
const relativePaths = await listKnowledgeMarkdownFiles();
log.log(`Scanning ${relativePaths.length} markdown files`);
for (const relativePath of relativePaths) {
let tracks;
try {
tracks = await fetchAll(relativePath);