Add tracks — auto-updating note blocks with scheduled and event-driven triggers

Track blocks are YAML-fenced sections embedded in markdown notes whose output
is rewritten by a background agent. Three trigger types: manual (Run button or
Copilot), scheduled (cron / window / once with a 2 min grace window), and
event-driven (Gmail/Calendar sync events routed via an LLM classifier with a
second-pass agent decision). Output lives between <!--track-target:ID-->
comment markers that render as editable content in the Tiptap editor so users
can read and extend AI-generated content inline.

Core:
- Schedule and event pipelines run as independent polling loops (15s / 5s),
  both calling the same triggerTrackUpdate orchestrator. Events are FIFO via
  monotonic IDs; a per-track Set guards against duplicate runs.
- Track-run agent builds three message variants (manual/timed/event) — the
  event variant includes a Pass 2 directive to skip updates on false positives
  flagged by the liberal Pass 1 router.
- IPC surface: track:run/get/update/replaceYaml/delete plus tracks:events
  forward of the pub-sub bus to the renderer.
- Gmail emits per-thread events; Calendar bundles a digest per sync.
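The per-track duplicate-run guard described above can be sketched roughly like this (names are assumed, not taken from the diff; the real orchestration lives in `triggerTrackUpdate`):

```typescript
// Per-track in-flight guard: a Set keyed by trackId prevents duplicate
// concurrent runs while events drain FIFO from the queue.
const runningTracks = new Set<string>();

async function processEvent(trackId: string, run: () => Promise<void>): Promise<void> {
  if (runningTracks.has(trackId)) {
    // A run for this track is already in flight; skip to avoid a duplicate.
    return;
  }
  runningTracks.add(trackId);
  try {
    await run();
  } finally {
    // Always release the guard, even if the run throws.
    runningTracks.delete(trackId);
  }
}
```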

Copilot:
- New `tracks` skill (auto-generated canonical schema from Zod via
  z.toJSONSchema) teaches block creation, editing, and proactive suggestion.
- `run-track-block` tool with optional `context` parameter for backfills
  (e.g. seeding a new email-tracking block from existing synced emails).

Renderer:
- Tiptap chip (display-only) opens a rich modal with tabs, toggle, schedule
  details, raw YAML editor, and confirm-to-delete. All mutations go through
  IPC so the backend stays the single writer.
- Target regions use two atom marker nodes (open/close) around real editable
  content — custom blocks render natively, users can add their own notes.
- "Edit with Copilot" seeds a chat session with the note attached.

Docs: apps/x/TRACKS.md covers product flows, technical pipeline, and a
catalog of every LLM prompt involved with file+line pointers.
This commit is contained in:
Ramnique Singh 2026-04-14 13:51:45 +05:30 committed by GitHub
parent ab0147d475
commit e2c13f0f6f
GPG key ID: B5690EEEBB952194 (no known key found for this signature in database)
33 changed files with 3405 additions and 2 deletions

View file

@@ -11,6 +11,7 @@ import { execTool } from "../application/lib/exec-tool.js";
import { AskHumanRequestEvent, RunEvent, ToolPermissionRequestEvent } from "@x/shared/dist/runs.js";
import { BuiltinTools } from "../application/lib/builtin-tools.js";
import { buildCopilotAgent } from "../application/assistant/agent.js";
import { buildTrackRunAgent } from "../knowledge/track/run-agent.js";
import { isBlocked, extractCommandNames } from "../application/lib/command-executor.js";
import container from "../di/container.js";
import { IModelConfigRepo } from "../models/repo.js";
@@ -372,6 +373,10 @@ export async function loadAgent(id: string): Promise<z.infer<typeof Agent>> {
return buildCopilotAgent();
}
if (id === "track-run") {
return buildTrackRunAgent();
}
if (id === 'note_creation') {
const raw = getNoteCreationRaw();
let agent: z.infer<typeof Agent> = {

View file

@@ -70,6 +70,8 @@ Rowboat is an agentic assistant for everyday work - emails, meetings, projects,
**App Control:** When users ask you to open notes, show the bases or graph view, filter or search notes, or manage saved views, load the \`app-navigation\` skill first. It provides structured guidance for navigating the app UI and controlling the knowledge base view.
**Tracks (Auto-Updating Note Blocks):** When users ask you to **track**, **monitor**, **watch**, or **keep an eye on** something in a note, or say things like "every morning tell me X", "show the current Y in this note", "pin live updates of Z here", load the \`tracks\` skill first. Also load it when a user presses Cmd+K with a note open and requests auto-refreshing content at the cursor. Track blocks are YAML-fenced scheduled blocks whose output is rewritten on each run — useful for weather, news, prices, status pages, and personal dashboards.
## Learning About the User (save-to-memory)

View file

@@ -12,10 +12,13 @@ import createPresentationsSkill from "./create-presentations/skill.js";
import appNavigationSkill from "./app-navigation/skill.js";
import composioIntegrationSkill from "./composio-integration/skill.js";
import tracksSkill from "./tracks/skill.js";
const CURRENT_DIR = path.dirname(fileURLToPath(import.meta.url));
const CATALOG_PREFIX = "src/application/assistant/skills";
// console.log(tracksSkill);
type SkillDefinition = {
id: string; // Also used as folder name
title: string;
@@ -96,6 +99,12 @@ const definitions: SkillDefinition[] = [
summary: "Navigate the app UI - open notes, switch views, filter/search the knowledge base, and manage saved views.",
content: appNavigationSkill,
},
{
id: "tracks",
title: "Tracks",
summary: "Create and manage track blocks — YAML-scheduled auto-updating content blocks in notes (weather, news, prices, status, dashboards). Insert at cursor (Cmd+K) or append to notes.",
content: tracksSkill,
},
];
const skillEntries = definitions.map((definition) => ({

View file

@@ -0,0 +1,318 @@
import { z } from 'zod';
import { stringify as stringifyYaml } from 'yaml';
import { TrackBlockSchema } from '@x/shared/dist/track-block.js';
const schemaYaml = stringifyYaml(z.toJSONSchema(TrackBlockSchema)).trimEnd();
export const skill = String.raw`
# Tracks Skill
You are helping the user create and manage **track blocks**: YAML-fenced, auto-updating content blocks embedded in notes. Load this skill whenever the user wants to track, monitor, watch, or keep an eye on something in a note, asks for recurring/auto-refreshing content ("every morning...", "show current...", "pin live X here"), or presses Cmd+K and requests auto-updating content at the cursor.
## What Is a Track Block
A track block is a scheduled, agent-run block embedded directly inside a markdown note. Each block has:
- A YAML-fenced ` + "`" + `track` + "`" + ` block that defines the instruction, schedule, and metadata.
- A sibling "target region" an HTML-comment-fenced area where the generated output lives. The runner rewrites the target region on each scheduled run.
**Concrete example** (a track that shows the current time in Chicago every hour):
` + "```" + `track
trackId: chicago-time
instruction: Show the current time in Chicago, IL in 12-hour format.
active: true
schedule:
type: cron
expression: "0 * * * *"
` + "```" + `
<!--track-target:chicago-time-->
<!--/track-target:chicago-time-->
Good use cases:
- Weather / air quality for a location
- News digests or headlines
- Stock or crypto prices
- Sports scores
- Service status pages
- Personal dashboards (today's calendar, steps, focus stats)
- Any recurring summary that decays fast
## Anatomy
Each track has two parts that live next to each other in the note:
1. The ` + "`" + `track` + "`" + ` code fence contains the YAML config. The fence language tag is literally ` + "`" + `track` + "`" + `.
2. The target-comment region ` + "`" + `<!--track-target:ID-->` + "`" + ` and ` + "`" + `<!--/track-target:ID-->` + "`" + ` with optional content between. The ID must match the ` + "`" + `trackId` + "`" + ` in the YAML.
The target region is **sibling**, not nested. It must **never** live inside the ` + "`" + "```" + `track` + "`" + ` fence.
## Canonical Schema
Below is the authoritative schema for a track block (generated at build time from the TypeScript source, so it is never out of date). Use it to validate every field name, type, and constraint before writing YAML:
` + "```" + `yaml
${schemaYaml}
` + "```" + `
**Runtime-managed fields (never write these yourself):** ` + "`" + `lastRunAt` + "`" + `, ` + "`" + `lastRunId` + "`" + `, ` + "`" + `lastRunSummary` + "`" + `.
## Choosing a trackId
- Kebab-case, short, descriptive: ` + "`" + `chicago-time` + "`" + `, ` + "`" + `sfo-weather` + "`" + `, ` + "`" + `hn-top5` + "`" + `, ` + "`" + `btc-usd` + "`" + `.
- **Must be unique within the note file.** Before inserting, read the file and check:
- All existing ` + "`" + `trackId:` + "`" + ` lines in ` + "`" + "```" + `track` + "`" + ` blocks
- All existing ` + "`" + `<!--track-target:...-->` + "`" + ` comments
- If you need disambiguation, add scope: ` + "`" + `btc-price-usd` + "`" + `, ` + "`" + `weather-home` + "`" + `, ` + "`" + `news-ai-2` + "`" + `.
- Don't reuse an old ID even if the previous block was deleted; pick a fresh one.
## Writing a Good Instruction
- **Specific and actionable.** State exactly what to fetch or compute.
- **Single-focus.** One block = one purpose. Split "weather + news + stocks" into three blocks, don't bundle.
- **Imperative voice, 1-3 sentences.**
- **Mention output style** if it matters ("markdown bullet list", "one sentence", "table with 5 rows").
Good:
> Fetch the current temperature, feels-like, and conditions for Chicago, IL in Fahrenheit. Return as a single line: "72°F (feels like 70°F), partly cloudy".
Bad:
> Tell me about Chicago.
## Schedules
Schedule is an **optional** discriminated union. Three types:
### ` + "`" + `cron` + "`" + `: recurring at exact times
` + "```" + `yaml
schedule:
type: cron
expression: "0 * * * *"
` + "```" + `
Fires at the exact cron time. Use when the user wants precise timing ("at 9am daily", "every hour on the hour").
### ` + "`" + `window` + "`" + `: recurring within a time-of-day range
` + "```" + `yaml
schedule:
type: window
cron: "0 0 * * 1-5"
startTime: "09:00"
endTime: "17:00"
` + "```" + `
Fires **at most once per cron occurrence**, but only if the current time is within ` + "`" + `startTime` + "`" + `–` + "`" + `endTime` + "`" + ` (24-hour HH:MM, local). Use when the user wants "sometime in the morning" or "once per weekday during work hours": flexible timing with bounds.
### ` + "`" + `once` + "`" + `: one-shot at a future time
` + "```" + `yaml
schedule:
type: once
runAt: "2026-04-14T09:00:00"
` + "```" + `
Fires once at ` + "`" + `runAt` + "`" + ` and never again. Local time, no ` + "`" + `Z` + "`" + ` suffix.
### Cron cookbook
- ` + "`" + `"*/15 * * * *"` + "`" + ` – every 15 minutes
- ` + "`" + `"0 * * * *"` + "`" + ` – every hour on the hour
- ` + "`" + `"0 8 * * *"` + "`" + ` – daily at 8am
- ` + "`" + `"0 9 * * 1-5"` + "`" + ` – weekdays at 9am
- ` + "`" + `"0 0 * * 0"` + "`" + ` – Sundays at midnight
- ` + "`" + `"0 0 1 * *"` + "`" + ` – first of month at midnight
**Omit ` + "`" + `schedule` + "`" + ` entirely for a manual-only track**; the user triggers it via the Play button in the UI.
## Event Triggers (third trigger type)
In addition to manual and scheduled, a track can be triggered by **events**: incoming signals from the user's data sources (currently: gmail emails). Set ` + "`" + `eventMatchCriteria` + "`" + ` to a description of what kinds of events should consider this track for an update:
` + "```" + `track
trackId: q3-planning-emails
instruction: Maintain a running summary of decisions and open questions about Q3 planning, drawn from emails on the topic.
active: true
eventMatchCriteria: Emails about Q3 planning, roadmap decisions, or quarterly OKRs
` + "```" + `
How it works:
1. When a new event arrives (e.g. an email syncs), a fast LLM classifier checks ` + "`" + `eventMatchCriteria` + "`" + ` against the event content.
2. If it might match, the track-run agent receives both the event payload and the existing track content, and decides whether to actually update.
3. If the event isn't truly relevant on closer inspection, the agent skips the update; no fabricated content.
When to suggest event triggers:
- The user wants to **maintain a living summary** of a topic ("keep notes on everything related to project X").
- The content depends on **incoming signals** rather than periodic refresh ("update this whenever a relevant email arrives").
- Mention to the user: scheduled (cron) is for time-driven updates; event is for signal-driven updates. They can be combined: a track can have both a ` + "`" + `schedule` + "`" + ` and ` + "`" + `eventMatchCriteria` + "`" + ` (it'll run on schedule AND on relevant events).
Writing good ` + "`" + `eventMatchCriteria` + "`" + `:
- Be descriptive but not overly narrow; Pass 1 routing is liberal by design.
- Examples: ` + "`" + `"Emails from John about the migration project"` + "`" + `, ` + "`" + `"Calendar events related to customer interviews"` + "`" + `, ` + "`" + `"Meeting notes that mention pricing changes"` + "`" + `.
Tracks **without** ` + "`" + `eventMatchCriteria` + "`" + ` opt out of events entirely; they'll only run on schedule or manually.
## Insertion Workflow
### Cmd+K with cursor context
When the user invokes Cmd+K, the context includes an attachment mention like:
> User has attached the following files:
> - notes.md (text/markdown) at knowledge/notes.md (line 42)
Workflow:
1. Extract the ` + "`" + `path` + "`" + ` and ` + "`" + `line N` + "`" + ` from the attachment.
2. ` + "`" + `workspace-readFile({ path })` + "`" + `; always re-read fresh.
3. Check existing ` + "`" + `trackId` + "`" + `s in the file to guarantee uniqueness.
4. Locate the line. Pick a **unique 2-3 line anchor** around line N (a full heading, a distinctive sentence). Avoid blank lines and generic text.
5. Construct the full track block (YAML + target pair).
6. ` + "`" + `workspace-edit({ path, oldString: <anchor>, newString: <anchor with block spliced at line N> })` + "`" + `.
### Sidebar chat with a specific note
1. If a file is mentioned/attached, read it.
2. If ambiguous, ask one question: "Which note should I add the track to?"
3. **Default placement: append** to the end of the file. Find the last non-empty line as the anchor. ` + "`" + `newString` + "`" + ` = that line + ` + "`" + `\n\n` + "`" + ` + track block + target pair.
4. If the user specified a section ("under the Weather heading"), anchor on that heading.
### No note context at all
Ask one question: "Which note should this track live in?" Don't create a new note unless the user asks.
## The Exact Text to Insert
Write it verbatim like this (including the blank line between fence and target):
` + "```" + `track
trackId: <id>
instruction: <instruction>
active: true
schedule:
type: cron
expression: "0 * * * *"
` + "```" + `
<!--track-target:<id>-->
<!--/track-target:<id>-->
**Rules:**
- One blank line between the closing ` + "`" + "```" + "`" + ` fence and the ` + "`" + `<!--track-target:ID-->` + "`" + `.
- Target pair is **empty on creation**. The runner fills it on the first run.
- **Always quote cron expressions** in YAML; they contain spaces and ` + "`" + `*` + "`" + `.
- Use 2-space YAML indent. No tabs.
- Top-level markdown only; never inside a code fence, blockquote, or table.
## After Insertion
- Confirm in one line: "Added ` + "`" + `chicago-time` + "`" + ` track, refreshing hourly."
- **Then offer to run it once now** (see "Running a Track" below); this is especially valuable for newly created blocks where the target region is otherwise empty until the next scheduled or event-triggered run.
- **Do not** write anything into the ` + "`" + `<!--track-target:...-->` + "`" + ` region yourself; use the ` + "`" + `run-track-block` + "`" + ` tool to delegate to the track agent.
## Running a Track (the ` + "`" + `run-track-block` + "`" + ` tool)
The ` + "`" + `run-track-block` + "`" + ` tool manually triggers a track run right now. Equivalent to the user clicking the Play button, but you can pass extra ` + "`" + `context` + "`" + ` to bias what the track agent does on this single run (without modifying the block's ` + "`" + `instruction` + "`" + `).
### When to proactively offer to run
These are upsells; ask first, don't run silently.
- **Just created a new track block.** Before declaring done, offer:
> "Want me to run it once now to seed the initial content?"
This is **especially valuable for event-triggered tracks** (with ` + "`" + `eventMatchCriteria` + "`" + `); otherwise the target region stays empty until the next matching event arrives.
For tracks that pull from existing local data (synced emails, calendar, meeting notes), suggest a **backfill** with explicit context (see below).
- **Just edited an existing track.** Offer:
> "Want me to run it now to see the updated output?"
- **Explicit user request.** "run the X track", "test it", "refresh that block": call the tool directly.
### Using the ` + "`" + `context` + "`" + ` parameter (the powerful case)
The ` + "`" + `context` + "`" + ` parameter is extra guidance for the track agent on this run only. It's the difference between a stock refresh and a smart backfill.
**Examples:**
- New track: "Track emails about Q3 planning" after creating it, run with:
> context: "Initial backfill — scan ` + "`" + `gmail_sync/` + "`" + ` for emails from the last 90 days that match this track's topic (Q3 planning, OKRs, roadmap), and synthesize the initial summary."
- New track: "Summarize this week's customer calls" run with:
> context: "Backfill from this week's meeting notes in ` + "`" + `granola_sync/` + "`" + ` and ` + "`" + `fireflies_sync/` + "`" + `."
- Manual refresh after the user mentions a recent change:
> context: "Focus on changes from the last 7 days only."
- Plain refresh (user says "run it now"): **omit ` + "`" + `context` + "`" + ` entirely**. Don't invent context; it can mislead the agent.
### What to do with the result
The tool returns ` + "`" + `{ success, runId, action, summary, contentAfter, error }` + "`" + `:
- **` + "`" + `action: 'replace'` + "`" + `**: the track was updated. Confirm with one line, optionally citing the first line of ` + "`" + `contentAfter` + "`" + `:
> "Done — track now shows: 72°F, partly cloudy in Chicago."
- **` + "`" + `action: 'no_update'` + "`" + `**: the agent decided nothing needed to change. Tell the user briefly; ` + "`" + `summary` + "`" + ` may explain why.
- **` + "`" + `error` + "`" + ` set**: surface it concisely. If the error is ` + "`" + `'Already running'` + "`" + ` (concurrency guard), let the user know the track is mid-run and to retry shortly.
### Don'ts
- **Don't auto-run** after every edit ask first.
- **Don't pass ` + "`" + `context` + "`" + `** for a plain refresh — only when there's specific extra guidance to give.
- **Don't use ` + "`" + `run-track-block` + "`" + ` to manually write content** — that's ` + "`" + `update-track-content` + "`" + `'s job (and even that should be rare; the track agent handles content via this tool).
- **Don't ` + "`" + `run-track-block` + "`" + ` repeatedly** in a single turn; one run per user-facing action.
## Proactive Suggestions
When the user signals interest in recurring or time-decaying info, **offer a track block** instead of a one-off answer. Signals:
- "I want to track / monitor / watch / keep an eye on / follow X"
- "Can you check on X every morning / hourly / weekly?"
- The user just asked a one-off question whose answer decays (weather, score, price, status, news).
- The user is building a time-sensitive page (weekly dashboard, morning briefing).
Suggestion style: one line, concrete:
> "I can turn this into a track block that refreshes hourly — want that?"
Don't upsell aggressively. If the user clearly wants a one-off answer, give them one.
## Don'ts
- **Don't reuse** an existing ` + "`" + `trackId` + "`" + ` in the same file.
- **Don't add ` + "`" + `schedule` + "`" + `** if the user explicitly wants a manual-only track.
- **Don't write** ` + "`" + `lastRunAt` + "`" + `, ` + "`" + `lastRunId` + "`" + `, or ` + "`" + `lastRunSummary` + "`" + `; runtime-managed.
- **Don't nest** the ` + "`" + `<!--track-target:ID-->` + "`" + ` region inside the ` + "`" + "```" + `track` + "`" + ` fence.
- **Don't touch** content between ` + "`" + `<!--track-target:ID-->` + "`" + ` and ` + "`" + `<!--/track-target:ID-->` + "`" + ` — that's generated content.
- **Don't schedule** with ` + "`" + `"* * * * *"` + "`" + ` (every minute) unless the user explicitly asks.
- **Don't add a ` + "`" + `Z` + "`" + ` suffix** on ` + "`" + `runAt` + "`" + `; local time only.
- **Don't use ` + "`" + `workspace-writeFile` + "`" + `** to rewrite the whole file; always ` + "`" + `workspace-edit` + "`" + ` with a unique anchor.
## Editing or Removing an Existing Track
**Change schedule or instruction:** read the file, ` + "`" + `workspace-edit` + "`" + ` the YAML body. Anchor on the unique ` + "`" + `trackId: <id>` + "`" + ` line plus a few surrounding lines.
**Pause without deleting:** flip ` + "`" + `active: false` + "`" + `.
**Remove entirely:** ` + "`" + `workspace-edit` + "`" + ` with ` + "`" + `oldString` + "`" + ` = the full ` + "`" + "```" + `track` + "`" + ` block **plus** the target pair (so generated content also disappears), ` + "`" + `newString` + "`" + ` = empty.
## Quick Reference
Minimal template:
` + "```" + `track
trackId: <kebab-id>
instruction: <what to produce>
active: true
schedule:
type: cron
expression: "0 * * * *"
` + "```" + `
<!--track-target:<kebab-id>-->
<!--/track-target:<kebab-id>-->
Top cron expressions: ` + "`" + `"0 * * * *"` + "`" + ` (hourly), ` + "`" + `"0 8 * * *"` + "`" + ` (daily 8am), ` + "`" + `"0 9 * * 1-5"` + "`" + ` (weekdays 9am), ` + "`" + `"*/15 * * * *"` + "`" + ` (every 15m).
`;
export default skill;

View file

@@ -25,6 +25,7 @@ import { isSignedIn } from "../../account/account.js";
import { getGatewayProvider } from "../../models/gateway.js";
import { getAccessToken } from "../../auth/tokens.js";
import { API_URL } from "../../config/env.js";
import { updateContent, updateTrackBlock } from "../../knowledge/track/fileops.js";
// Parser libraries are loaded dynamically inside parseFile.execute()
// to avoid pulling pdfjs-dist's DOM polyfills into the main bundle.
// Import paths are computed so esbuild cannot statically resolve them.
@@ -1431,4 +1432,56 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
},
isAvailable: async () => isComposioConfigured(),
},
'update-track-content': {
description: "Update the output content of a track block in a knowledge note. This replaces the content inside the track's target region (between <!--track-target:ID--> markers), or creates the target region if it doesn't exist. Also updates the track's lastRunAt timestamp.",
inputSchema: z.object({
filePath: z.string().describe("Workspace-relative path to the note file (e.g., 'knowledge/Notes/my-note.md')"),
trackId: z.string().describe("The track block's trackId"),
content: z.string().describe("The new content to place inside the track's target region"),
}),
execute: async ({ filePath, trackId, content }: { filePath: string; trackId: string; content: string }) => {
try {
await updateContent(filePath, trackId, content);
await updateTrackBlock(filePath, trackId, { lastRunAt: new Date().toISOString() });
return { success: true, message: `Updated track ${trackId} in ${filePath}` };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
return { success: false, error: msg };
}
},
},
'run-track-block': {
description: "Manually trigger a track block to run now. Equivalent to the user clicking the Play button on the block, but you can pass extra `context` to bias what the track agent does this run — most useful for backfills (e.g. seeding a new email-tracking block from existing synced emails) or focused refreshes. Returns the action taken, summary, and the new content.",
inputSchema: z.object({
filePath: z.string().describe("Workspace-relative path to the note file (e.g., 'knowledge/Notes/my-note.md')"),
trackId: z.string().describe("The track block's trackId (must exist in the file)"),
context: z.string().optional().describe(
"Optional extra context for the track agent to consider for THIS run only — does not modify the block's instruction. " +
"Use it to drive backfills (e.g. 'Backfill from existing synced emails in gmail_sync/ from the last 90 days about this topic') " +
"or focused refreshes (e.g. 'Focus on changes from the last 7 days'). " +
"Omit for a plain refresh."
),
}),
execute: async ({ filePath, trackId, context }: { filePath: string; trackId: string; context?: string }) => {
const knowledgeRelativePath = filePath.replace(/^knowledge\//, '');
try {
// Lazy import to break a module-init cycle:
// builtin-tools → track/runner → runs/runs → agents/runtime → builtin-tools
const { triggerTrackUpdate } = await import("../../knowledge/track/runner.js");
const result = await triggerTrackUpdate(trackId, knowledgeRelativePath, context, 'manual');
return {
success: !result.error,
runId: result.runId,
action: result.action,
summary: result.summary,
contentAfter: result.contentAfter,
error: result.error,
};
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
return { success: false, error: msg };
}
},
},
};

View file

@@ -9,6 +9,130 @@ import { serviceLogger, type ServiceRunContext } from '../services/service_logge
import { limitEventItems } from './limit_event_items.js';
import { executeAction, useComposioForGoogleCalendar } from '../composio/client.js';
import { composioAccountsRepo } from '../composio/repo.js';
import { createEvent } from './track/events.js';
const MAX_EVENTS_IN_DIGEST = 50;
const MAX_DESCRIPTION_CHARS = 500;
type AnyEvent = Record<string, unknown> | cal.Schema$Event;
function getStr(obj: unknown, key: string): string | undefined {
if (obj && typeof obj === 'object' && key in obj) {
const v = (obj as Record<string, unknown>)[key];
return typeof v === 'string' ? v : undefined;
}
return undefined;
}
function formatEventTime(event: AnyEvent): string {
const start = (event as Record<string, unknown>).start as Record<string, unknown> | undefined;
const end = (event as Record<string, unknown>).end as Record<string, unknown> | undefined;
const startStr = getStr(start, 'dateTime') ?? getStr(start, 'date') ?? 'unknown';
const endStr = getStr(end, 'dateTime') ?? getStr(end, 'date') ?? 'unknown';
return `${startStr} – ${endStr}`;
}
function formatEventBlock(event: AnyEvent, label: 'NEW' | 'UPDATED'): string {
const id = getStr(event, 'id') ?? '(unknown id)';
const title = getStr(event, 'summary') ?? '(no title)';
const time = formatEventTime(event);
const organizer = getStr((event as Record<string, unknown>).organizer, 'email') ?? 'unknown';
const location = getStr(event, 'location') ?? '';
const rawDescription = getStr(event, 'description') ?? '';
const description = rawDescription.length > MAX_DESCRIPTION_CHARS
? rawDescription.slice(0, MAX_DESCRIPTION_CHARS) + '…(truncated)'
: rawDescription;
const attendeesRaw = (event as Record<string, unknown>).attendees;
let attendeesLine = '';
if (Array.isArray(attendeesRaw) && attendeesRaw.length > 0) {
const emails = attendeesRaw
.map(a => getStr(a, 'email'))
.filter((e): e is string => !!e);
if (emails.length > 0) {
attendeesLine = `**Attendees:** ${emails.join(', ')}\n`;
}
}
return [
`### [${label}] ${title}`,
`**ID:** ${id}`,
`**Time:** ${time}`,
`**Organizer:** ${organizer}`,
location ? `**Location:** ${location}` : '',
attendeesLine.trimEnd(),
description ? `\n${description}` : '',
].filter(Boolean).join('\n');
}
function summarizeCalendarSync(
newEvents: AnyEvent[],
updatedEvents: AnyEvent[],
deletedEventIds: string[],
): string {
const totalChanges = newEvents.length + updatedEvents.length + deletedEventIds.length;
const lines: string[] = [
`# Calendar sync update`,
``,
`${newEvents.length} new, ${updatedEvents.length} updated, ${deletedEventIds.length} deleted.`,
``,
];
const allChanges: Array<{ event: AnyEvent; label: 'NEW' | 'UPDATED' }> = [
...newEvents.map(e => ({ event: e, label: 'NEW' as const })),
...updatedEvents.map(e => ({ event: e, label: 'UPDATED' as const })),
];
const shown = allChanges.slice(0, MAX_EVENTS_IN_DIGEST);
const hidden = allChanges.length - shown.length;
if (shown.length > 0) {
lines.push(`## Changed events`, ``);
for (const { event, label } of shown) {
lines.push(formatEventBlock(event, label), ``);
}
if (hidden > 0) {
lines.push(`_…and ${hidden} more change(s) omitted from digest._`, ``);
}
}
if (deletedEventIds.length > 0) {
lines.push(`## Deleted event IDs`, ``);
for (const id of deletedEventIds.slice(0, MAX_EVENTS_IN_DIGEST)) {
lines.push(`- ${id}`);
}
if (deletedEventIds.length > MAX_EVENTS_IN_DIGEST) {
lines.push(`- _…and ${deletedEventIds.length - MAX_EVENTS_IN_DIGEST} more_`);
}
lines.push(``);
}
if (totalChanges === 0) {
lines.push(`(no changes — should not be emitted)`);
}
return lines.join('\n');
}
async function publishCalendarSyncEvent(
newEvents: AnyEvent[],
updatedEvents: AnyEvent[],
deletedEventIds: string[],
): Promise<void> {
if (newEvents.length === 0 && updatedEvents.length === 0 && deletedEventIds.length === 0) {
return;
}
try {
await createEvent({
source: 'calendar',
type: 'calendar.synced',
createdAt: new Date().toISOString(),
payload: summarizeCalendarSync(newEvents, updatedEvents, deletedEventIds),
});
} catch (err) {
console.error('[Calendar] Failed to publish sync event:', err);
}
}
// Configuration
const SYNC_DIR = path.join(WorkDir, 'calendar_sync');
@@ -194,6 +318,8 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD
let deletedCount = 0;
let attachmentCount = 0;
const changedTitles: string[] = [];
const newEvents: AnyEvent[] = [];
const updatedEvents: AnyEvent[] = [];
const ensureRun = async () => {
if (!runId) {
@@ -234,8 +360,10 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD
changedTitles.push(result.title);
if (result.isNew) {
newCount++;
newEvents.push(event);
} else {
updatedCount++;
updatedEvents.push(event);
}
}
@@ -253,6 +381,9 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD
deletedCount = deletedFiles.length;
}
// Publish a single bundled event capturing all changes from this sync.
await publishCalendarSyncEvent(newEvents, updatedEvents, deletedFiles);
if (runId) {
const totalChanges = newCount + updatedCount + deletedCount + attachmentCount;
const limitedTitles = limitEventItems(changedTitles);
@@ -438,6 +569,8 @@ async function performSyncComposio() {
let newCount = 0;
let updatedCount = 0;
const changedTitles: string[] = [];
const newEvents: AnyEvent[] = [];
const updatedEvents: AnyEvent[] = [];
let pageToken: string | null = null;
const MAX_PAGES = 20;
@@ -508,8 +641,10 @@ async function performSyncComposio() {
changedTitles.push(saveResult.title);
if (saveResult.isNew) {
newCount++;
newEvents.push(event);
} else {
updatedCount++;
updatedEvents.push(event);
}
}
}
@@ -534,6 +669,9 @@ async function performSyncComposio() {
deletedCount = deletedFiles.length;
}
// Publish a single bundled event capturing all changes from this sync.
await publishCalendarSyncEvent(newEvents, updatedEvents, deletedFiles);
// Log results if any changes were detected (run was started by ensureRun)
if (run) {
const r = run as ServiceRunContext;

View file

@@ -9,6 +9,7 @@ import { serviceLogger, type ServiceRunContext } from '../services/service_logge
import { limitEventItems } from './limit_event_items.js';
import { executeAction, useComposioForGoogle } from '../composio/client.js';
import { composioAccountsRepo } from '../composio/repo.js';
import { createEvent } from './track/events.js';
// Configuration
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
@@ -172,6 +173,13 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri
fs.writeFileSync(path.join(syncDir, `${threadId}.md`), mdContent);
console.log(`Synced Thread: ${subject} (${threadId})`);
await createEvent({
source: 'gmail',
type: 'email.synced',
createdAt: new Date().toISOString(),
payload: mdContent,
});
} catch (error) {
console.error(`Error processing thread ${threadId}:`, error);
}
@@ -595,6 +603,12 @@ async function processThreadComposio(connectedAccountId: string, threadId: strin
fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
console.log(`[Gmail] Synced Thread: ${parsed.subject} (${threadId})`);
await createEvent({
source: 'gmail',
type: 'email.synced',
createdAt: new Date().toISOString(),
payload: mdContent,
});
newestDate = tryParseDate(parsed.date);
} else {
const firstParsed = parseMessageData(messages[0]);
@@ -617,6 +631,12 @@ async function processThreadComposio(connectedAccountId: string, threadId: strin
fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
console.log(`[Gmail] Synced Thread: ${firstParsed.subject} (${threadId})`);
await createEvent({
source: 'gmail',
type: 'email.synced',
createdAt: new Date().toISOString(),
payload: mdContent,
});
}
if (!newestDate) return null;


@@ -0,0 +1,23 @@
import type { TrackEventType } from '@x/shared/dist/track-block.js';
type Handler = (event: TrackEventType) => void;
class TrackBus {
private subs: Handler[] = [];
publish(event: TrackEventType): void {
for (const handler of this.subs) {
handler(event);
}
}
subscribe(handler: Handler): () => void {
this.subs.push(handler);
return () => {
const idx = this.subs.indexOf(handler);
if (idx >= 0) this.subs.splice(idx, 1);
};
}
}
export const trackBus = new TrackBus();
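A minimal, self-contained usage sketch of this pub-sub pattern (re-declared generically here for illustration; the app exports the concrete `trackBus` singleton typed to `TrackEventType`):

```typescript
type Handler<E> = (event: E) => void;

// Generic re-declaration of the bus above, so this sketch runs standalone.
class Bus<E> {
  private subs: Handler<E>[] = [];
  publish(event: E): void {
    for (const handler of this.subs) handler(event);
  }
  subscribe(handler: Handler<E>): () => void {
    this.subs.push(handler);
    return () => {
      const idx = this.subs.indexOf(handler);
      if (idx >= 0) this.subs.splice(idx, 1);
    };
  }
}

const bus = new Bus<string>();
const seen: string[] = [];
const unsubscribe = bus.subscribe(e => seen.push(e));
bus.publish('track_run_start');   // delivered
unsubscribe();
bus.publish('track_run_complete'); // not delivered after unsubscribe
```

The main process uses this shape to forward track lifecycle events to the renderer: subscribe once at startup, push each event over the `tracks:events` IPC channel, and call the returned unsubscribe function on teardown.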


@@ -0,0 +1,189 @@
import fs from 'fs';
import path from 'path';
import { PrefixLogger, trackBlock } from '@x/shared';
import type { KnowledgeEvent } from '@x/shared/dist/track-block.js';
import { WorkDir } from '../../config/config.js';
import * as workspace from '../../workspace/workspace.js';
import { fetchAll } from './fileops.js';
import { triggerTrackUpdate } from './runner.js';
import { findCandidates, type ParsedTrack } from './routing.js';
import type { IMonotonicallyIncreasingIdGenerator } from '../../application/lib/id-gen.js';
import container from '../../di/container.js';
const POLL_INTERVAL_MS = 5_000; // 5 seconds — events should feel responsive
const EVENTS_DIR = path.join(WorkDir, 'events');
const PENDING_DIR = path.join(EVENTS_DIR, 'pending');
const DONE_DIR = path.join(EVENTS_DIR, 'done');
const log = new PrefixLogger('EventProcessor');
/**
* Write a KnowledgeEvent to the events/pending/ directory.
* Filename is a monotonically increasing ID so events sort by creation order.
* Call this function in chronological order (oldest event first) within a sync batch
* to ensure correct ordering.
*/
export async function createEvent(event: Omit<KnowledgeEvent, 'id'>): Promise<void> {
fs.mkdirSync(PENDING_DIR, { recursive: true });
const idGen = container.resolve<IMonotonicallyIncreasingIdGenerator>('idGenerator');
const id = await idGen.next();
const fullEvent: KnowledgeEvent = { id, ...event };
const filePath = path.join(PENDING_DIR, `${id}.json`);
fs.writeFileSync(filePath, JSON.stringify(fullEvent, null, 2), 'utf-8');
}
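The pending-event filenames rely on IDs that sort lexicographically in creation order. A hypothetical sketch of such a generator (the real one is resolved via the `idGenerator` DI token, so its actual shape is an assumption):

```typescript
// Hypothetical sketch: fixed-width, zero-padded components make plain
// string comparison equal creation order.
class SketchIdGenerator {
  private lastMs = 0;
  private seq = 0;
  next(): string {
    const now = Date.now();
    if (now === this.lastMs) {
      this.seq += 1; // same millisecond: bump the counter
    } else {
      this.lastMs = now;
      this.seq = 0;
    }
    return `${String(this.lastMs).padStart(15, '0')}-${String(this.seq).padStart(4, '0')}`;
  }
}

const gen = new SketchIdGenerator();
const first = gen.next();
const second = gen.next();
// `first < second` as plain strings, which is why the event loop can
// recover FIFO order with a simple filenames.sort()
```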
function ensureDirs(): void {
fs.mkdirSync(PENDING_DIR, { recursive: true });
fs.mkdirSync(DONE_DIR, { recursive: true });
}
async function listAllTracks(): Promise<ParsedTrack[]> {
const tracks: ParsedTrack[] = [];
let entries;
try {
entries = await workspace.readdir('knowledge', { recursive: true });
} catch {
return tracks;
}
const mdFiles = entries
.filter(e => e.kind === 'file' && e.name.endsWith('.md'))
.map(e => e.path.replace(/^knowledge\//, ''));
for (const filePath of mdFiles) {
let parsedTracks;
try {
parsedTracks = await fetchAll(filePath);
} catch {
continue;
}
for (const t of parsedTracks) {
tracks.push({
trackId: t.track.trackId,
filePath,
eventMatchCriteria: t.track.eventMatchCriteria ?? '',
instruction: t.track.instruction,
active: t.track.active,
});
}
}
return tracks;
}
function moveEventToDone(filename: string, enriched: KnowledgeEvent): void {
const donePath = path.join(DONE_DIR, filename);
const pendingPath = path.join(PENDING_DIR, filename);
fs.writeFileSync(donePath, JSON.stringify(enriched, null, 2), 'utf-8');
try {
fs.unlinkSync(pendingPath);
} catch (err) {
log.log(`Failed to remove pending event ${filename}:`, err);
}
}
async function processOneEvent(filename: string): Promise<void> {
const pendingPath = path.join(PENDING_DIR, filename);
let event: KnowledgeEvent;
try {
const raw = fs.readFileSync(pendingPath, 'utf-8');
const parsed = JSON.parse(raw);
event = trackBlock.KnowledgeEventSchema.parse(parsed);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
log.log(`Malformed event ${filename}, moving to done with error:`, msg);
const stub: KnowledgeEvent = {
id: filename.replace(/\.json$/, ''),
source: 'unknown',
type: 'unknown',
createdAt: new Date().toISOString(),
payload: '',
processedAt: new Date().toISOString(),
error: `Failed to parse: ${msg}`,
};
moveEventToDone(filename, stub);
return;
}
log.log(`Processing event ${event.id} (source=${event.source}, type=${event.type})`);
const allTracks = await listAllTracks();
const candidates = await findCandidates(event, allTracks);
const runIds: string[] = [];
let processingError: string | undefined;
// Sequential — preserves total ordering
for (const candidate of candidates) {
try {
const result = await triggerTrackUpdate(
candidate.trackId,
candidate.filePath,
event.payload,
'event',
);
if (result.runId) runIds.push(result.runId);
log.log(`Candidate ${candidate.trackId}: ${result.action}${result.error ? ` (${result.error})` : ''}`);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
log.log(`Error triggering candidate ${candidate.trackId}:`, msg);
processingError = (processingError ? processingError + '; ' : '') + `${candidate.trackId}: ${msg}`;
}
}
const enriched: KnowledgeEvent = {
...event,
processedAt: new Date().toISOString(),
candidates: candidates.map(c => ({ trackId: c.trackId, filePath: c.filePath })),
runIds,
...(processingError ? { error: processingError } : {}),
};
moveEventToDone(filename, enriched);
}
async function processPendingEvents(): Promise<void> {
ensureDirs();
let filenames: string[];
try {
filenames = fs.readdirSync(PENDING_DIR).filter(f => f.endsWith('.json'));
} catch (err) {
log.log('Failed to read pending dir:', err);
return;
}
if (filenames.length === 0) return;
// FIFO: monotonic IDs are lexicographically sortable
filenames.sort();
log.log(`Processing ${filenames.length} pending event(s)`);
for (const filename of filenames) {
try {
await processOneEvent(filename);
} catch (err) {
log.log(`Unhandled error processing ${filename}:`, err);
// Keep the loop alive — don't move file, will retry on next tick
}
}
}
export async function init(): Promise<void> {
log.log(`Starting, polling every ${POLL_INTERVAL_MS / 1000}s`);
ensureDirs();
// Initial run
await processPendingEvents();
while (true) {
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
try {
await processPendingEvents();
} catch (err) {
log.log('Error in main loop:', err);
}
}
}


@@ -0,0 +1,190 @@
import z from 'zod';
import fs from 'fs/promises';
import path from 'path';
import { parse as parseYaml, stringify as stringifyYaml } from 'yaml';
import { WorkDir } from '../../config/config.js';
import { TrackBlockSchema } from '@x/shared/dist/track-block.js';
import { TrackStateSchema } from './types.js';
const KNOWLEDGE_DIR = path.join(WorkDir, 'knowledge');
function absPath(filePath: string): string {
return path.join(KNOWLEDGE_DIR, filePath);
}
export async function fetchAll(filePath: string): Promise<z.infer<typeof TrackStateSchema>[]> {
let content: string;
try {
content = await fs.readFile(absPath(filePath), 'utf-8');
} catch {
return [];
}
const lines = content.split('\n');
const blocks: z.infer<typeof TrackStateSchema>[] = [];
let i = 0;
const contentFenceStartMatcher = /<!--track-target:(.+)-->/;
const contentFenceEndMatcher = /<!--\/track-target:(.+)-->/;
while (i < lines.length) {
if (lines[i].trim() === '```track') {
const fenceStart = i;
i++;
const blockLines: string[] = [];
while (i < lines.length && lines[i].trim() !== '```') {
blockLines.push(lines[i]);
i++;
}
try {
const data = parseYaml(blockLines.join('\n'));
const result = TrackBlockSchema.safeParse(data);
if (result.success) {
blocks.push({ track: result.data, fenceStart, fenceEnd: i, content: '' });
}
} catch { /* skip */ }
} else if (contentFenceStartMatcher.test(lines[i])) {
const match = contentFenceStartMatcher.exec(lines[i]);
if (match) {
const trackId = match[1];
// have we already collected this track block?
const existingBlock = blocks.find(b => b.track.trackId === trackId);
if (!existingBlock) {
i++;
continue;
}
const contentStart = i + 1;
while (i < lines.length && !contentFenceEndMatcher.test(lines[i])) {
i++;
}
const contentEnd = i;
existingBlock.content = lines.slice(contentStart, contentEnd).join('\n');
}
}
i++;
}
return blocks;
}
export async function fetch(filePath: string, trackId: string): Promise<z.infer<typeof TrackStateSchema> | null> {
const blocks = await fetchAll(filePath);
return blocks.find(b => b.track.trackId === trackId) ?? null;
}
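For orientation, the on-disk shape these helpers parse looks roughly like this (a hypothetical note fragment; the field names mirror the schema used above, but the exact YAML layout is an assumption):

````markdown
```track
trackId: acme-news
instruction: Keep a running summary of Acme-related emails
eventMatchCriteria: Emails mentioning Acme Corp
active: true
```
<!--track-target:acme-news-->
(agent-written content lives here and stays editable by the user)
<!--/track-target:acme-news-->
````

`fetchAll` records the fence line range of the YAML block and captures everything between the two target markers as `content`.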
/**
* Fetch a track block and return its canonical YAML string (or null if not found).
* Useful for IPC handlers that need to return the fresh YAML without taking a
* dependency on the `yaml` package themselves.
*/
export async function fetchYaml(filePath: string, trackId: string): Promise<string | null> {
const block = await fetch(filePath, trackId);
if (!block) return null;
return stringifyYaml(block.track).trimEnd();
}
export async function updateContent(filePath: string, trackId: string, newContent: string): Promise<void> {
let content = await fs.readFile(absPath(filePath), 'utf-8');
const openTag = `<!--track-target:${trackId}-->`;
const closeTag = `<!--/track-target:${trackId}-->`;
const openIdx = content.indexOf(openTag);
const closeIdx = content.indexOf(closeTag);
if (openIdx !== -1 && closeIdx !== -1 && closeIdx > openIdx) {
content = content.slice(0, openIdx + openTag.length) + '\n' + newContent + '\n' + content.slice(closeIdx);
} else {
const block = await fetch(filePath, trackId);
if (!block) {
throw new Error(`Track ${trackId} not found in ${filePath}`);
}
const lines = content.split('\n');
const insertAt = Math.min(block.fenceEnd + 1, lines.length);
const contentFence = [openTag, newContent, closeTag];
lines.splice(insertAt, 0, ...contentFence);
content = lines.join('\n');
}
await fs.writeFile(absPath(filePath), content, 'utf-8');
}
export async function updateTrackBlock(filepath: string, trackId: string, updates: Partial<z.infer<typeof TrackBlockSchema>>): Promise<void> {
const block = await fetch(filepath, trackId);
if (!block) {
throw new Error(`Track ${trackId} not found in ${filepath}`);
}
block.track = { ...block.track, ...updates };
// read file contents
let content = await fs.readFile(absPath(filepath), 'utf-8');
const lines = content.split('\n');
const yaml = stringifyYaml(block.track).trimEnd();
const yamlLines = yaml ? yaml.split('\n') : [];
lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```');
content = lines.join('\n');
await fs.writeFile(absPath(filepath), content, 'utf-8');
}
/**
* Replace the entire YAML of a track block on disk with a new string.
* Unlike updateTrackBlock (which merges), this writes the raw YAML verbatim;
* used when the user explicitly edits raw YAML in the modal.
* The new YAML must still parse to a valid TrackBlock with a matching trackId,
* otherwise the write is rejected.
*/
export async function replaceTrackBlockYaml(filePath: string, trackId: string, newYaml: string): Promise<void> {
const block = await fetch(filePath, trackId);
if (!block) {
throw new Error(`Track ${trackId} not found in ${filePath}`);
}
const parsed = TrackBlockSchema.safeParse(parseYaml(newYaml));
if (!parsed.success) {
throw new Error(`Invalid track YAML: ${parsed.error.message}`);
}
if (parsed.data.trackId !== trackId) {
throw new Error(`trackId cannot be changed (was "${trackId}", got "${parsed.data.trackId}")`);
}
const content = await fs.readFile(absPath(filePath), 'utf-8');
const lines = content.split('\n');
const yamlLines = newYaml.trimEnd().split('\n');
lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```');
await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8');
}
/**
* Remove a track block and its sibling target region from the file.
*/
export async function deleteTrackBlock(filePath: string, trackId: string): Promise<void> {
const block = await fetch(filePath, trackId);
if (!block) {
// Already gone — treat as success.
return;
}
const content = await fs.readFile(absPath(filePath), 'utf-8');
const lines = content.split('\n');
const openTag = `<!--track-target:${trackId}-->`;
const closeTag = `<!--/track-target:${trackId}-->`;
// Find target region (may not exist)
let targetStart = -1;
let targetEnd = -1;
for (let i = 0; i < lines.length; i++) {
if (lines[i].includes(openTag)) { targetStart = i; }
if (targetStart !== -1 && lines[i].includes(closeTag)) { targetEnd = i; break; }
}
// Build a list of [start, end] ranges to remove, sorted descending so
// indices stay valid as we splice.
const ranges: Array<[number, number]> = [];
ranges.push([block.fenceStart, block.fenceEnd]);
if (targetStart !== -1 && targetEnd !== -1 && targetEnd >= targetStart) {
ranges.push([targetStart, targetEnd]);
}
ranges.sort((a, b) => b[0] - a[0]);
for (const [start, end] of ranges) {
lines.splice(start, end - start + 1);
// Also drop a trailing blank line if the removal left two in a row.
if (start < lines.length && lines[start].trim() === '' && start > 0 && lines[start - 1].trim() === '') {
lines.splice(start, 1);
}
}
await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8');
}


@@ -0,0 +1,118 @@
import { generateObject } from 'ai';
import { trackBlock, PrefixLogger } from '@x/shared';
import type { KnowledgeEvent } from '@x/shared/dist/track-block.js';
import container from '../../di/container.js';
import type { IModelConfigRepo } from '../../models/repo.js';
import { createProvider } from '../../models/models.js';
import { isSignedIn } from '../../account/account.js';
import { getGatewayProvider } from '../../models/gateway.js';
const log = new PrefixLogger('TrackRouting');
const BATCH_SIZE = 20;
export interface ParsedTrack {
trackId: string;
filePath: string;
eventMatchCriteria: string;
instruction: string;
active: boolean;
}
const ROUTING_SYSTEM_PROMPT = `You are a routing classifier for a knowledge management system.
You will receive an event (something that happened: an email, meeting, message, etc.) and a list of track blocks. Each track block has:
- trackId: an identifier (only unique within its file)
- filePath: the note file the track lives in
- eventMatchCriteria: a description of what kinds of signals are relevant to this track
Your job is to identify which track blocks MIGHT be relevant to this event.
Rules:
- Be LIBERAL in your selections. Include any track that is even moderately relevant.
- Prefer false positives over false negatives. It is much better to include a track that turns out to be irrelevant than to miss one that was relevant.
- Only exclude tracks that are CLEARLY and OBVIOUSLY irrelevant to the event.
- Do not attempt to judge whether the event contains enough information to update the track. That is handled by a later stage.
- Return an empty list only if no tracks are relevant at all.
- For each candidate, return BOTH trackId and filePath exactly as given. trackIds are not globally unique.`;
async function resolveModel() {
const repo = container.resolve<IModelConfigRepo>('modelConfigRepo');
const config = await repo.getConfig();
const signedIn = await isSignedIn();
const provider = signedIn
? await getGatewayProvider()
: createProvider(config.provider);
const modelId = config.knowledgeGraphModel
|| (signedIn ? 'gpt-5.4' : config.model);
return provider.languageModel(modelId);
}
function buildRoutingPrompt(event: KnowledgeEvent, batch: ParsedTrack[]): string {
const trackList = batch
.map((t, i) => `${i + 1}. trackId: ${t.trackId}\n filePath: ${t.filePath}\n eventMatchCriteria: ${t.eventMatchCriteria}`)
.join('\n\n');
return `## Event
Source: ${event.source}
Type: ${event.type}
Time: ${event.createdAt}
${event.payload}
## Track Blocks
${trackList}`;
}
function trackKey(trackId: string, filePath: string): string {
return `${filePath}::${trackId}`;
}
export async function findCandidates(
event: KnowledgeEvent,
allTracks: ParsedTrack[],
): Promise<ParsedTrack[]> {
// Short-circuit for targeted re-runs — skip LLM routing entirely
if (event.targetTrackId && event.targetFilePath) {
const target = allTracks.find(t =>
t.trackId === event.targetTrackId && t.filePath === event.targetFilePath
);
return target ? [target] : [];
}
const filtered = allTracks.filter(t =>
t.active && t.instruction && t.eventMatchCriteria
);
if (filtered.length === 0) {
log.log(`No event-eligible tracks (none with eventMatchCriteria)`);
return [];
}
log.log(`Routing event ${event.id} against ${filtered.length} track(s)`);
const model = await resolveModel();
const candidateKeys = new Set<string>();
for (let i = 0; i < filtered.length; i += BATCH_SIZE) {
const batch = filtered.slice(i, i + BATCH_SIZE);
try {
const { object } = await generateObject({
model,
system: ROUTING_SYSTEM_PROMPT,
prompt: buildRoutingPrompt(event, batch),
schema: trackBlock.Pass1OutputSchema,
});
for (const c of object.candidates) {
candidateKeys.add(trackKey(c.trackId, c.filePath));
}
} catch (err) {
log.log(`Routing batch ${i / BATCH_SIZE} failed:`, err);
}
}
const candidates = filtered.filter(t => candidateKeys.has(trackKey(t.trackId, t.filePath)));
log.log(`Event ${event.id}: ${candidates.length} candidate(s) — ${candidates.map(c => `${c.trackId}@${c.filePath}`).join(', ') || '(none)'}`);
return candidates;
}


@@ -0,0 +1,65 @@
import z from 'zod';
import { Agent, ToolAttachment } from '@x/shared/dist/agent.js';
import { BuiltinTools } from '../../application/lib/builtin-tools.js';
import { WorkDir } from '../../config/config.js';
const TRACK_RUN_INSTRUCTIONS = `You are a track block runner — a background agent that updates a specific section of a knowledge note.
You will receive a message containing a track instruction, the current content of the target region, and optionally some context. Your job is to follow the instruction and produce updated content.
# Background Mode
You are running as a background task; there is no user present.
- Do NOT ask clarifying questions; make reasonable assumptions
- Be concise and action-oriented; just do the work
# The Knowledge Graph
The knowledge graph is stored as plain markdown in \`${WorkDir}/knowledge/\` (inside the workspace). It's organized into:
- **People/**: Notes on individuals
- **Organizations/**: Notes on companies
- **Projects/**: Notes on initiatives
- **Topics/**: Notes on recurring themes
Use workspace tools to search and read the knowledge graph for context.
# How to Access the Knowledge Graph
**CRITICAL:** Always include \`knowledge/\` in paths.
- \`workspace-grep({ pattern: "Acme", path: "knowledge/" })\`
- \`workspace-readFile("knowledge/People/Sarah Chen.md")\`
- \`workspace-readdir("knowledge/People")\`
**NEVER** use an empty path or root path.
# How to Write Your Result
Use the \`update-track-content\` tool to write your result. The message will tell you the file path and track ID.
- Produce the COMPLETE replacement content (not a diff)
- Preserve existing content that's still relevant
- Write in a clear, concise style appropriate for personal notes
# Web Search
You have access to \`web-search\` for tracks that need external information (news, trends, current events). Use it when the track instruction requires information beyond the knowledge graph.
# After You're Done
End your response with a brief summary of what you did (1-2 sentences).
`;
export function buildTrackRunAgent(): z.infer<typeof Agent> {
const tools: Record<string, z.infer<typeof ToolAttachment>> = {};
for (const name of Object.keys(BuiltinTools)) {
if (name === 'executeCommand') continue;
tools[name] = { type: 'builtin', name };
}
return {
name: 'track-run',
description: 'Background agent that updates track block content',
instructions: TRACK_RUN_INSTRUCTIONS,
tools,
};
}


@@ -0,0 +1,168 @@
import z from 'zod';
import { fetchAll, updateTrackBlock } from './fileops.js';
import { createRun, createMessage } from '../../runs/runs.js';
import { extractAgentResponse, waitForRunCompletion } from '../../agents/utils.js';
import { trackBus } from './bus.js';
import type { TrackStateSchema } from './types.js';
import { PrefixLogger } from '@x/shared/dist/prefix-logger.js';
export interface TrackUpdateResult {
trackId: string;
runId: string | null;
action: 'replace' | 'no_update';
contentBefore: string | null;
contentAfter: string | null;
summary: string | null;
error?: string;
}
// ---------------------------------------------------------------------------
// Agent run
// ---------------------------------------------------------------------------
function buildMessage(
filePath: string,
track: z.infer<typeof TrackStateSchema>,
trigger: 'manual' | 'timed' | 'event',
context?: string,
): string {
const now = new Date();
const localNow = now.toLocaleString('en-US', { dateStyle: 'full', timeStyle: 'long' });
const tz = Intl.DateTimeFormat().resolvedOptions().timeZone;
let msg = `Update track **${track.track.trackId}** in \`${filePath}\`.
**Time:** ${localNow} (${tz})
**Instruction:**
${track.track.instruction}
**Current content:**
${track.content || '(empty — first run)'}
Use \`update-track-content\` with filePath=\`${filePath}\` and trackId=\`${track.track.trackId}\`.`;
if (trigger === 'event') {
msg += `
**Trigger:** Event match (a Pass 1 routing classifier flagged this track as potentially relevant to the event below)
**Event match criteria for this track:**
${track.track.eventMatchCriteria ?? '(none — should not happen for event-triggered runs)'}
**Event payload:**
${context ?? '(no payload)'}
**Decision:** Determine whether this event genuinely warrants updating the track content. If the event is not meaningfully relevant on closer inspection, skip the update; do NOT call \`update-track-content\`. Only call the tool if the event provides new or changed information that should be reflected in the track.`;
} else if (context) {
msg += `\n\n**Context:**\n${context}`;
}
return msg;
}
// ---------------------------------------------------------------------------
// Concurrency guard
// ---------------------------------------------------------------------------
const runningTracks = new Set<string>();
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
/**
* Trigger an update for a specific track block.
* Can be called by any trigger system (manual, cron, event matching).
*/
export async function triggerTrackUpdate(
trackId: string,
filePath: string,
context?: string,
trigger: 'manual' | 'timed' | 'event' = 'manual',
): Promise<TrackUpdateResult> {
const key = `${trackId}:${filePath}`;
const logger = new PrefixLogger('track:runner');
logger.log('triggering track update', trackId, filePath, trigger, context);
if (runningTracks.has(key)) {
logger.log('skipping, already running');
return { trackId, runId: null, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Already running' };
}
runningTracks.add(key);
try {
const tracks = await fetchAll(filePath);
logger.log('fetched tracks from file', tracks);
const track = tracks.find(t => t.track.trackId === trackId);
if (!track) {
logger.log('track not found', trackId, filePath, trigger, context);
return { trackId, runId: null, action: 'no_update', contentBefore: null, contentAfter: null, summary: null, error: 'Track not found' };
}
const contentBefore = track.content;
// Emit start event — runId is set after agent run is created
const agentRun = await createRun({ agentId: 'track-run' });
// Set lastRunAt and lastRunId immediately (before agent executes) so
// the scheduler's next poll won't re-trigger this track.
await updateTrackBlock(filePath, trackId, {
lastRunAt: new Date().toISOString(),
lastRunId: agentRun.id,
});
await trackBus.publish({
type: 'track_run_start',
trackId,
filePath,
trigger,
runId: agentRun.id,
});
try {
await createMessage(agentRun.id, buildMessage(filePath, track, trigger, context));
await waitForRunCompletion(agentRun.id);
const summary = await extractAgentResponse(agentRun.id);
const updatedTracks = await fetchAll(filePath);
const contentAfter = updatedTracks.find(t => t.track.trackId === trackId)?.content;
const didUpdate = contentAfter !== contentBefore;
// Update summary on completion
await updateTrackBlock(filePath, trackId, {
lastRunSummary: summary ?? undefined,
});
await trackBus.publish({
type: 'track_run_complete',
trackId,
filePath,
runId: agentRun.id,
summary: summary ?? undefined,
});
return {
trackId,
runId: agentRun.id,
action: didUpdate ? 'replace' : 'no_update',
contentBefore: contentBefore ?? null,
contentAfter: contentAfter ?? null,
summary,
};
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await trackBus.publish({
type: 'track_run_complete',
trackId,
filePath,
runId: agentRun.id,
error: msg,
});
return { trackId, runId: agentRun.id, action: 'no_update', contentBefore: contentBefore ?? null, contentAfter: null, summary: null, error: msg };
}
} finally {
runningTracks.delete(key);
}
}
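The `runningTracks` guard above boils down to a small reusable pattern. This sketch (with a hypothetical `guarded` helper, not part of the codebase) shows why overlapping triggers cannot both pass the check:

```typescript
// A Set keyed by `${trackId}:${filePath}` rejects re-entry while a run is
// in flight. The has/add pair executes synchronously before the first
// await, so two overlapping calls cannot both pass the check.
const running = new Set<string>();

async function guarded(key: string, work: () => Promise<void>): Promise<boolean> {
  if (running.has(key)) return false; // duplicate trigger, skipped
  running.add(key);
  try {
    await work();
    return true;
  } finally {
    running.delete(key); // always release, even on error
  }
}
```

Note the release lives in `finally`: a throwing agent run must not leave the track permanently locked.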


@@ -0,0 +1,63 @@
import { CronExpressionParser } from 'cron-parser';
import type { TrackSchedule } from '@x/shared/dist/track-block.js';
const GRACE_MS = 2 * 60 * 1000; // 2 minutes
/**
* Determine if a scheduled track is due to run.
 * All schedule types enforce a 2-minute grace period: if the scheduled time
 * was more than 2 minutes ago, it's considered a miss and skipped.
*/
export function isTrackScheduleDue(schedule: TrackSchedule, lastRunAt: string | null): boolean {
const now = new Date();
switch (schedule.type) {
case 'cron': {
if (!lastRunAt) return true; // Never ran — immediately due
try {
// Find the MOST RECENT occurrence at-or-before `now`, not the
// occurrence right after lastRunAt. If lastRunAt is old, that
// occurrence would be ancient too and always fall outside the
// grace window, blocking every future fire.
const interval = CronExpressionParser.parse(schedule.expression, {
currentDate: now,
});
const prevRun = interval.prev().toDate();
// Already ran at-or-after this occurrence → skip.
if (new Date(lastRunAt).getTime() >= prevRun.getTime()) return false;
// Within grace → fire. Outside grace → missed, skip.
return now.getTime() <= prevRun.getTime() + GRACE_MS;
} catch {
return false;
}
}
case 'window': {
// Time-of-day filter (applies regardless of lastRunAt state).
const [startHour, startMin] = schedule.startTime.split(':').map(Number);
const [endHour, endMin] = schedule.endTime.split(':').map(Number);
const startMinutes = startHour * 60 + startMin;
const endMinutes = endHour * 60 + endMin;
const nowMinutes = now.getHours() * 60 + now.getMinutes();
if (nowMinutes < startMinutes || nowMinutes > endMinutes) return false;
if (!lastRunAt) return true;
try {
const interval = CronExpressionParser.parse(schedule.cron, {
currentDate: now,
});
const prevRun = interval.prev().toDate();
if (new Date(lastRunAt).getTime() >= prevRun.getTime()) return false;
return now.getTime() <= prevRun.getTime() + GRACE_MS;
} catch {
return false;
}
}
case 'once': {
if (lastRunAt) return false; // Already ran
const runAt = new Date(schedule.runAt);
return now >= runAt && now.getTime() <= runAt.getTime() + GRACE_MS;
}
}
}
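For the simplest schedule type, `once`, the grace-window logic reduces to a standalone check like this (a sketch mirroring the branch above, without the cron-parser dependency):

```typescript
const GRACE_MS = 2 * 60 * 1000; // 2 minutes, matching the scheduler

// Due only if the track never ran and `now` falls inside
// [runAt, runAt + GRACE_MS]; a later `now` counts as a miss.
function isOnceDue(runAtIso: string, lastRunAt: string | null, now: Date): boolean {
  if (lastRunAt) return false; // already ran
  const runAt = new Date(runAtIso).getTime();
  return now.getTime() >= runAt && now.getTime() <= runAt + GRACE_MS;
}
```

The same window bounds both cron branches: a poll that lands more than two minutes after the previous cron occurrence treats it as missed rather than firing late.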


@@ -0,0 +1,66 @@
import { PrefixLogger } from '@x/shared';
import * as workspace from '../../workspace/workspace.js';
import { fetchAll } from './fileops.js';
import { triggerTrackUpdate } from './runner.js';
import { isTrackScheduleDue } from './schedule-utils.js';
const log = new PrefixLogger('TrackScheduler');
const POLL_INTERVAL_MS = 15_000; // 15 seconds
async function listKnowledgeMarkdownFiles(): Promise<string[]> {
try {
const entries = await workspace.readdir('knowledge', { recursive: true });
return entries
.filter(e => e.kind === 'file' && e.name.endsWith('.md'))
.map(e => e.path.replace(/^knowledge\//, ''));
} catch {
return [];
}
}
async function processScheduledTracks(): Promise<void> {
const relativePaths = await listKnowledgeMarkdownFiles();
log.log(`Scanning ${relativePaths.length} markdown files`);
for (const relativePath of relativePaths) {
let tracks;
try {
tracks = await fetchAll(relativePath);
} catch {
continue;
}
for (const trackState of tracks) {
const { track } = trackState;
if (!track.active) continue;
if (!track.schedule) continue;
const due = isTrackScheduleDue(track.schedule, track.lastRunAt ?? null);
log.log(`Track "${track.trackId}" in ${relativePath}: schedule=${track.schedule.type}, lastRunAt=${track.lastRunAt ?? 'never'}, due=${due}`);
if (due) {
log.log(`Triggering "${track.trackId}" in ${relativePath}`);
triggerTrackUpdate(track.trackId, relativePath, undefined, 'timed').catch(err => {
log.log(`Error running ${track.trackId}:`, err);
});
}
}
}
}
export async function init(): Promise<void> {
log.log(`Starting, polling every ${POLL_INTERVAL_MS / 1000}s`);
// Initial run
await processScheduledTracks();
// Periodic polling
while (true) {
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
try {
await processScheduledTracks();
} catch (error) {
log.log('Error in main loop:', error);
}
}
}


@@ -0,0 +1,9 @@
import z from "zod";
import { TrackBlockSchema } from "@x/shared/dist/track-block.js";
export const TrackStateSchema = z.object({
track: TrackBlockSchema,
fenceStart: z.number(),
fenceEnd: z.number(),
content: z.string(),
});