mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-05-22 18:45:19 +02:00
feat: background tasks
Adds Background Tasks — recurring background agents the user can set up to either keep a digest current (daily email summary, top HN stories, weather brief) or perform a recurring action (draft a reply, post to Slack, call an API). Each task is a persistent set of instructions plus optional triggers (schedule, time-of-day window, or matching incoming Gmail / calendar event). The agent reads the verbs in the instructions on every run and picks the right mode automatically. User-facing surfaces: - New "Background tasks" entry in the sidebar, with a table listing every task, its schedule, last run, and an active toggle. - A detail page per task with a max-width reader showing the task's current output and a control sidebar for editing instructions, triggers, and reviewing run history. - "New task" can open in a free-form box where the user describes what they want and Copilot sets it up end-to-end, or in a structured form for manual setup. - "Edit with Copilot" hand-off from the detail view, pre-seeded with the task's context. Under the hood: - The event pipeline that previously powered live-notes is now a generic consumer registry. Live-notes and background tasks both subscribe; incoming events are routed to candidates from both concurrently. - Schedule helpers and the agent-message trigger block are factored out of live-notes into shared modules. Both features use the same building blocks now. - Copilot's proactive routing is reframed: anything recurring (cadence words, watch / monitor verbs, action verbs, event-conditional asks) now flows to background tasks. Live-notes load only on explicit mention. - A small reliability fix for the run-creation fallback chain: an empty-string model/provider passed by an LLM tool call now correctly falls through to the default instead of being persisted as a real value. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
13fa80c687
commit
b01af12148
45 changed files with 4025 additions and 594 deletions
104
apps/x/packages/core/src/agents/build-trigger-block.ts
Normal file
104
apps/x/packages/core/src/agents/build-trigger-block.ts
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
import type { Triggers } from '@x/shared/dist/live-note.js';
|
||||
|
||||
export type TriggerType = 'manual' | 'cron' | 'window' | 'event';
|
||||
|
||||
export interface BuildTriggerBlockOptions {
|
||||
trigger: TriggerType;
|
||||
triggers?: Triggers;
|
||||
|
||||
/** For 'manual' / 'cron' / 'window' branches — extra context for THIS run. */
|
||||
context?: string;
|
||||
|
||||
/** For 'event' branch — the matched event's payload. */
|
||||
eventPayload?: string;
|
||||
|
||||
/**
|
||||
* Noun for the target entity in the event-branch wording — "flagged this
|
||||
* {targetNoun}", "Event match criteria for this {targetNoun}:". Live-note
|
||||
* passes 'note'; bg-task passes 'task'. Default 'target'.
|
||||
*/
|
||||
targetNoun?: string;
|
||||
|
||||
/**
|
||||
* Noun for the user's persistent intent — "if your {instructionsNoun}
|
||||
* specifies different behavior…" in the cron/window branches. Live-note
|
||||
* passes 'objective'; bg-task uses the default 'instructions'.
|
||||
*/
|
||||
instructionsNoun?: string;
|
||||
|
||||
/**
|
||||
* Text shown inside the manual-trigger parenthetical, after "Manual run".
|
||||
* Live-note passes:
|
||||
* "user-triggered — either the Run button in the Live Note panel or the
|
||||
* `run-live-note-agent` tool"
|
||||
* Bg-task passes:
|
||||
* "user-triggered — either the Run button in the Background Task detail
|
||||
* view or the `run-background-task-agent` tool"
|
||||
*/
|
||||
manualParen?: string;
|
||||
|
||||
/**
|
||||
* The "**Decision:** …" paragraph appended to the event branch. Live-note
|
||||
* and bg-task pass their own copies so the directive matches their
|
||||
* domain (edit the file vs. act on the event).
|
||||
*/
|
||||
eventDecisionDirective?: string;
|
||||
}
|
||||
|
||||
function describeWindow(triggers: Triggers | undefined): string {
|
||||
const ws = triggers?.windows;
|
||||
if (!ws || ws.length === 0) return 'a configured window';
|
||||
return ws.map(w => `${w.startTime}–${w.endTime}`).join(', ');
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the "**Trigger:** …" paragraph appended to a scheduled/event/manual
|
||||
* agent message. Shared between the live-note runner and the bg-task runner —
|
||||
* each passes domain-specific nouns and the event-branch decision directive.
|
||||
*/
|
||||
export function buildTriggerBlock(opts: BuildTriggerBlockOptions): string {
|
||||
const {
|
||||
trigger,
|
||||
triggers,
|
||||
context,
|
||||
eventPayload,
|
||||
targetNoun = 'target',
|
||||
instructionsNoun = 'instructions',
|
||||
manualParen = 'user-triggered',
|
||||
eventDecisionDirective,
|
||||
} = opts;
|
||||
|
||||
if (trigger === 'event') {
|
||||
const criteria = triggers?.eventMatchCriteria ?? '(none — should not happen for event-triggered runs)';
|
||||
const decision = eventDecisionDirective ?? '';
|
||||
return `
|
||||
|
||||
**Trigger:** Event match — Pass 1 routing flagged this ${targetNoun} as potentially relevant to the event below.
|
||||
|
||||
**Event match criteria for this ${targetNoun}:**
|
||||
${criteria}
|
||||
|
||||
**Event payload:**
|
||||
${eventPayload ?? '(no payload)'}
|
||||
|
||||
${decision}`;
|
||||
}
|
||||
|
||||
if (trigger === 'cron') {
|
||||
const expr = triggers?.cronExpr ?? '(unknown)';
|
||||
return `
|
||||
|
||||
**Trigger:** Scheduled refresh — the cron expression \`${expr}\` matched. This is a baseline refresh; if your ${instructionsNoun} specifies different behavior for cron vs window vs event runs, follow the cron branch.${context ? `\n\n**Context:**\n${context}` : ''}`;
|
||||
}
|
||||
|
||||
if (trigger === 'window') {
|
||||
return `
|
||||
|
||||
**Trigger:** Scheduled refresh — fired inside the configured window (${describeWindow(triggers)}). This is a forgiving baseline refresh that runs once per day per window; reactive updates are handled by event triggers (when configured). If your ${instructionsNoun} specifies different behavior for cron vs window vs event runs, follow the window branch.${context ? `\n\n**Context:**\n${context}` : ''}`;
|
||||
}
|
||||
|
||||
// manual
|
||||
return `
|
||||
|
||||
**Trigger:** Manual run (${manualParen}).${context ? `\n\n**Context:**\n${context}` : ''}`;
|
||||
}
|
||||
|
|
@ -12,6 +12,7 @@ import { AskHumanRequestEvent, RunEvent, ToolPermissionRequestEvent } from "@x/s
|
|||
import { BuiltinTools } from "../application/lib/builtin-tools.js";
|
||||
import { buildCopilotAgent } from "../application/assistant/agent.js";
|
||||
import { buildLiveNoteAgent } from "../knowledge/live-note/agent.js";
|
||||
import { buildBackgroundTaskAgent } from "../background-tasks/agent.js";
|
||||
import { isBlocked, extractCommandNames } from "../application/lib/command-executor.js";
|
||||
import container from "../di/container.js";
|
||||
import { IModelConfigRepo } from "../models/repo.js";
|
||||
|
|
@ -405,6 +406,10 @@ export async function loadAgent(id: string): Promise<z.infer<typeof Agent>> {
|
|||
return buildLiveNoteAgent();
|
||||
}
|
||||
|
||||
if (id === "background-task-agent") {
|
||||
return buildBackgroundTaskAgent();
|
||||
}
|
||||
|
||||
if (id === 'note_creation') {
|
||||
const raw = getNoteCreationRaw();
|
||||
let agent: z.infer<typeof Agent> = {
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ export function getErrorDetails(error: unknown): string {
|
|||
|
||||
/**
|
||||
* Extract the assistant's final text response from a run's log.
|
||||
* @param runId
|
||||
* @param runId
|
||||
* @returns The assistant's final text response or null if not found.
|
||||
*/
|
||||
export async function extractAgentResponse(runId: string): Promise<string | null> {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import { AsyncLocalStorage } from 'node:async_hooks';
|
||||
|
||||
export type UseCase = 'copilot_chat' | 'live_note_agent' | 'meeting_note' | 'knowledge_sync';
|
||||
export type UseCase = 'copilot_chat' | 'live_note_agent' | 'background_task_agent' | 'meeting_note' | 'knowledge_sync';
|
||||
|
||||
export interface UseCaseContext {
|
||||
useCase: UseCase;
|
||||
|
|
|
|||
|
|
@ -84,13 +84,13 @@ ${thirdPartyBlock}**Meeting Prep:** When users ask you to prepare for a meeting,
|
|||
|
||||
**App Control:** When users ask you to open notes, show the bases or graph view, filter or search notes, or manage saved views, load the \`app-navigation\` skill first. It provides structured guidance for navigating the app UI and controlling the knowledge base view.
|
||||
|
||||
**Live Notes (Self-Updating Knowledge):** A note's body can be agent-maintained — a *live* note refreshes on a schedule and/or reacts to incoming emails / calendar events to satisfy a single persistent **objective**. This is a flagship feature. **Listen for any signal that the user wants something to keep itself updated**, even when they don't use the words "live" or "track" — load the \`live-note\` skill the moment you spot one.
|
||||
**Background Tasks (Self-Running Work):** Rowboat can run *background tasks* — persistent instructions the agent fires on a schedule and/or in response to incoming emails / calendar events. A bg-task either maintains a snapshot in its \`index.md\` (digest, dashboard, rolling summary) or performs a recurring side-effect (send a Slack message, draft an email, post to a webhook, call an API). This is the flagship surface for *anything recurring*.
|
||||
|
||||
*Strong signals (load the skill, act without asking):* "every morning / daily / hourly…", "keep a running summary of…", "maintain a digest of…", "watch / monitor / keep an eye on…", "pin live updates of…", "track / follow X", "whenever a relevant email comes in…".
|
||||
*Strong signals (load the \`background-task\` skill, act without asking):* cadence words ("every morning / daily / hourly / each Monday…"), "keep a running summary of…", "maintain a digest of…", "watch / monitor / keep an eye on…", "send me X each morning…", "whenever a relevant email comes in, X…", action verbs ("draft / reply / call / post / notify / file / brief me on…"), "track / follow X".
|
||||
|
||||
*Medium signals (load the skill, answer the one-off question, then offer to keep it updated):* one-off questions about decaying info ("what's the weather?", "top HN stories?", "USD/INR right now?", "service X status?"), **"what's the latest [news/update/situation] on X" / "what's happening with X" / "any updates on X" / "catch me up on X"** about a person, company, project, or topic, note-anchored snapshots ("show me my schedule here", "put my open tasks here"), or recurring artifacts ("morning briefing", "weekly review", "Acme deal dashboard"). **Heuristic for the catch-all case:** if you reach for \`web-search\` or a news tool to answer a topic-following question, the answer is exactly the kind of thing a live note would refresh on a schedule — load the skill and offer at the end.
|
||||
*Medium signals (load the skill, answer the one-off, then offer):* one-off questions about decaying info ("what's the weather?", "top HN stories?"), "what's the latest on X / catch me up on X / any updates on X" about a person, company, project, or topic, recurring artifacts ("morning briefing", "weekly review", "Acme deal dashboard"). **Heuristic:** if you reach for \`web-search\` or a news tool to answer a recurring question, the answer is the kind of thing a bg-task would refresh on a schedule.
|
||||
|
||||
A live note is a single \`live:\` block in a note's frontmatter — one objective, plus an optional \`triggers\` object (\`cronExpr\` / \`windows\` / \`eventMatchCriteria\`, each independently optional). Users manage live notes in the **Live Note panel** (Radio icon at the top-right of the editor). **If the note is already live**, extend its existing \`objective\` in natural language to absorb the new ask — never create a second objective. When you make a passive note live (or extend an objective), tell the user where to manage it.
|
||||
**Live Notes:** If the user explicitly says "live note" or "live-note", load the \`live-note\` skill. Otherwise, do not propose live notes — prefer the \`background-task\` skill for anything recurring.
|
||||
**Browser Control:** When users ask you to open a website, browse in-app, search the web in the embedded browser, or interact with a live webpage inside Rowboat, load the \`browser-control\` skill first. It explains the \`read-page -> indexed action -> refreshed page\` workflow for the browser pane.
|
||||
|
||||
**Notifications:** When you need to send a desktop notification — completion alert after a long task, time-sensitive update, or a clickable result that lands the user on a specific note/view — load the \`notify-user\` skill first. It documents the \`notify-user\` tool and the \`rowboat://\` deep links you can attach to it.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,138 @@
|
|||
import { z } from 'zod';
|
||||
import { stringify as stringifyYaml } from 'yaml';
|
||||
import { BackgroundTaskSchema } from '@x/shared/dist/background-task.js';
|
||||
|
||||
const schemaYaml = stringifyYaml(z.toJSONSchema(BackgroundTaskSchema)).trimEnd();
|
||||
|
||||
export const skill = String.raw`
|
||||
# Background Tasks Skill
|
||||
|
||||
A *background task* is a persistent agent the user configures once and the framework keeps firing — on a schedule, inside time-of-day windows, and/or in response to matching incoming events (Gmail threads, calendar changes). Each task lives at \`bg-tasks/<slug>/\` and owns two artifacts:
|
||||
|
||||
- \`task.yaml\` — the spec (the user's **instructions**, triggers, runtime state). You and the user both treat this as the source of truth.
|
||||
- \`index.md\` — the agent-owned body. The runtime never writes here; the bg-task agent does, each run.
|
||||
|
||||
A task is one of two shapes — the agent decides per run from the verbs in \`instructions\`:
|
||||
|
||||
| Mode | Trigger verbs | Behavior |
|
||||
|---|---|---|
|
||||
| **OUTPUT** | "maintain / show / summarize / track / digest" | Rewrite \`index.md\` to reflect the current state. |
|
||||
| **ACTION** | "send / draft / post / notify / file / reply / call" | Perform the action, then append a one-line journal entry under \`## Journal\` in \`index.md\`. |
|
||||
|
||||
Mixed instructions ("summarize and email it") trigger both.
|
||||
|
||||
## Tools you'll use (and ones you WON'T)
|
||||
|
||||
You have three dedicated builtin tools for this skill:
|
||||
|
||||
- \`create-background-task\` — materializes a new task on disk. **Use this. Do not write \`task.yaml\` yourself with \`workspace-edit\`, and do not search the codebase for IPC channels like \`bg-task:create\`** — they're renderer-side and not callable from here.
|
||||
- \`patch-background-task\` — updates an existing task (instructions / triggers / active / model). Use this for the extend-don't-fork case.
|
||||
- \`run-background-task-agent\` — manually fires a task to run now. Always call this immediately after \`create-background-task\` so the user sees content.
|
||||
|
||||
To inspect what tasks already exist, use \`workspace-glob\` on \`bg-tasks/*/task.yaml\` and \`workspace-readFile\` on candidates. The user's bg-tasks folder is workspace-relative.
|
||||
|
||||
## Mode: act-first
|
||||
|
||||
Bg-task creation is **action-first**. Don't ask "should I?" — read the request, pick a name, call \`create-background-task\`, then call \`run-background-task-agent\` with the returned slug. Confirm in one line past-tense at the end. Tell the user the surface name: "Manage it from Background tasks in the sidebar."
|
||||
|
||||
The only exception: if a related bg-task already exists, **extend its instructions** via \`patch-background-task\` rather than creating a duplicate (see "Extend, don't fork").
|
||||
|
||||
## When you're loaded
|
||||
|
||||
The host's trigger paragraph loads this skill on:
|
||||
|
||||
- **Cadence**: "every morning", "daily", "hourly", "each Monday"
|
||||
- **Watch/monitor**: "watch / monitor / keep an eye on / track / follow X"
|
||||
- **Recurring artifact**: "morning briefing", "weekly review", "Acme deal dashboard"
|
||||
- **Event-conditional**: "whenever a relevant email comes in, …"
|
||||
- **Action verbs**: "draft / reply / call / post / notify / file / brief me on"
|
||||
- **Decay questions**: "what's the weather", "top HN stories", "latest on X" — answer the one-off, then offer
|
||||
|
||||
If the user explicitly says "live note" / "live-note", the host loads the \`live-note\` skill instead — don't try to handle that case here.
|
||||
|
||||
## Workflow
|
||||
|
||||
1. **Check for existing tasks.** Before creating, glob \`bg-tasks/*/task.yaml\` and read any candidates whose intent might overlap with the user's ask. If a related task exists, jump to "Extend, don't fork" below.
|
||||
|
||||
2. **Pick a name.** Use a short, friendly title in title-case: "Morning weather", "Q3 deal digest", "HN top stories". The framework slugifies it (lowercase, dashes) for the folder — you don't manage the slug.
|
||||
|
||||
3. **Write the instructions.** Capture the user's intent in their own words, with concrete verbs. Bake any specifics (which source, which audience, output shape) into the instructions — the agent re-reads them on every run.
|
||||
|
||||
- Good: *"Summarize my unread emails since yesterday 6pm into a one-paragraph digest plus a bulleted list of action items. Skip newsletters and automated notifications."*
|
||||
- Bad: *"Daily email summary."* (vague — agent will improvise unhelpfully)
|
||||
|
||||
4. **Pick triggers.** All three are independently optional; mix freely.
|
||||
|
||||
- \`cronExpr\` — exact times. \`"0 7 * * *"\` = 7am daily.
|
||||
- \`windows\` — time-of-day bands. Each fires once per day inside the band, anywhere — forgiving when the app was offline.
|
||||
- \`eventMatchCriteria\` — a natural-language description of which incoming events should wake the task (e.g. "Emails about Q3 OKRs from the leadership team"). Pass-1 routing matches; the agent does Pass-2 before acting.
|
||||
|
||||
No triggers at all = manual-only. The user clicks Run.
|
||||
|
||||
5. **Call \`create-background-task\`.** Required: \`name\`, \`instructions\`. Optional: \`triggers\`, \`model\`, \`provider\` (leave model/provider unset unless the user explicitly asked). The tool returns a slug.
|
||||
|
||||
6. **Call \`run-background-task-agent\`** with the slug. The agent runs once and populates \`index.md\`.
|
||||
|
||||
7. **Confirm.** One line. Name the task. Point at the sidebar. Done.
|
||||
|
||||
## Extend, don't fork
|
||||
|
||||
When the user's new ask overlaps with an existing task — e.g. they say "also include X" or the ask is a refinement of an existing task's intent — call \`patch-background-task\` instead of creating a duplicate.
|
||||
|
||||
Signals that you should extend:
|
||||
- The user says "also …" / "and on top of that …" / "while you're at it …"
|
||||
- The new ask is a refinement of an existing task's intent (different threshold, additional source, slightly different output)
|
||||
|
||||
When extending, pass the full rewritten \`instructions\` — don't try to surgical-edit a single sentence. The agent rereads instructions every run, so a clean rewrite is fine. After \`patch-background-task\` returns, call \`run-background-task-agent\` on the same slug so the user sees the updated output.
|
||||
|
||||
## Worked examples
|
||||
|
||||
### OUTPUT — morning briefing
|
||||
|
||||
User: *"Every morning at 7, give me a one-paragraph summary of overnight news in AI agents."*
|
||||
|
||||
1. \`create-background-task\` with:
|
||||
- \`name\`: "AI agent overnight news"
|
||||
- \`instructions\`: "Search the web and Hacker News for news about AI agents (autonomous LLM agents, agentic frameworks, agent benchmarks) published in the last 24 hours. Summarize the top developments in one paragraph (3-5 sentences) followed by a 3-5 item bulleted list of the most significant items with a single-sentence note each. Replace the body of index.md."
|
||||
- \`triggers\`: { \`cronExpr\`: "0 7 * * *" }
|
||||
2. \`run-background-task-agent\` slug=ai-agent-overnight-news.
|
||||
3. "Done — created the **AI agent overnight news** task. It'll run every morning at 7 and you can find it in Background tasks in the sidebar."
|
||||
|
||||
### ACTION — email auto-reply
|
||||
|
||||
User: *"Whenever I get an email about Q3 planning, draft a reply asking when they're free this week."*
|
||||
|
||||
1. \`create-background-task\` with:
|
||||
- \`name\`: "Q3 email auto-reply drafts"
|
||||
- \`instructions\`: "When an event arrives describing an email thread about Q3 planning, use the Gmail draft-create tool to draft a reply to the latest message asking the sender when they're free for a 30-minute call this week. Do not send the draft — leave it in Drafts for me to review. After drafting, append a journal entry to index.md noting the thread subject and the draft id."
|
||||
- \`triggers\`: { \`eventMatchCriteria\`: "Emails about Q3 planning (roadmap, OKRs, headcount, exec priorities)" }
|
||||
2. \`run-background-task-agent\` slug=q3-email-auto-reply-drafts.
|
||||
3. "Done — created the **Q3 email auto-reply drafts** task. It'll fire on relevant Gmail threads. Manage it from Background tasks in the sidebar."
|
||||
|
||||
### ACTION + journal — Slack watcher
|
||||
|
||||
User: *"Every weekday morning at 9, post a summary of unresolved high-priority issues to #engineering on Slack."*
|
||||
|
||||
1. \`create-background-task\` with:
|
||||
- \`name\`: "Daily eng triage"
|
||||
- \`instructions\`: "Each run, query <issue tracker> for unresolved issues labeled priority:high or above. Summarize counts by owner and the three oldest items. Send the summary to #engineering via the Slack tool. After sending, append a journal entry to index.md with the timestamp and the message id."
|
||||
- \`triggers\`: { \`cronExpr\`: "0 9 * * 1-5" }
|
||||
2. \`run-background-task-agent\` slug=daily-eng-triage.
|
||||
|
||||
## Canonical Schema
|
||||
|
||||
\`\`\`yaml
|
||||
${schemaYaml}
|
||||
\`\`\`
|
||||
|
||||
Notes:
|
||||
- \`active\` defaults to true. Patch \`{ active: false }\` to pause without deleting.
|
||||
- \`createdAt\` and \`lastRun\` are runtime-managed — never write them yourself.
|
||||
- The \`triggers\` block reuses Live Notes' \`Triggers\` schema verbatim. Cron grace and 5-minute backoff semantics are identical.
|
||||
|
||||
## Exceptions
|
||||
|
||||
The \`Background tasks\` sidebar view has a "New task" button that opens a form-driven flow. If the user is editing fields there or asking about a specific task from that view, *you* are not the right surface — the form is. Point at it ("You can also do this from the New task button in the Background tasks view") and step aside.
|
||||
`;
|
||||
|
||||
export default skill;
|
||||
|
|
@ -14,6 +14,7 @@ import browserControlSkill from "./browser-control/skill.js";
|
|||
import codeWithAgentsSkill from "./code-with-agents/skill.js";
|
||||
import composioIntegrationSkill from "./composio-integration/skill.js";
|
||||
import liveNoteSkill from "./live-note/skill.js";
|
||||
import backgroundTaskSkill from "./background-task/skill.js";
|
||||
import notifyUserSkill from "./notify-user/skill.js";
|
||||
|
||||
const CURRENT_DIR = path.dirname(fileURLToPath(import.meta.url));
|
||||
|
|
@ -101,10 +102,16 @@ const definitions: SkillDefinition[] = [
|
|||
summary: "Write code, build projects, create scripts, or fix bugs by delegating to Claude Code or Codex via acpx.",
|
||||
content: codeWithAgentsSkill,
|
||||
},
|
||||
{
|
||||
id: "background-task",
|
||||
title: "Background Tasks",
|
||||
summary: "Set up a recurring background task — persistent instructions the agent fires on a schedule and/or on matching events (Gmail, Calendar). Either maintains an `index.md` digest (OUTPUT mode) or performs a recurring side-effect like drafting a reply / posting to Slack / calling an API (ACTION mode). Flagship surface for anything recurring.",
|
||||
content: backgroundTaskSkill,
|
||||
},
|
||||
{
|
||||
id: "live-note",
|
||||
title: "Live Notes",
|
||||
summary: "Make notes self-updating — a single `live:` objective in the frontmatter that the live-note agent maintains on a schedule, on incoming events, or manually (weather, news, prices, status, dashboards).",
|
||||
summary: "Make a specific markdown note self-updating — a single `live:` objective in the frontmatter that the live-note agent maintains on a schedule or on incoming events. Load only when the user explicitly says 'live note' / 'live-note'; for anything else recurring, prefer the background-task skill.",
|
||||
content: liveNoteSkill,
|
||||
},
|
||||
{
|
||||
|
|
|
|||
|
|
@ -18,6 +18,32 @@ import { composioAccountsRepo } from "../../composio/repo.js";
|
|||
import { executeAction as executeComposioAction, isConfigured as isComposioConfigured, searchTools as searchComposioTools } from "../../composio/client.js";
|
||||
import { CURATED_TOOLKITS, CURATED_TOOLKIT_SLUGS } from "@x/shared/dist/composio.js";
|
||||
import { BrowserControlInputSchema, type BrowserControlInput } from "@x/shared/dist/browser-control.js";
|
||||
import { BackgroundTaskSchema, TriggersSchema } from "@x/shared/dist/background-task.js";
|
||||
|
||||
// Inputs for the bg-task builtin tools. Reuse the canonical schema field
|
||||
// descriptions; only `triggers` gets a tighter contextual override (the
|
||||
// shared TriggersSchema description is written from the live-note perspective).
|
||||
const CreateBackgroundTaskInput = BackgroundTaskSchema.pick({
|
||||
name: true,
|
||||
instructions: true,
|
||||
triggers: true,
|
||||
model: true,
|
||||
provider: true,
|
||||
}).extend({
|
||||
triggers: TriggersSchema.optional().describe('All three sub-fields (cronExpr, windows, eventMatchCriteria) are independently optional — mix freely. No triggers at all = manual-only (user clicks Run).'),
|
||||
});
|
||||
|
||||
const PatchBackgroundTaskInput = BackgroundTaskSchema.pick({
|
||||
name: true,
|
||||
instructions: true,
|
||||
active: true,
|
||||
triggers: true,
|
||||
model: true,
|
||||
provider: true,
|
||||
}).partial().extend({
|
||||
slug: z.string().describe('The slug of the task to update (the folder name under bg-tasks/).'),
|
||||
triggers: TriggersSchema.optional().describe('Replace the triggers object. To remove all triggers (make manual-only) pass an empty object.'),
|
||||
});
|
||||
import { ensureLoaded as ensureBrowserSkillsLoaded, readSkillContent as readBrowserSkillContent, refreshFromRemote as refreshBrowserSkills } from "../browser-skills/index.js";
|
||||
import type { ToolContext } from "./exec-tool.js";
|
||||
import { generateText } from "ai";
|
||||
|
|
@ -1624,6 +1650,70 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
|
|||
},
|
||||
},
|
||||
|
||||
'create-background-task': {
|
||||
description: "Create a new background task on disk. This is the tool you call to materialize a bg-task — do NOT try to write `task.yaml` yourself with workspace-edit, and do NOT search the codebase for IPC channels like `bg-task:create`. The framework slugifies the name and lays out `bg-tasks/<slug>/{task.yaml,index.md,runs/}`. After this returns, immediately call `run-background-task-agent` with the returned slug so the user sees content right away.",
|
||||
inputSchema: CreateBackgroundTaskInput,
|
||||
execute: async (input: z.infer<typeof CreateBackgroundTaskInput>) => {
|
||||
try {
|
||||
const { createTask } = await import("../../background-tasks/fileops.js");
|
||||
const result = await createTask({
|
||||
name: input.name,
|
||||
instructions: input.instructions,
|
||||
...(input.triggers ? { triggers: input.triggers } : {}),
|
||||
...(input.model ? { model: input.model } : {}),
|
||||
...(input.provider ? { provider: input.provider } : {}),
|
||||
});
|
||||
return { success: true, slug: result.slug };
|
||||
} catch (err) {
|
||||
return { success: false, error: err instanceof Error ? err.message : String(err) };
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
'patch-background-task': {
|
||||
description: "Update an existing background task — instructions, triggers, active, or model/provider. Use this when the user's new ask overlaps with an existing task (extend-don't-fork): rewrite the instructions in full to absorb the new ask rather than creating a duplicate sibling task. Look up existing tasks with `workspace-glob` on `bg-tasks/*/task.yaml` and `workspace-readFile` on the candidates first.",
|
||||
inputSchema: PatchBackgroundTaskInput,
|
||||
execute: async (input: z.infer<typeof PatchBackgroundTaskInput>) => {
|
||||
try {
|
||||
const { patchTask } = await import("../../background-tasks/fileops.js");
|
||||
const { slug, ...partial } = input;
|
||||
const result = await patchTask(slug, partial);
|
||||
return { success: true, task: result };
|
||||
} catch (err) {
|
||||
return { success: false, error: err instanceof Error ? err.message : String(err) };
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
'run-background-task-agent': {
|
||||
description: "Manually trigger a background task to run now. Equivalent to the user clicking the Run button in the Background Task detail view. Pass extra `context` to bias what the agent does this run (e.g. a backfill instruction) — does NOT modify the task's persistent instructions.",
|
||||
inputSchema: z.object({
|
||||
slug: z.string().describe("The slug of the bg-task to run (e.g., 'morning-weather'). The slug is what `bg-task:create` returns."),
|
||||
context: z.string().optional().describe(
|
||||
"Optional extra context for THIS run only — does not modify the task's instructions. " +
|
||||
"Use it for backfills (e.g. 'Backfill from emails received in the last 7 days') " +
|
||||
"or focused refreshes (e.g. 'Focus on changes since yesterday'). " +
|
||||
"Omit for a plain run."
|
||||
),
|
||||
}),
|
||||
execute: async ({ slug, context }: { slug: string; context?: string }) => {
|
||||
try {
|
||||
// Lazy import to break a module-init cycle, mirroring run-live-note-agent.
|
||||
const { runBackgroundTask } = await import("../../background-tasks/runner.js");
|
||||
const result = await runBackgroundTask(slug, 'manual', context);
|
||||
return {
|
||||
success: !result.error,
|
||||
runId: result.runId,
|
||||
summary: result.summary,
|
||||
error: result.error,
|
||||
};
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
return { success: false, error: msg };
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
'notify-user': {
|
||||
description: "Show a native OS notification to the user. Clicking the notification opens the provided link in the default browser, or focuses the Rowboat app if no link is given.",
|
||||
inputSchema: z.object({
|
||||
|
|
|
|||
84
apps/x/packages/core/src/background-tasks/agent.ts
Normal file
84
apps/x/packages/core/src/background-tasks/agent.ts
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
import z from 'zod';
|
||||
import { Agent, ToolAttachment } from '@x/shared/dist/agent.js';
|
||||
import { BuiltinTools } from '../application/lib/builtin-tools.js';
|
||||
import { KNOWLEDGE_NOTE_STYLE_GUIDE } from '../application/lib/knowledge-note-style.js';
|
||||
import { WorkDir } from '../config/config.js';
|
||||
|
||||
export const BACKGROUND_TASK_AGENT_INSTRUCTIONS = `You are the background-task agent — a self-running agent that fires on a schedule and/or in response to incoming events to act on persistent **instructions** the user wrote.
|
||||
|
||||
You are running with **no user present** to clarify, approve, or watch.
|
||||
- Do NOT ask clarifying questions — make the most reasonable interpretation of the instructions and proceed.
|
||||
- Do NOT hedge or preamble ("I'll now...", "Let me..."). Just do the work.
|
||||
- Do NOT produce chat-style output. The user sees only the changes you make and your final summary line.
|
||||
|
||||
# Task folder
|
||||
|
||||
Your task folder is \`bg-tasks/<slug>/\` (the path is given in the run message). It contains:
|
||||
- \`task.yaml\` — the spec. **Never touch this.** The runtime owns it.
|
||||
- \`index.md\` — agent-owned. You read and write this freely via \`workspace-readFile\` / \`workspace-edit\`.
|
||||
- \`runs/\` — your own run logs (jsonl). You don't write to it directly; the runtime does.
|
||||
|
||||
You can also read and write anywhere else under the workspace (\`knowledge/\`, etc.) when your instructions call for it.
|
||||
|
||||
# Two modes — decide each run from the verbs in your instructions
|
||||
|
||||
OUTPUT MODE — keep \`index.md\` aligned to the instructions.
|
||||
Use when instructions imply a **current state** artifact:
|
||||
- "Maintain / show / summarize / track / digest of / dashboard for / brief on …"
|
||||
- "Keep me posted on …" / "What's the latest on …"
|
||||
On every run: \`workspace-readFile\` \`index.md\`, decide the smallest patch that brings it into alignment with the instructions, apply with \`workspace-edit\`. Patch-style discipline: edit one region, re-read, then edit the next. Avoid one-shot rewrites.
|
||||
|
||||
ACTION MODE — perform a side-effect, append a journal entry.
|
||||
Use when instructions imply a **recurring action**:
|
||||
- "Send / draft / post / notify / file / reply / publish / call / forward …"
|
||||
On every run: perform the action using the appropriate tool (Slack, email, web-fetch, MCP, …). Then **append a one-liner** to \`index.md\` under a \`## Journal\` heading describing what you did, with the local time. Example:
|
||||
|
||||
## Journal
|
||||
|
||||
- 2026-05-12 14:00 — Sent the Q3 digest to #leadership (3 threads, 2 decisions).
|
||||
- 2026-05-11 14:00 — No qualifying threads; nothing sent.
|
||||
|
||||
If your instructions imply BOTH ("summarize and email it"), do both per run.
|
||||
|
||||
# Triggers
|
||||
|
||||
The run message tells you which trigger fired and how to interpret it:
|
||||
- **Manual** — the user clicked Run or called the \`run-background-task-agent\` tool. Optional \`Context:\` adds a one-off bias for THIS run.
|
||||
- **Cron / Window** — scheduled refresh. Use it as a baseline tick.
|
||||
- **Event** — Pass-1 routing flagged this task as potentially relevant to an event. Decide whether the event genuinely warrants acting. If on closer inspection it's not meaningfully relevant, **skip the action and the journal entry** — don't update \`index.md\` at all. Only act if the event provides information your instructions imply you should react to.
|
||||
|
||||
# Workspace conventions
|
||||
|
||||
${KNOWLEDGE_NOTE_STYLE_GUIDE}
|
||||
|
||||
# Failure and fallback
|
||||
|
||||
Do NOT fabricate. If a data source is unavailable (network error, missing API key, empty result), skip the run rather than write a misleading artifact. In ACTION mode, that means: no journal entry. In OUTPUT mode, leave \`index.md\` alone. Your final summary should explain what blocked the work.
|
||||
|
||||
# Final summary
|
||||
|
||||
End your run with a 1-2 sentence summary captured as \`lastRun.summary\`. State the action and the substance. Good:
|
||||
- "Updated — 3 new HN stories, top is 'Show HN: …' at 842 pts."
|
||||
- "Sent the digest to #leadership (2 deals updated)."
|
||||
- "Skipped — event was a calendar invite unrelated to Q3."
|
||||
- "Failed — web-search returned no results."
|
||||
|
||||
Avoid: "I updated the file.", "Done!", "Here is the update:". The summary is a data point, not a sign-off.
|
||||
|
||||
The workspace lives at \`${WorkDir}\`.
|
||||
`;
|
||||
|
||||
export function buildBackgroundTaskAgent(): z.infer<typeof Agent> {
|
||||
const tools: Record<string, z.infer<typeof ToolAttachment>> = {};
|
||||
for (const name of Object.keys(BuiltinTools)) {
|
||||
if (name === 'executeCommand') continue;
|
||||
tools[name] = { type: 'builtin', name };
|
||||
}
|
||||
|
||||
return {
|
||||
name: 'background-task-agent',
|
||||
description: 'Background agent that runs on a schedule/event and either keeps a task\'s index.md current (OUTPUT mode) or performs a recurring side-effect and journals it (ACTION mode).',
|
||||
instructions: BACKGROUND_TASK_AGENT_INSTRUCTIONS,
|
||||
tools,
|
||||
};
|
||||
}
|
||||
23
apps/x/packages/core/src/background-tasks/bus.ts
Normal file
23
apps/x/packages/core/src/background-tasks/bus.ts
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
import type { BackgroundTaskAgentEventType } from '@x/shared/dist/background-task.js';
|
||||
|
||||
type Handler = (event: BackgroundTaskAgentEventType) => void;
|
||||
|
||||
class BackgroundTaskBus {
|
||||
private subs: Handler[] = [];
|
||||
|
||||
publish(event: BackgroundTaskAgentEventType): void {
|
||||
for (const handler of this.subs) {
|
||||
handler(event);
|
||||
}
|
||||
}
|
||||
|
||||
subscribe(handler: Handler): () => void {
|
||||
this.subs.push(handler);
|
||||
return () => {
|
||||
const idx = this.subs.indexOf(handler);
|
||||
if (idx >= 0) this.subs.splice(idx, 1);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export const backgroundTaskBus = new BackgroundTaskBus();
|
||||
63
apps/x/packages/core/src/background-tasks/event-consumer.ts
Normal file
63
apps/x/packages/core/src/background-tasks/event-consumer.ts
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
import type { EventConsumer, EventConsumerTarget } from '../events/consumer.js';
|
||||
import { routeBatch } from '../events/routing.js';
|
||||
import { createProvider } from '../models/models.js';
|
||||
import {
|
||||
getDefaultModelAndProvider,
|
||||
getBackgroundTaskAgentModel,
|
||||
resolveProviderConfig,
|
||||
} from '../models/defaults.js';
|
||||
import { listTasks } from './fileops.js';
|
||||
import { runBackgroundTask } from './runner.js';
|
||||
|
||||
async function resolveRoutingModel() {
|
||||
const modelId = await getBackgroundTaskAgentModel();
|
||||
const { provider } = await getDefaultModelAndProvider();
|
||||
const config = await resolveProviderConfig(provider);
|
||||
return {
|
||||
model: createProvider(config).languageModel(modelId),
|
||||
modelId,
|
||||
providerName: provider,
|
||||
};
|
||||
}
|
||||
|
||||
async function listEligibleTargets(): Promise<EventConsumerTarget[]> {
|
||||
// Walk all tasks once; pagination doesn't apply to the routing pass — the
|
||||
// classifier needs to see all event-eligible tasks together.
|
||||
const result = await listTasks({ limit: 10_000 });
|
||||
const out: EventConsumerTarget[] = [];
|
||||
for (const summary of result.items) {
|
||||
if (!summary.active) continue;
|
||||
const eventMatchCriteria = summary.triggers?.eventMatchCriteria;
|
||||
if (!eventMatchCriteria) continue;
|
||||
out.push({
|
||||
id: summary.slug,
|
||||
instructions: summary.instructions,
|
||||
eventMatchCriteria,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
export const backgroundTaskEventConsumer: EventConsumer = {
|
||||
name: 'bg-task',
|
||||
|
||||
listEligibleTargets,
|
||||
|
||||
findCandidates: async (event, targets) => {
|
||||
// Targeted re-run from the UI — skip Pass-1.
|
||||
if (event.target?.consumer === 'bg-task') {
|
||||
return targets.some(t => t.id === event.target!.id) ? [event.target.id] : [];
|
||||
}
|
||||
return routeBatch(event, targets, {
|
||||
entitySingular: 'background task',
|
||||
entityPlural: 'background tasks',
|
||||
useCase: 'background_task_agent',
|
||||
resolveModel: resolveRoutingModel,
|
||||
});
|
||||
},
|
||||
|
||||
fireCandidate: async (event, slug) => {
|
||||
const result = await runBackgroundTask(slug, 'event', event.payload);
|
||||
return { runId: result.runId, error: result.error };
|
||||
},
|
||||
};
|
||||
255
apps/x/packages/core/src/background-tasks/fileops.ts
Normal file
255
apps/x/packages/core/src/background-tasks/fileops.ts
Normal file
|
|
@ -0,0 +1,255 @@
|
|||
import fs from 'fs/promises';
|
||||
import path from 'path';
|
||||
import { parse as parseYaml, stringify as stringifyYaml } from 'yaml';
|
||||
import {
|
||||
BackgroundTaskSchema,
|
||||
type BackgroundTask,
|
||||
type BackgroundTaskSummary,
|
||||
} from '@x/shared/dist/background-task.js';
|
||||
import { WorkDir } from '../config/config.js';
|
||||
import { withFileLock } from '../knowledge/file-lock.js';
|
||||
|
||||
const BG_TASKS_DIR = path.join(WorkDir, 'bg-tasks');
|
||||
|
||||
function taskDir(slug: string): string {
|
||||
return path.join(BG_TASKS_DIR, slug);
|
||||
}
|
||||
|
||||
export function taskYamlPath(slug: string): string {
|
||||
return path.join(taskDir(slug), 'task.yaml');
|
||||
}
|
||||
|
||||
export function taskIndexPath(slug: string): string {
|
||||
return path.join(taskDir(slug), 'index.md');
|
||||
}
|
||||
|
||||
/**
|
||||
* Plain-text pointer file at `bg-tasks/<slug>/runs.log`. Each line is a runId
|
||||
* (the canonical id of a run whose jsonl lives at the global location
|
||||
* `$WorkDir/runs/<runId>.jsonl`). Newest first: the runner prepends on each
|
||||
* start, so reading top-down gives most-recent-first ordering without sorting.
|
||||
*/
|
||||
export function taskRunsLogPath(slug: string): string {
|
||||
return path.join(taskDir(slug), 'runs.log');
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Slug
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const MAX_SLUG_LEN = 60;
|
||||
|
||||
export function slugify(name: string): string {
|
||||
const base = name
|
||||
.toLowerCase()
|
||||
.normalize('NFKD')
|
||||
.replace(/[^\w\s-]/g, '')
|
||||
.trim()
|
||||
.replace(/[\s_-]+/g, '-')
|
||||
.replace(/^-+|-+$/g, '')
|
||||
.slice(0, MAX_SLUG_LEN);
|
||||
return base || 'task';
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Read
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export async function fetchTask(slug: string): Promise<BackgroundTask | null> {
|
||||
let raw: string;
|
||||
try {
|
||||
raw = await fs.readFile(taskYamlPath(slug), 'utf-8');
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = parseYaml(raw);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
const result = BackgroundTaskSchema.safeParse(parsed);
|
||||
return result.success ? result.data : null;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Write
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Merge a partial update into the task.yaml. Used by the renderer for
|
||||
* structural edits (active toggle, instructions, triggers, model) and by the
|
||||
* runner for the `lastRun*` runtime fields.
|
||||
*/
|
||||
export async function patchTask(slug: string, partial: Partial<BackgroundTask>): Promise<BackgroundTask> {
|
||||
return withFileLock(taskYamlPath(slug), async () => {
|
||||
const current = await fetchTask(slug);
|
||||
if (!current) {
|
||||
throw new Error(`Task '${slug}' not found`);
|
||||
}
|
||||
const next: BackgroundTask = { ...current, ...partial };
|
||||
await fs.writeFile(taskYamlPath(slug), stringifyYaml(next), 'utf-8');
|
||||
return next;
|
||||
});
|
||||
}
|
||||
|
||||
export interface CreateTaskInput {
|
||||
name: string;
|
||||
instructions: string;
|
||||
triggers?: BackgroundTask['triggers'];
|
||||
model?: string;
|
||||
provider?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new bg-task folder + task.yaml + empty index.md. Returns the slug
|
||||
* assigned (which may include `-2`, `-3`, … suffix if the natural slug
|
||||
* collides with an existing folder). Slug collisions retry up to 50 times
|
||||
* before giving up. Note: runs.log is created lazily on the first run.
|
||||
*/
|
||||
export async function createTask(input: CreateTaskInput): Promise<{ slug: string }> {
|
||||
await fs.mkdir(BG_TASKS_DIR, { recursive: true });
|
||||
|
||||
const baseSlug = slugify(input.name);
|
||||
let slug = baseSlug;
|
||||
let attempt = 1;
|
||||
while (true) {
|
||||
try {
|
||||
await fs.mkdir(taskDir(slug), { recursive: false });
|
||||
break;
|
||||
} catch (err: unknown) {
|
||||
const e = err as { code?: string };
|
||||
if (e.code === 'EEXIST') {
|
||||
attempt += 1;
|
||||
if (attempt > 50) {
|
||||
throw new Error(`Slug collision: could not find a free slug after ${attempt - 1} attempts`);
|
||||
}
|
||||
slug = `${baseSlug}-${attempt}`;
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
const task: BackgroundTask = {
|
||||
name: input.name,
|
||||
instructions: input.instructions,
|
||||
active: true,
|
||||
...(input.triggers ? { triggers: input.triggers } : {}),
|
||||
...(input.model ? { model: input.model } : {}),
|
||||
...(input.provider ? { provider: input.provider } : {}),
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await fs.writeFile(taskYamlPath(slug), stringifyYaml(task), 'utf-8');
|
||||
await fs.writeFile(taskIndexPath(slug), `# ${input.name}\n\n`, 'utf-8');
|
||||
|
||||
return { slug };
|
||||
}
|
||||
|
||||
/** Delete a bg-task — removes the entire folder. */
|
||||
export async function deleteTask(slug: string): Promise<void> {
|
||||
return withFileLock(taskYamlPath(slug), async () => {
|
||||
await fs.rm(taskDir(slug), { recursive: true, force: true });
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Listing tasks
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export interface ListTasksOptions {
|
||||
offset?: number;
|
||||
limit?: number;
|
||||
sort?: 'createdAt:desc' | 'createdAt:asc' | 'name:asc';
|
||||
}
|
||||
|
||||
export interface ListTasksResult {
|
||||
items: BackgroundTaskSummary[];
|
||||
total: number;
|
||||
}
|
||||
|
||||
export async function listTasks(opts: ListTasksOptions = {}): Promise<ListTasksResult> {
|
||||
const offset = opts.offset ?? 0;
|
||||
const limit = opts.limit ?? 50;
|
||||
const sort = opts.sort ?? 'createdAt:desc';
|
||||
|
||||
let entries: string[];
|
||||
try {
|
||||
entries = await fs.readdir(BG_TASKS_DIR);
|
||||
} catch (err: unknown) {
|
||||
const e = err as { code?: string };
|
||||
if (e.code === 'ENOENT') return { items: [], total: 0 };
|
||||
throw err;
|
||||
}
|
||||
|
||||
const all: BackgroundTaskSummary[] = [];
|
||||
for (const slug of entries) {
|
||||
if (slug.startsWith('.')) continue;
|
||||
const task = await fetchTask(slug);
|
||||
if (!task) continue;
|
||||
all.push({
|
||||
slug,
|
||||
name: task.name,
|
||||
instructions: task.instructions,
|
||||
active: task.active,
|
||||
...(task.triggers ? { triggers: task.triggers } : {}),
|
||||
createdAt: task.createdAt,
|
||||
...(task.lastAttemptAt ? { lastAttemptAt: task.lastAttemptAt } : {}),
|
||||
...(task.lastRunId ? { lastRunId: task.lastRunId } : {}),
|
||||
...(task.lastRunAt ? { lastRunAt: task.lastRunAt } : {}),
|
||||
...(task.lastRunSummary ? { lastRunSummary: task.lastRunSummary } : {}),
|
||||
...(task.lastRunError ? { lastRunError: task.lastRunError } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
all.sort((a, b) => {
|
||||
if (sort === 'name:asc') return a.name.localeCompare(b.name);
|
||||
const aT = new Date(a.createdAt).getTime();
|
||||
const bT = new Date(b.createdAt).getTime();
|
||||
return sort === 'createdAt:asc' ? aT - bT : bT - aT;
|
||||
});
|
||||
|
||||
return {
|
||||
items: all.slice(offset, offset + limit),
|
||||
total: all.length,
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Runs pointer file (`runs.log`)
|
||||
//
|
||||
// One line per run, runId only. Prepended on each start so the newest is at
|
||||
// the top — no sorting needed on read. The actual transcript jsonl lives in
|
||||
// the global `$WorkDir/runs/<runId>.jsonl`; readers fetch via the standard
|
||||
// runs:fetch IPC. Read concurrency is unconstrained; write is serialized via
|
||||
// `withFileLock` on the task.yaml path (same lock as patches, so a run-start
|
||||
// patch and a prepend don't race).
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export async function prependRunId(slug: string, runId: string): Promise<void> {
|
||||
return withFileLock(taskYamlPath(slug), async () => {
|
||||
const filePath = taskRunsLogPath(slug);
|
||||
let existing = '';
|
||||
try {
|
||||
existing = await fs.readFile(filePath, 'utf-8');
|
||||
} catch (err: unknown) {
|
||||
const e = err as { code?: string };
|
||||
if (e.code !== 'ENOENT') throw err;
|
||||
}
|
||||
await fs.writeFile(filePath, `${runId}\n${existing}`, 'utf-8');
|
||||
});
|
||||
}
|
||||
|
||||
export async function readRunIds(slug: string, limit?: number): Promise<string[]> {
|
||||
let content = '';
|
||||
try {
|
||||
content = await fs.readFile(taskRunsLogPath(slug), 'utf-8');
|
||||
} catch (err: unknown) {
|
||||
const e = err as { code?: string };
|
||||
if (e.code === 'ENOENT') return [];
|
||||
throw err;
|
||||
}
|
||||
const ids = content.split('\n').map(s => s.trim()).filter(Boolean);
|
||||
return limit ? ids.slice(0, limit) : ids;
|
||||
}
|
||||
193
apps/x/packages/core/src/background-tasks/runner.ts
Normal file
193
apps/x/packages/core/src/background-tasks/runner.ts
Normal file
|
|
@ -0,0 +1,193 @@
|
|||
import type { BackgroundTask, BackgroundTaskTriggerType } from '@x/shared/dist/background-task.js';
|
||||
import { PrefixLogger } from '@x/shared/dist/prefix-logger.js';
|
||||
import { fetchTask, patchTask, prependRunId } from './fileops.js';
|
||||
import { createRun, createMessage } from '../runs/runs.js';
|
||||
import { getBackgroundTaskAgentModel } from '../models/defaults.js';
|
||||
import { extractAgentResponse, waitForRunCompletion } from '../agents/utils.js';
|
||||
import { buildTriggerBlock } from '../agents/build-trigger-block.js';
|
||||
import { backgroundTaskBus } from './bus.js';
|
||||
|
||||
const log = new PrefixLogger('BgTask:Agent');
|
||||
|
||||
export interface BackgroundTaskAgentResult {
|
||||
slug: string;
|
||||
runId: string | null;
|
||||
summary: string | null;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
const SUMMARY_LOG_LIMIT = 120;
|
||||
|
||||
function truncate(s: string | null | undefined, n = SUMMARY_LOG_LIMIT): string {
|
||||
if (!s) return '';
|
||||
return s.length <= n ? s : `${s.slice(0, n - 1)}…`;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Agent run message
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const BG_TASK_EVENT_DECISION_DIRECTIVE = '**Decision:** Determine whether this event genuinely warrants taking the action your instructions describe. If the event is not meaningfully relevant on closer inspection, skip the run — do not modify `index.md` and do not perform any side-effect. Only act if the event provides new or changed information that the instructions imply you should react to.';
|
||||
|
||||
const BG_TASK_MANUAL_PAREN = 'user-triggered — either the Run button in the Background Task detail view or the `run-background-task-agent` tool';
|
||||
|
||||
function buildMessage(
|
||||
slug: string,
|
||||
task: BackgroundTask,
|
||||
trigger: BackgroundTaskTriggerType,
|
||||
context?: string,
|
||||
): string {
|
||||
const now = new Date();
|
||||
const localNow = now.toLocaleString('en-US', { dateStyle: 'full', timeStyle: 'long' });
|
||||
const tz = Intl.DateTimeFormat().resolvedOptions().timeZone;
|
||||
|
||||
const wsFolder = `bg-tasks/${slug}/`;
|
||||
|
||||
const baseMessage = `Run the background task at \`${wsFolder}\`.
|
||||
|
||||
**Time:** ${localNow} (${tz})
|
||||
|
||||
**Instructions:**
|
||||
${task.instructions}
|
||||
|
||||
Your task folder is \`${wsFolder}\`. The user-visible artifact is \`${wsFolder}index.md\` — read it with \`workspace-readFile\` and update it with \`workspace-edit\` per the OUTPUT / ACTION mode rule. Do not touch \`${wsFolder}task.yaml\` (the runtime owns it).`;
|
||||
|
||||
return baseMessage + buildTriggerBlock({
|
||||
trigger,
|
||||
triggers: task.triggers,
|
||||
// The 'event' branch passes the event payload as `context`; every
|
||||
// other trigger uses `context` as a one-off bias for THIS run.
|
||||
context: trigger === 'event' ? undefined : context,
|
||||
eventPayload: trigger === 'event' ? context : undefined,
|
||||
targetNoun: 'task',
|
||||
instructionsNoun: 'instructions',
|
||||
manualParen: BG_TASK_MANUAL_PAREN,
|
||||
eventDecisionDirective: BG_TASK_EVENT_DECISION_DIRECTIVE,
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Concurrency guard — keyed by slug
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const runningTasks = new Set<string>();
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Public API
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Run the bg-task agent on a specific task.
|
||||
* Called by the scheduler ('cron' | 'window'), the event processor ('event'),
|
||||
* the renderer detail Run button ('manual'), or the `run-background-task-agent`
|
||||
* builtin tool ('manual').
|
||||
*/
|
||||
export async function runBackgroundTask(
|
||||
slug: string,
|
||||
trigger: BackgroundTaskTriggerType = 'manual',
|
||||
context?: string,
|
||||
): Promise<BackgroundTaskAgentResult> {
|
||||
if (runningTasks.has(slug)) {
|
||||
log.log(`${slug} — skip: already running`);
|
||||
return { slug, runId: null, summary: null, error: 'Already running' };
|
||||
}
|
||||
runningTasks.add(slug);
|
||||
|
||||
try {
|
||||
const task = await fetchTask(slug);
|
||||
if (!task) {
|
||||
log.log(`${slug} — skip: task not found`);
|
||||
return { slug, runId: null, summary: null, error: 'Task not found' };
|
||||
}
|
||||
|
||||
// `||` not `??`: an empty-string `task.model` (occasionally synthesized
|
||||
// by an LLM call to create-background-task) should fall through to the
|
||||
// default just like undefined does.
|
||||
const model = task.model || await getBackgroundTaskAgentModel();
|
||||
const agentRun = await createRun({
|
||||
agentId: 'background-task-agent',
|
||||
model,
|
||||
...(task.provider ? { provider: task.provider } : {}),
|
||||
useCase: 'background_task_agent',
|
||||
// Granular trigger as analytics sub-use-case — matches live-note's
|
||||
// pattern at runner.ts:149.
|
||||
subUseCase: trigger,
|
||||
});
|
||||
|
||||
const runId = agentRun.id;
|
||||
// Record this run in the task's runs.log pointer file (newest first).
|
||||
// The transcript itself lives at the global $WorkDir/runs/<runId>.jsonl
|
||||
// — runs.log is just an index that ties runIds to this task.
|
||||
await prependRunId(slug, runId);
|
||||
const startedAt = new Date().toISOString();
|
||||
|
||||
log.log(`${slug} — start trigger=${trigger} runId=${runId}`);
|
||||
|
||||
// Bump `lastAttemptAt` + `lastRunId` immediately (before the agent
|
||||
// executes). `lastAttemptAt` is the scheduler's backoff anchor and the
|
||||
// disk-persistent in-flight signal (lastAttemptAt > lastRunAt). Crucially
|
||||
// we leave `lastRunAt` / `lastRunSummary` / `lastRunError` untouched —
|
||||
// the previous successful run stays visible in the UI even while this
|
||||
// new run is in-flight or fails.
|
||||
await patchTask(slug, {
|
||||
lastAttemptAt: startedAt,
|
||||
lastRunId: runId,
|
||||
});
|
||||
|
||||
backgroundTaskBus.publish({
|
||||
type: 'background_task_agent_start',
|
||||
slug,
|
||||
trigger,
|
||||
runId,
|
||||
});
|
||||
|
||||
try {
|
||||
await createMessage(runId, buildMessage(slug, task, trigger, context));
|
||||
await waitForRunCompletion(runId, { throwOnError: true });
|
||||
const summary = await extractAgentResponse(runId);
|
||||
|
||||
// Success — bump cycle anchor, refresh summary, clear any prior error.
|
||||
await patchTask(slug, {
|
||||
lastRunAt: new Date().toISOString(),
|
||||
lastRunSummary: summary ?? undefined,
|
||||
lastRunError: undefined,
|
||||
});
|
||||
|
||||
log.log(`${slug} — done summary="${truncate(summary)}"`);
|
||||
|
||||
backgroundTaskBus.publish({
|
||||
type: 'background_task_agent_complete',
|
||||
slug,
|
||||
runId,
|
||||
...(summary ? { summary } : {}),
|
||||
});
|
||||
|
||||
return { slug, runId, summary };
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
|
||||
// Failure — only record the error. `lastRunAt` and `lastRunSummary`
|
||||
// are deliberately untouched so the user keeps seeing the last good
|
||||
// state; the scheduler's backoff (lastAttemptAt + 5min) prevents
|
||||
// retry-storming.
|
||||
try {
|
||||
await patchTask(slug, { lastRunError: msg });
|
||||
} catch {
|
||||
// don't mask the original error
|
||||
}
|
||||
|
||||
log.log(`${slug} — failed: ${truncate(msg)}`);
|
||||
|
||||
backgroundTaskBus.publish({
|
||||
type: 'background_task_agent_complete',
|
||||
slug,
|
||||
runId,
|
||||
error: msg,
|
||||
});
|
||||
|
||||
return { slug, runId, summary: null, error: msg };
|
||||
}
|
||||
} finally {
|
||||
runningTasks.delete(slug);
|
||||
}
|
||||
}
|
||||
93
apps/x/packages/core/src/background-tasks/scheduler.ts
Normal file
93
apps/x/packages/core/src/background-tasks/scheduler.ts
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
import { PrefixLogger } from '@x/shared';
|
||||
import { listTasks } from './fileops.js';
|
||||
import { runBackgroundTask } from './runner.js';
|
||||
import { backoffRemainingMs, dueTimedTrigger } from '../schedule/utils.js';
|
||||
|
||||
const log = new PrefixLogger('BgTask:Scheduler');
|
||||
const POLL_INTERVAL_MS = 15_000; // 15 seconds — matches live-note scheduler
|
||||
|
||||
function humanMs(ms: number): string {
|
||||
const s = Math.round(ms / 1000);
|
||||
if (s < 60) return `${s}s`;
|
||||
const m = Math.round(s / 60);
|
||||
return `${m}m`;
|
||||
}
|
||||
|
||||
async function processScheduledTasks(): Promise<void> {
|
||||
const { items } = await listTasks({ limit: 10_000 });
|
||||
|
||||
let scannedCount = items.length;
|
||||
let activeCount = 0;
|
||||
let pausedCount = 0;
|
||||
let firedCount = 0;
|
||||
let backoffCount = 0;
|
||||
let inFlightCount = 0;
|
||||
|
||||
for (const task of items) {
|
||||
if (!task.active) {
|
||||
pausedCount++;
|
||||
continue;
|
||||
}
|
||||
activeCount++;
|
||||
|
||||
// In-flight skip — `lastAttemptAt` set more recently than `lastRunAt`
|
||||
// means the latest attempt never completed. The in-memory concurrency
|
||||
// guard in the runner is the fast path; this is the disk-persistent
|
||||
// backstop covering crashes mid-run.
|
||||
const attemptAt = task.lastAttemptAt;
|
||||
const completedAt = task.lastRunAt;
|
||||
if (attemptAt && (!completedAt || attemptAt > completedAt)) {
|
||||
// …but only treat as in-flight if the attempt is still within the
|
||||
// backoff window. After backoff expires the next iteration is free
|
||||
// to retry (matches the runner's fail/crash recovery story).
|
||||
if (backoffRemainingMs(attemptAt) > 0) {
|
||||
inFlightCount++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Cycle anchor: only successful runs advance the cycle. Failures
|
||||
// leave the cycle unfired so the next natural occurrence retries
|
||||
// (gated by backoff).
|
||||
const source = dueTimedTrigger(task.triggers, completedAt ?? null);
|
||||
if (!source) continue;
|
||||
|
||||
const backoffMs = backoffRemainingMs(attemptAt ?? null);
|
||||
if (backoffMs > 0) {
|
||||
backoffCount++;
|
||||
log.log(`${task.slug} — skip (matched ${source}, backoff ${humanMs(backoffMs)} remaining)`);
|
||||
continue;
|
||||
}
|
||||
|
||||
firedCount++;
|
||||
log.log(`${task.slug} — firing (matched ${source})`);
|
||||
runBackgroundTask(task.slug, source).catch(err => {
|
||||
log.log(`${task.slug} — fire error: ${err instanceof Error ? err.message : String(err)}`);
|
||||
});
|
||||
}
|
||||
|
||||
if (activeCount > 0 || firedCount > 0 || backoffCount > 0 || inFlightCount > 0) {
|
||||
log.log(
|
||||
`tick — scanned ${scannedCount} tasks, ${activeCount} active` +
|
||||
(pausedCount > 0 ? `, ${pausedCount} paused` : '') +
|
||||
(inFlightCount > 0 ? `, ${inFlightCount} in-flight` : '') +
|
||||
(firedCount > 0 ? `, fired ${firedCount}` : '') +
|
||||
(backoffCount > 0 ? `, backoff ${backoffCount}` : ''),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export async function init(): Promise<void> {
|
||||
log.log(`starting, polling every ${POLL_INTERVAL_MS / 1000}s`);
|
||||
|
||||
await processScheduledTasks();
|
||||
|
||||
while (true) {
|
||||
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||
try {
|
||||
await processScheduledTasks();
|
||||
} catch (err) {
|
||||
log.log(`tick error: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
41
apps/x/packages/core/src/events/consumer.ts
Normal file
41
apps/x/packages/core/src/events/consumer.ts
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
import type { RowboatEvent } from '@x/shared/dist/events.js';
|
||||
|
||||
/**
|
||||
* A target (live note, bg-task, …) that a consumer might fire on an event.
|
||||
* The `id` is consumer-defined — a workspace-relative path for live-note, a
|
||||
* slug for bg-task. The processor never interprets it.
|
||||
*/
|
||||
export interface EventConsumerTarget {
|
||||
id: string;
|
||||
instructions: string;
|
||||
eventMatchCriteria: string;
|
||||
}
|
||||
|
||||
export interface EventConsumerFireResult {
|
||||
runId: string | null;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* An event consumer registers itself with the event processor in `init.ts`.
|
||||
* On each pending event, the processor runs all consumers' `findCandidates`
|
||||
* concurrently (Pass-1 routing), then fires each consumer's candidates
|
||||
* sequentially (preserves per-target FIFO) while consumers run in parallel.
|
||||
*/
|
||||
export interface EventConsumer {
|
||||
/** Stable identifier for logging and the enriched event `consumers` map. */
|
||||
name: string;
|
||||
|
||||
/** All eligible candidates this consumer would consider routing into. */
|
||||
listEligibleTargets(): Promise<EventConsumerTarget[]>;
|
||||
|
||||
/**
|
||||
* Pass-1 routing. The implementation usually short-circuits when
|
||||
* `event.target?.consumer === this.name`, otherwise delegates to
|
||||
* `routeBatch` from `./routing.js`.
|
||||
*/
|
||||
findCandidates(event: RowboatEvent, targets: EventConsumerTarget[]): Promise<string[]>;
|
||||
|
||||
/** Fire the consumer's agent on a single candidate id. */
|
||||
fireCandidate(event: RowboatEvent, id: string): Promise<EventConsumerFireResult>;
|
||||
}
|
||||
32
apps/x/packages/core/src/events/init.ts
Normal file
32
apps/x/packages/core/src/events/init.ts
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
import { PrefixLogger } from '@x/shared';
|
||||
import { processPendingEvents } from './processor.js';
|
||||
import { ensureEventDirs } from './producer.js';
|
||||
|
||||
export { registerConsumer } from './processor.js';
|
||||
export type { EventConsumer, EventConsumerTarget, EventConsumerFireResult } from './consumer.js';
|
||||
export { routeBatch } from './routing.js';
|
||||
export type { RouteBatchOptions } from './routing.js';
|
||||
export { createEvent } from './producer.js';
|
||||
|
||||
const log = new PrefixLogger('Events:Processor');
|
||||
const POLL_INTERVAL_MS = 5_000; // 5 seconds — events should feel responsive
|
||||
|
||||
/**
|
||||
* Start the event processor's tick loop. Consumers must be registered via
|
||||
* `registerConsumer` before this is called.
|
||||
*/
|
||||
export async function init(): Promise<void> {
|
||||
log.log(`starting, polling every ${POLL_INTERVAL_MS / 1000}s`);
|
||||
ensureEventDirs();
|
||||
|
||||
await processPendingEvents();
|
||||
|
||||
while (true) {
|
||||
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||
try {
|
||||
await processPendingEvents();
|
||||
} catch (err) {
|
||||
log.log(`tick error: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
170
apps/x/packages/core/src/events/processor.ts
Normal file
170
apps/x/packages/core/src/events/processor.ts
Normal file
|
|
@ -0,0 +1,170 @@
|
|||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { events, PrefixLogger } from '@x/shared';
|
||||
import type { RowboatEvent, ConsumerResult } from '@x/shared/dist/events.js';
|
||||
import type { EventConsumer } from './consumer.js';
|
||||
import { PENDING_DIR, DONE_DIR, ensureEventDirs } from './producer.js';
|
||||
|
||||
const log = new PrefixLogger('Events:Processor');
|
||||
|
||||
let registeredConsumers: EventConsumer[] = [];
|
||||
|
||||
export function registerConsumer(consumer: EventConsumer): void {
|
||||
registeredConsumers.push(consumer);
|
||||
log.log(`registered consumer: ${consumer.name}`);
|
||||
}
|
||||
|
||||
/** @internal — for tests. */
|
||||
export function _resetConsumersForTests(): void {
|
||||
registeredConsumers = [];
|
||||
}
|
||||
|
||||
function moveEventToDone(filename: string, enriched: RowboatEvent): void {
|
||||
const donePath = path.join(DONE_DIR, filename);
|
||||
const pendingPath = path.join(PENDING_DIR, filename);
|
||||
fs.writeFileSync(donePath, JSON.stringify(enriched, null, 2), 'utf-8');
|
||||
try {
|
||||
fs.unlinkSync(pendingPath);
|
||||
} catch (err) {
|
||||
log.log(`failed to remove pending event ${filename}: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Materialize the legacy `targetFilePath` field — events written by the
|
||||
* pre-rename code carry the flat field; the new processor reads it as a
|
||||
* live-note targeted re-run.
|
||||
*/
|
||||
function migrateLegacyTarget(event: RowboatEvent): RowboatEvent {
|
||||
if (event.target || !event.targetFilePath) return event;
|
||||
return { ...event, target: { consumer: 'live-note', id: event.targetFilePath } };
|
||||
}
|
||||
|
||||
async function processOneEvent(filename: string): Promise<void> {
|
||||
const pendingPath = path.join(PENDING_DIR, filename);
|
||||
|
||||
let event: RowboatEvent;
|
||||
try {
|
||||
const raw = fs.readFileSync(pendingPath, 'utf-8');
|
||||
const parsed = JSON.parse(raw);
|
||||
event = events.RowboatEventSchema.parse(parsed);
|
||||
event = migrateLegacyTarget(event);
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
log.log(`event:${filename} — malformed, moving to done with error: ${msg}`);
|
||||
const stub: RowboatEvent = {
|
||||
id: filename.replace(/\.json$/, ''),
|
||||
source: 'unknown',
|
||||
type: 'unknown',
|
||||
createdAt: new Date().toISOString(),
|
||||
payload: '',
|
||||
processedAt: new Date().toISOString(),
|
||||
error: `Failed to parse: ${msg}`,
|
||||
};
|
||||
moveEventToDone(filename, stub);
|
||||
return;
|
||||
}
|
||||
|
||||
log.log(`event:${event.id} — received source=${event.source} type=${event.type}`);
|
||||
|
||||
if (registeredConsumers.length === 0) {
|
||||
// No consumers — drop with a note in `done/` so the dir doesn't fill.
|
||||
const enriched: RowboatEvent = {
|
||||
...event,
|
||||
processedAt: new Date().toISOString(),
|
||||
consumers: {},
|
||||
};
|
||||
moveEventToDone(filename, enriched);
|
||||
return;
|
||||
}
|
||||
|
||||
// Pass-1: run all consumers' routing concurrently. Each consumer is
|
||||
// responsible for short-circuiting when `event.target?.consumer === this.name`.
|
||||
const passOne = await Promise.all(registeredConsumers.map(async (consumer) => {
|
||||
try {
|
||||
const targets = await consumer.listEligibleTargets();
|
||||
const candidateIds = await consumer.findCandidates(event, targets);
|
||||
return { consumer, candidateIds, error: undefined as string | undefined };
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
log.log(`event:${event.id} — consumer ${consumer.name} Pass-1 threw: ${msg}`);
|
||||
return { consumer, candidateIds: [], error: msg };
|
||||
}
|
||||
}));
|
||||
|
||||
// Firing: each consumer fires its candidates sequentially (preserves
|
||||
// per-target FIFO). Consumers run in parallel via Promise.all.
|
||||
const fired = await Promise.all(passOne.map(async ({ consumer, candidateIds, error }) => {
|
||||
const result: ConsumerResult = { candidateIds, runIds: [], errors: error ? [error] : [] };
|
||||
|
||||
for (const id of candidateIds) {
|
||||
try {
|
||||
const r = await consumer.fireCandidate(event, id);
|
||||
if (r.runId) result.runIds.push(r.runId);
|
||||
if (r.error) {
|
||||
result.errors!.push(`${id}: ${r.error}`);
|
||||
}
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
log.log(`event:${event.id} — consumer ${consumer.name} candidate ${id} threw: ${msg}`);
|
||||
result.errors!.push(`${id}: ${msg}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (result.errors!.length === 0) {
|
||||
delete result.errors;
|
||||
}
|
||||
|
||||
const total = candidateIds.length;
|
||||
if (total > 0) {
|
||||
const errCount = result.errors?.length ?? 0;
|
||||
const okCount = result.runIds.length;
|
||||
log.log(`event:${event.id} — ${consumer.name} processed ok=${okCount} errors=${errCount}`);
|
||||
}
|
||||
|
||||
return { name: consumer.name, result };
|
||||
}));
|
||||
|
||||
const consumersMap: Record<string, ConsumerResult> = {};
|
||||
for (const { name, result } of fired) {
|
||||
consumersMap[name] = result;
|
||||
}
|
||||
|
||||
const enriched: RowboatEvent = {
|
||||
...event,
|
||||
processedAt: new Date().toISOString(),
|
||||
consumers: consumersMap,
|
||||
};
|
||||
|
||||
moveEventToDone(filename, enriched);
|
||||
}
|
||||
|
||||
export async function processPendingEvents(): Promise<void> {
|
||||
ensureEventDirs();
|
||||
|
||||
let filenames: string[];
|
||||
try {
|
||||
filenames = fs.readdirSync(PENDING_DIR).filter(f => f.endsWith('.json'));
|
||||
} catch (err) {
|
||||
log.log(`failed to read pending dir: ${err instanceof Error ? err.message : String(err)}`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (filenames.length === 0) return;
|
||||
|
||||
// FIFO: monotonic IDs are lexicographically sortable
|
||||
filenames.sort();
|
||||
|
||||
if (filenames.length > 1) {
|
||||
log.log(`tick — ${filenames.length} pending events`);
|
||||
}
|
||||
|
||||
for (const filename of filenames) {
|
||||
try {
|
||||
await processOneEvent(filename);
|
||||
} catch (err) {
|
||||
log.log(`event:${filename} — unhandled error: ${err instanceof Error ? err.message : String(err)}`);
|
||||
// Keep the loop alive — don't move file, will retry on next tick
|
||||
}
|
||||
}
|
||||
}
|
||||
31
apps/x/packages/core/src/events/producer.ts
Normal file
31
apps/x/packages/core/src/events/producer.ts
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import type { RowboatEvent } from '@x/shared/dist/events.js';
|
||||
import { WorkDir } from '../config/config.js';
|
||||
import type { IMonotonicallyIncreasingIdGenerator } from '../application/lib/id-gen.js';
|
||||
import container from '../di/container.js';
|
||||
|
||||
export const EVENTS_DIR = path.join(WorkDir, 'events');
|
||||
export const PENDING_DIR = path.join(EVENTS_DIR, 'pending');
|
||||
export const DONE_DIR = path.join(EVENTS_DIR, 'done');
|
||||
|
||||
/**
|
||||
* Write a RowboatEvent to the events/pending/ directory. The filename is the
|
||||
* monotonically increasing ID so events sort by creation order. Producers
|
||||
* (gmail/calendar sync) call this in chronological order within a batch.
|
||||
*/
|
||||
export async function createEvent(event: Omit<RowboatEvent, 'id'>): Promise<void> {
|
||||
fs.mkdirSync(PENDING_DIR, { recursive: true });
|
||||
|
||||
const idGen = container.resolve<IMonotonicallyIncreasingIdGenerator>('idGenerator');
|
||||
const id = await idGen.next();
|
||||
|
||||
const fullEvent: RowboatEvent = { id, ...event };
|
||||
const filePath = path.join(PENDING_DIR, `${id}.json`);
|
||||
fs.writeFileSync(filePath, JSON.stringify(fullEvent, null, 2), 'utf-8');
|
||||
}
|
||||
|
||||
export function ensureEventDirs(): void {
|
||||
fs.mkdirSync(PENDING_DIR, { recursive: true });
|
||||
fs.mkdirSync(DONE_DIR, { recursive: true });
|
||||
}
|
||||
116
apps/x/packages/core/src/events/routing.ts
Normal file
116
apps/x/packages/core/src/events/routing.ts
Normal file
|
|
@ -0,0 +1,116 @@
|
|||
import { generateObject } from 'ai';
|
||||
import type { LanguageModel } from 'ai';
|
||||
import { events, PrefixLogger } from '@x/shared';
|
||||
import type { RowboatEvent } from '@x/shared/dist/events.js';
|
||||
import { captureLlmUsage } from '../analytics/usage.js';
|
||||
import type { UseCase } from '../analytics/use_case.js';
|
||||
import type { EventConsumerTarget } from './consumer.js';
|
||||
|
||||
const log = new PrefixLogger('Events:Routing');
|
||||
|
||||
const BATCH_SIZE = 20;
|
||||
|
||||
/**
|
||||
* Pass-1 LLM classifier — given an event and a list of candidate targets,
|
||||
* return the subset whose intent/matchCriteria suggest the event might be
|
||||
* relevant. Liberal by design: the consumer's agent does Pass-2 on the event
|
||||
* payload before acting.
|
||||
*/
|
||||
export interface RouteBatchOptions {
|
||||
/** Singular noun for prompt wording, e.g. 'live note', 'background task'. */
|
||||
entitySingular: string;
|
||||
/** Plural noun for prompt wording, e.g. 'live notes', 'background tasks'. */
|
||||
entityPlural: string;
|
||||
/** Analytics use case (e.g. 'live_note_agent', 'background_task_agent'). */
|
||||
useCase: UseCase;
|
||||
/** Resolver returning the LLM to use for routing. Each consumer provides its own
|
||||
* so model selection stays aligned with the consumer's agent model. */
|
||||
resolveModel: () => Promise<{ model: LanguageModel; modelId: string; providerName: string }>;
|
||||
}
|
||||
|
||||
function buildSystemPrompt(opts: Pick<RouteBatchOptions, 'entitySingular' | 'entityPlural'>): string {
|
||||
const { entitySingular, entityPlural } = opts;
|
||||
return `You are a routing classifier for a personal productivity workspace.
|
||||
|
||||
You will receive an event (something that happened — an email, meeting, message, etc.) and a list of ${entityPlural}. Each one has:
|
||||
- id: an identifier you return in the output
|
||||
- intent: the persistent intent of the ${entitySingular} (what it should keep being / containing / doing)
|
||||
- matchCriteria: an explicit description of which kinds of incoming signals should wake this ${entitySingular}
|
||||
|
||||
Your job is to identify which ${entityPlural} MIGHT be relevant to this event.
|
||||
|
||||
Rules:
|
||||
- Be LIBERAL in your selections. Include any ${entitySingular} that is even moderately relevant.
|
||||
- Prefer false positives over false negatives — it is much better to include one that turns out to be irrelevant than to miss one that was relevant.
|
||||
- Only exclude entries that are CLEARLY and OBVIOUSLY irrelevant to the event.
|
||||
- Do not attempt to judge whether the event contains enough information to act on. That is handled by the agent in a later stage.
|
||||
- Return an empty list only if no entries are relevant at all.
|
||||
- Return each candidate's id exactly as given.`;
|
||||
}
|
||||
|
||||
function buildPrompt(event: RowboatEvent, batch: EventConsumerTarget[], entityPlural: string): string {
|
||||
const list = batch
|
||||
.map((t, i) => `${i + 1}. id: ${t.id}\n intent: ${t.instructions}\n matchCriteria: ${t.eventMatchCriteria}`)
|
||||
.join('\n\n');
|
||||
|
||||
return `## Event
|
||||
|
||||
Source: ${event.source}
|
||||
Type: ${event.type}
|
||||
Time: ${event.createdAt}
|
||||
|
||||
${event.payload}
|
||||
|
||||
## ${entityPlural[0].toUpperCase()}${entityPlural.slice(1)}
|
||||
|
||||
${list}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run Pass-1 routing for one consumer. Returns the subset of `targets` whose
|
||||
* ids the classifier flagged as relevant. Batched in groups of 20.
|
||||
*/
|
||||
export async function routeBatch(
|
||||
event: RowboatEvent,
|
||||
targets: EventConsumerTarget[],
|
||||
opts: RouteBatchOptions,
|
||||
): Promise<string[]> {
|
||||
if (targets.length === 0) {
|
||||
log.log(`event:${event.id} — no eligible ${opts.entityPlural}`);
|
||||
return [];
|
||||
}
|
||||
|
||||
log.log(`event:${event.id} — routing against ${targets.length} ${targets.length === 1 ? opts.entitySingular : opts.entityPlural}`);
|
||||
|
||||
const { model, modelId, providerName } = await opts.resolveModel();
|
||||
const systemPrompt = buildSystemPrompt(opts);
|
||||
const matched = new Set<string>();
|
||||
|
||||
for (let i = 0; i < targets.length; i += BATCH_SIZE) {
|
||||
const batch = targets.slice(i, i + BATCH_SIZE);
|
||||
try {
|
||||
const result = await generateObject({
|
||||
model,
|
||||
system: systemPrompt,
|
||||
prompt: buildPrompt(event, batch, opts.entityPlural),
|
||||
schema: events.Pass1OutputSchema,
|
||||
});
|
||||
captureLlmUsage({
|
||||
useCase: opts.useCase,
|
||||
subUseCase: 'routing',
|
||||
model: modelId,
|
||||
provider: providerName,
|
||||
usage: result.usage,
|
||||
});
|
||||
for (const id of result.object.ids) {
|
||||
matched.add(id);
|
||||
}
|
||||
} catch (err) {
|
||||
log.log(`event:${event.id} — Pass1 batch ${Math.floor(i / BATCH_SIZE)} failed: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
const candidateIds = targets.filter(t => matched.has(t.id)).map(t => t.id);
|
||||
log.log(`event:${event.id} — Pass1 → ${candidateIds.length} candidate${candidateIds.length === 1 ? '' : 's'}${candidateIds.length > 0 ? `: ${candidateIds.join(', ')}` : ''}`);
|
||||
return candidateIds;
|
||||
}
|
||||
|
|
@ -0,0 +1,79 @@
|
|||
import * as workspace from '../../workspace/workspace.js';
|
||||
import { fetchLiveNote } from './fileops.js';
|
||||
import { runLiveNoteAgent } from './runner.js';
|
||||
import type { EventConsumer, EventConsumerTarget } from '../../events/consumer.js';
|
||||
import { routeBatch } from '../../events/routing.js';
|
||||
import { createProvider } from '../../models/models.js';
|
||||
import { getDefaultModelAndProvider, getLiveNoteAgentModel, resolveProviderConfig } from '../../models/defaults.js';
|
||||
|
||||
async function resolveRoutingModel() {
|
||||
const modelId = await getLiveNoteAgentModel();
|
||||
const { provider } = await getDefaultModelAndProvider();
|
||||
const config = await resolveProviderConfig(provider);
|
||||
return {
|
||||
model: createProvider(config).languageModel(modelId),
|
||||
modelId,
|
||||
providerName: provider,
|
||||
};
|
||||
}
|
||||
|
||||
async function listKnowledgeMarkdownFiles(): Promise<string[]> {
|
||||
try {
|
||||
const entries = await workspace.readdir('knowledge', { recursive: true });
|
||||
return entries
|
||||
.filter(e => e.kind === 'file' && e.name.endsWith('.md'))
|
||||
.map(e => e.path.replace(/^knowledge\//, ''));
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async function listEligibleTargets(): Promise<EventConsumerTarget[]> {
|
||||
const out: EventConsumerTarget[] = [];
|
||||
const filePaths = await listKnowledgeMarkdownFiles();
|
||||
|
||||
for (const filePath of filePaths) {
|
||||
let live;
|
||||
try {
|
||||
live = await fetchLiveNote(filePath);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (!live) continue;
|
||||
if (live.active === false) continue;
|
||||
|
||||
const eventMatchCriteria = live.triggers?.eventMatchCriteria;
|
||||
if (!eventMatchCriteria) continue;
|
||||
|
||||
out.push({
|
||||
id: filePath,
|
||||
instructions: live.objective,
|
||||
eventMatchCriteria,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
export const liveNoteEventConsumer: EventConsumer = {
|
||||
name: 'live-note',
|
||||
|
||||
listEligibleTargets,
|
||||
|
||||
findCandidates: async (event, targets) => {
|
||||
// Targeted re-run from the UI — skip Pass-1.
|
||||
if (event.target?.consumer === 'live-note') {
|
||||
return targets.some(t => t.id === event.target!.id) ? [event.target.id] : [];
|
||||
}
|
||||
return routeBatch(event, targets, {
|
||||
entitySingular: 'live note',
|
||||
entityPlural: 'live notes',
|
||||
useCase: 'live_note_agent',
|
||||
resolveModel: resolveRoutingModel,
|
||||
});
|
||||
},
|
||||
|
||||
fireCandidate: async (event, filePath) => {
|
||||
const result = await runLiveNoteAgent(filePath, 'event', event.payload);
|
||||
return { runId: result.runId, error: result.error };
|
||||
},
|
||||
};
|
||||
|
|
@ -1,204 +0,0 @@
|
|||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { PrefixLogger, liveNote } from '@x/shared';
|
||||
import type { KnowledgeEvent } from '@x/shared/dist/live-note.js';
|
||||
import { WorkDir } from '../../config/config.js';
|
||||
import * as workspace from '../../workspace/workspace.js';
|
||||
import { fetchLiveNote } from './fileops.js';
|
||||
import { runLiveNoteAgent } from './runner.js';
|
||||
import { findCandidates, type ParsedLiveNote } from './routing.js';
|
||||
import type { IMonotonicallyIncreasingIdGenerator } from '../../application/lib/id-gen.js';
|
||||
import container from '../../di/container.js';
|
||||
|
||||
const POLL_INTERVAL_MS = 5_000; // 5 seconds — events should feel responsive
|
||||
const EVENTS_DIR = path.join(WorkDir, 'events');
|
||||
const PENDING_DIR = path.join(EVENTS_DIR, 'pending');
|
||||
const DONE_DIR = path.join(EVENTS_DIR, 'done');
|
||||
|
||||
const log = new PrefixLogger('LiveNote:Events');
|
||||
|
||||
/**
|
||||
* Write a KnowledgeEvent to the events/pending/ directory.
|
||||
* Filename is a monotonically increasing ID so events sort by creation order.
|
||||
* Call this function in chronological order (oldest event first) within a sync batch
|
||||
* to ensure correct ordering.
|
||||
*/
|
||||
export async function createEvent(event: Omit<KnowledgeEvent, 'id'>): Promise<void> {
|
||||
fs.mkdirSync(PENDING_DIR, { recursive: true });
|
||||
|
||||
const idGen = container.resolve<IMonotonicallyIncreasingIdGenerator>('idGenerator');
|
||||
const id = await idGen.next();
|
||||
|
||||
const fullEvent: KnowledgeEvent = { id, ...event };
|
||||
const filePath = path.join(PENDING_DIR, `${id}.json`);
|
||||
fs.writeFileSync(filePath, JSON.stringify(fullEvent, null, 2), 'utf-8');
|
||||
}
|
||||
|
||||
function ensureDirs(): void {
|
||||
fs.mkdirSync(PENDING_DIR, { recursive: true });
|
||||
fs.mkdirSync(DONE_DIR, { recursive: true });
|
||||
}
|
||||
|
||||
async function listEventEligibleLiveNotes(): Promise<ParsedLiveNote[]> {
|
||||
const out: ParsedLiveNote[] = [];
|
||||
let entries;
|
||||
try {
|
||||
entries = await workspace.readdir('knowledge', { recursive: true });
|
||||
} catch {
|
||||
return out;
|
||||
}
|
||||
const mdFiles = entries
|
||||
.filter(e => e.kind === 'file' && e.name.endsWith('.md'))
|
||||
.map(e => e.path.replace(/^knowledge\//, ''));
|
||||
|
||||
for (const filePath of mdFiles) {
|
||||
let live;
|
||||
try {
|
||||
live = await fetchLiveNote(filePath);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (!live) continue;
|
||||
if (live.active === false) continue;
|
||||
|
||||
const eventMatchCriteria = live.triggers?.eventMatchCriteria;
|
||||
if (!eventMatchCriteria) continue; // not event-eligible
|
||||
|
||||
out.push({
|
||||
filePath,
|
||||
objective: live.objective,
|
||||
eventMatchCriteria,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function moveEventToDone(filename: string, enriched: KnowledgeEvent): void {
|
||||
const donePath = path.join(DONE_DIR, filename);
|
||||
const pendingPath = path.join(PENDING_DIR, filename);
|
||||
fs.writeFileSync(donePath, JSON.stringify(enriched, null, 2), 'utf-8');
|
||||
try {
|
||||
fs.unlinkSync(pendingPath);
|
||||
} catch (err) {
|
||||
log.log(`failed to remove pending event ${filename}: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
async function processOneEvent(filename: string): Promise<void> {
|
||||
const pendingPath = path.join(PENDING_DIR, filename);
|
||||
|
||||
let event: KnowledgeEvent;
|
||||
try {
|
||||
const raw = fs.readFileSync(pendingPath, 'utf-8');
|
||||
const parsed = JSON.parse(raw);
|
||||
event = liveNote.KnowledgeEventSchema.parse(parsed);
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
log.log(`event:${filename} — malformed, moving to done with error: ${msg}`);
|
||||
const stub: KnowledgeEvent = {
|
||||
id: filename.replace(/\.json$/, ''),
|
||||
source: 'unknown',
|
||||
type: 'unknown',
|
||||
createdAt: new Date().toISOString(),
|
||||
payload: '',
|
||||
processedAt: new Date().toISOString(),
|
||||
error: `Failed to parse: ${msg}`,
|
||||
};
|
||||
moveEventToDone(filename, stub);
|
||||
return;
|
||||
}
|
||||
|
||||
log.log(`event:${event.id} — received source=${event.source} type=${event.type}`);
|
||||
|
||||
const eligible = await listEventEligibleLiveNotes();
|
||||
const candidates = await findCandidates(event, eligible);
|
||||
|
||||
if (candidates.length === 0) {
|
||||
log.log(`event:${event.id} — no candidates (${eligible.length} eligible note${eligible.length === 1 ? '' : 's'})`);
|
||||
} else {
|
||||
log.log(`event:${event.id} — dispatching to ${candidates.length} candidate${candidates.length === 1 ? '' : 's'}: ${candidates.map(c => c.filePath).join(', ')}`);
|
||||
}
|
||||
|
||||
const runIds: string[] = [];
|
||||
let processingError: string | undefined;
|
||||
let okCount = 0;
|
||||
let errCount = 0;
|
||||
|
||||
// Sequential — preserves total ordering
|
||||
for (const candidate of candidates) {
|
||||
try {
|
||||
const result = await runLiveNoteAgent(candidate.filePath, 'event', event.payload);
|
||||
if (result.runId) runIds.push(result.runId);
|
||||
if (result.error) {
|
||||
errCount++;
|
||||
} else {
|
||||
okCount++;
|
||||
}
|
||||
} catch (err) {
|
||||
errCount++;
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
log.log(`event:${event.id} — candidate ${candidate.filePath} threw: ${msg}`);
|
||||
processingError = (processingError ? processingError + '; ' : '') + `${candidate.filePath}: ${msg}`;
|
||||
}
|
||||
}
|
||||
|
||||
if (candidates.length > 0) {
|
||||
log.log(`event:${event.id} — processed ok=${okCount} errors=${errCount}`);
|
||||
}
|
||||
|
||||
const enriched: KnowledgeEvent = {
|
||||
...event,
|
||||
processedAt: new Date().toISOString(),
|
||||
candidateFilePaths: candidates.map(c => c.filePath),
|
||||
runIds,
|
||||
...(processingError ? { error: processingError } : {}),
|
||||
};
|
||||
|
||||
moveEventToDone(filename, enriched);
|
||||
}
|
||||
|
||||
async function processPendingEvents(): Promise<void> {
|
||||
ensureDirs();
|
||||
|
||||
let filenames: string[];
|
||||
try {
|
||||
filenames = fs.readdirSync(PENDING_DIR).filter(f => f.endsWith('.json'));
|
||||
} catch (err) {
|
||||
log.log(`failed to read pending dir: ${err instanceof Error ? err.message : String(err)}`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (filenames.length === 0) return;
|
||||
|
||||
// FIFO: monotonic IDs are lexicographically sortable
|
||||
filenames.sort();
|
||||
|
||||
if (filenames.length > 1) {
|
||||
log.log(`tick — ${filenames.length} pending events`);
|
||||
}
|
||||
|
||||
for (const filename of filenames) {
|
||||
try {
|
||||
await processOneEvent(filename);
|
||||
} catch (err) {
|
||||
log.log(`event:${filename} — unhandled error: ${err instanceof Error ? err.message : String(err)}`);
|
||||
// Keep the loop alive — don't move file, will retry on next tick
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function init(): Promise<void> {
|
||||
log.log(`starting, polling every ${POLL_INTERVAL_MS / 1000}s`);
|
||||
ensureDirs();
|
||||
|
||||
await processPendingEvents();
|
||||
|
||||
while (true) {
|
||||
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||
try {
|
||||
await processPendingEvents();
|
||||
} catch (err) {
|
||||
log.log(`tick error: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,111 +0,0 @@
|
|||
import { generateObject } from 'ai';
|
||||
import { liveNote, PrefixLogger } from '@x/shared';
|
||||
import type { KnowledgeEvent } from '@x/shared/dist/live-note.js';
|
||||
import { createProvider } from '../../models/models.js';
|
||||
import { getDefaultModelAndProvider, getLiveNoteAgentModel, resolveProviderConfig } from '../../models/defaults.js';
|
||||
import { captureLlmUsage } from '../../analytics/usage.js';
|
||||
|
||||
const log = new PrefixLogger('LiveNote:Routing');
|
||||
|
||||
const BATCH_SIZE = 20;
|
||||
|
||||
export interface ParsedLiveNote {
|
||||
filePath: string;
|
||||
objective: string;
|
||||
eventMatchCriteria: string;
|
||||
}
|
||||
|
||||
const ROUTING_SYSTEM_PROMPT = `You are a routing classifier for a personal knowledge base.
|
||||
|
||||
You will receive an event (something that happened — an email, meeting, message, etc.) and a list of *live notes*. Each live note has:
|
||||
- filePath: the path of the note file
|
||||
- objective: the persistent intent of the note (what it should keep being / containing)
|
||||
- matchCriteria: an explicit description of which kinds of incoming signals should wake this note
|
||||
|
||||
Your job is to identify which live notes MIGHT be relevant to this event.
|
||||
|
||||
Rules:
|
||||
- Be LIBERAL in your selections. Include any note that is even moderately relevant.
|
||||
- Prefer false positives over false negatives — it is much better to include a note that turns out to be irrelevant than to miss one that was relevant.
|
||||
- Only exclude notes that are CLEARLY and OBVIOUSLY irrelevant to the event.
|
||||
- Do not attempt to judge whether the event contains enough information to act on. That is handled by the live-note agent in a later stage.
|
||||
- Return an empty list only if no notes are relevant at all.
|
||||
- Return each candidate's filePath exactly as given.`;
|
||||
|
||||
async function resolveModel() {
|
||||
const modelId = await getLiveNoteAgentModel();
|
||||
const { provider } = await getDefaultModelAndProvider();
|
||||
const config = await resolveProviderConfig(provider);
|
||||
return {
|
||||
model: createProvider(config).languageModel(modelId),
|
||||
modelId,
|
||||
providerName: provider,
|
||||
};
|
||||
}
|
||||
|
||||
function buildRoutingPrompt(event: KnowledgeEvent, batch: ParsedLiveNote[]): string {
|
||||
const noteList = batch
|
||||
.map((n, i) => `${i + 1}. filePath: ${n.filePath}\n objective: ${n.objective}\n matchCriteria: ${n.eventMatchCriteria}`)
|
||||
.join('\n\n');
|
||||
|
||||
return `## Event
|
||||
|
||||
Source: ${event.source}
|
||||
Type: ${event.type}
|
||||
Time: ${event.createdAt}
|
||||
|
||||
${event.payload}
|
||||
|
||||
## Live notes
|
||||
|
||||
${noteList}`;
|
||||
}
|
||||
|
||||
export async function findCandidates(
|
||||
event: KnowledgeEvent,
|
||||
allLiveNotes: ParsedLiveNote[],
|
||||
): Promise<ParsedLiveNote[]> {
|
||||
// Short-circuit for targeted re-runs — skip LLM routing entirely
|
||||
if (event.targetFilePath) {
|
||||
const target = allLiveNotes.find(n => n.filePath === event.targetFilePath);
|
||||
return target ? [target] : [];
|
||||
}
|
||||
|
||||
if (allLiveNotes.length === 0) {
|
||||
log.log(`event:${event.id} — no event-eligible live notes`);
|
||||
return [];
|
||||
}
|
||||
|
||||
log.log(`event:${event.id} — routing against ${allLiveNotes.length} live note${allLiveNotes.length === 1 ? '' : 's'}`);
|
||||
|
||||
const { model, modelId, providerName } = await resolveModel();
|
||||
const candidatePaths = new Set<string>();
|
||||
|
||||
for (let i = 0; i < allLiveNotes.length; i += BATCH_SIZE) {
|
||||
const batch = allLiveNotes.slice(i, i + BATCH_SIZE);
|
||||
try {
|
||||
const result = await generateObject({
|
||||
model,
|
||||
system: ROUTING_SYSTEM_PROMPT,
|
||||
prompt: buildRoutingPrompt(event, batch),
|
||||
schema: liveNote.Pass1OutputSchema,
|
||||
});
|
||||
captureLlmUsage({
|
||||
useCase: 'live_note_agent',
|
||||
subUseCase: 'routing',
|
||||
model: modelId,
|
||||
provider: providerName,
|
||||
usage: result.usage,
|
||||
});
|
||||
for (const fp of result.object.filePaths) {
|
||||
candidatePaths.add(fp);
|
||||
}
|
||||
} catch (err) {
|
||||
log.log(`event:${event.id} — Pass1 batch ${Math.floor(i / BATCH_SIZE)} failed: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
const candidates = allLiveNotes.filter(n => candidatePaths.has(n.filePath));
|
||||
log.log(`event:${event.id} — Pass1 → ${candidates.length} candidate${candidates.length === 1 ? '' : 's'}${candidates.length > 0 ? `: ${candidates.map(c => c.filePath).join(', ')}` : ''}`);
|
||||
return candidates;
|
||||
}
|
||||
|
|
@ -3,6 +3,7 @@ import { fetchLiveNote, patchLiveNote, readNoteBody } from './fileops.js';
|
|||
import { createRun, createMessage } from '../../runs/runs.js';
|
||||
import { getLiveNoteAgentModel } from '../../models/defaults.js';
|
||||
import { extractAgentResponse, waitForRunCompletion } from '../../agents/utils.js';
|
||||
import { buildTriggerBlock } from '../../agents/build-trigger-block.js';
|
||||
import { liveNoteBus } from './bus.js';
|
||||
import { PrefixLogger } from '@x/shared/dist/prefix-logger.js';
|
||||
|
||||
|
|
@ -29,50 +30,9 @@ function truncate(s: string | null | undefined, n = SUMMARY_LOG_LIMIT): string {
|
|||
// Agent run message
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function describeWindow(triggers: LiveNote['triggers']): string {
|
||||
const ws = triggers?.windows;
|
||||
if (!ws || ws.length === 0) return 'a configured window';
|
||||
return ws.map(w => `${w.startTime}–${w.endTime}`).join(', ');
|
||||
}
|
||||
const LIVE_NOTE_EVENT_DECISION_DIRECTIVE = '**Decision:** Determine whether this event genuinely warrants updating the note. If the event is not meaningfully relevant on closer inspection, skip the update — do not call `workspace-edit`. Only edit the file if the event provides new or changed information that the objective implies should be reflected.';
|
||||
|
||||
function buildTriggerBlock(
|
||||
live: LiveNote,
|
||||
trigger: LiveNoteTriggerType,
|
||||
context: string | undefined,
|
||||
): string {
|
||||
if (trigger === 'event') {
|
||||
const criteria = live.triggers?.eventMatchCriteria ?? '(none — should not happen for event-triggered runs)';
|
||||
return `
|
||||
|
||||
**Trigger:** Event match — Pass 1 routing flagged this note as potentially relevant to the event below.
|
||||
|
||||
**Event match criteria for this note:**
|
||||
${criteria}
|
||||
|
||||
**Event payload:**
|
||||
${context ?? '(no payload)'}
|
||||
|
||||
**Decision:** Determine whether this event genuinely warrants updating the note. If the event is not meaningfully relevant on closer inspection, skip the update — do not call \`workspace-edit\`. Only edit the file if the event provides new or changed information that the objective implies should be reflected.`;
|
||||
}
|
||||
|
||||
if (trigger === 'cron') {
|
||||
const expr = live.triggers?.cronExpr ?? '(unknown)';
|
||||
return `
|
||||
|
||||
**Trigger:** Scheduled refresh — the cron expression \`${expr}\` matched. This is a baseline refresh; if your objective specifies different behavior for cron vs window vs event runs, follow the cron branch.${context ? `\n\n**Context:**\n${context}` : ''}`;
|
||||
}
|
||||
|
||||
if (trigger === 'window') {
|
||||
return `
|
||||
|
||||
**Trigger:** Scheduled refresh — fired inside the configured window (${describeWindow(live.triggers)}). This is a forgiving baseline refresh that runs once per day per window; reactive updates are handled by event triggers (when configured). If your objective specifies different behavior for cron vs window vs event runs, follow the window branch.${context ? `\n\n**Context:**\n${context}` : ''}`;
|
||||
}
|
||||
|
||||
// manual
|
||||
return `
|
||||
|
||||
**Trigger:** Manual run (user-triggered — either the Run button in the Live Note panel or the \`run-live-note-agent\` tool).${context ? `\n\n**Context:**\n${context}` : ''}`;
|
||||
}
|
||||
const LIVE_NOTE_MANUAL_PAREN = 'user-triggered — either the Run button in the Live Note panel or the `run-live-note-agent` tool';
|
||||
|
||||
function buildMessage(
|
||||
filePath: string,
|
||||
|
|
@ -97,7 +57,19 @@ ${live.objective}
|
|||
|
||||
Start by calling \`workspace-readFile\` on \`${wsPath}\` to read the current note (frontmatter + body) — the body may be long and you should fetch it yourself rather than rely on a snapshot. Then make small, incremental edits with \`workspace-edit\` to bring the body in line with the objective: edit one region, re-read to verify, then edit the next region. Avoid one-shot rewrites of the whole body. Do not modify the YAML frontmatter at the top of the file — that block is owned by the user and the runtime.`;
|
||||
|
||||
return baseMessage + buildTriggerBlock(live, trigger, context);
|
||||
return baseMessage + buildTriggerBlock({
|
||||
trigger,
|
||||
triggers: live.triggers,
|
||||
// The live-note "event" branch passes the payload as `context`; for
|
||||
// every other trigger that same arg is the manual/scheduled context
|
||||
// appended to the block. Preserve both contracts with a single split.
|
||||
context: trigger === 'event' ? undefined : context,
|
||||
eventPayload: trigger === 'event' ? context : undefined,
|
||||
targetNoun: 'note',
|
||||
instructionsNoun: 'objective',
|
||||
manualParen: LIVE_NOTE_MANUAL_PAREN,
|
||||
eventDecisionDirective: LIVE_NOTE_EVENT_DECISION_DIRECTIVE,
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -1,92 +1,4 @@
|
|||
import { CronExpressionParser } from 'cron-parser';
|
||||
import type { Triggers } from '@x/shared/dist/live-note.js';
|
||||
|
||||
const GRACE_MS = 2 * 60 * 1000; // 2 minutes
|
||||
export const RETRY_BACKOFF_MS = 5 * 60 * 1000; // 5 minutes
|
||||
|
||||
/**
|
||||
* Decide whether a live note's `triggers` block has any timed sub-trigger
|
||||
* (cron or window) whose cycle is currently ready to fire. Pure cycle check —
|
||||
* does NOT consider backoff.
|
||||
*
|
||||
* - Cycle accounting (cron prev-occurrence, window once-per-day) is anchored
|
||||
* on `lastRunAt` — which is bumped only on *successful* completions. So a
|
||||
* failed run leaves the cycle unfired and this returns the matched trigger
|
||||
* again on the next tick (caller is expected to gate on backoff separately).
|
||||
* - `cronExpr` enforces a 2-minute grace window — if the scheduled time was
|
||||
* more than 2 minutes ago, it's a miss and skipped (avoids replay storms
|
||||
* after the app was offline).
|
||||
* - `windows` are forgiving: each window fires at most once per day per
|
||||
* successful run, anywhere inside its time-of-day band. Cycles anchored at
|
||||
* `startTime`. Adjacent windows sharing an endpoint (e.g. 08–12 and 12–15)
|
||||
* each still fire on the same day.
|
||||
*
|
||||
* Returns the source ('cron' | 'window') or null if no cycle is ready.
|
||||
*/
|
||||
export function dueTimedTrigger(
|
||||
triggers: Triggers | undefined,
|
||||
lastRunAt: string | null,
|
||||
): 'cron' | 'window' | null {
|
||||
if (!triggers) return null;
|
||||
|
||||
if (triggers.cronExpr && isCronDue(triggers.cronExpr, lastRunAt)) return 'cron';
|
||||
|
||||
if (triggers.windows) {
|
||||
for (const w of triggers.windows) {
|
||||
if (isWindowDue(w.startTime, w.endTime, lastRunAt)) return 'window';
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Backoff check — has there been an attempt within `RETRY_BACKOFF_MS`?
|
||||
* Returns the milliseconds remaining until the backoff lifts (positive) or 0
|
||||
* if not in backoff. Caller logs the remaining time in human form.
|
||||
*/
|
||||
export function backoffRemainingMs(lastAttemptAt: string | null): number {
|
||||
if (!lastAttemptAt) return 0;
|
||||
const sinceAttempt = Date.now() - new Date(lastAttemptAt).getTime();
|
||||
if (sinceAttempt < 0 || sinceAttempt >= RETRY_BACKOFF_MS) return 0;
|
||||
return RETRY_BACKOFF_MS - sinceAttempt;
|
||||
}
|
||||
|
||||
function isCronDue(expression: string, lastRunAt: string | null): boolean {
|
||||
const now = new Date();
|
||||
if (!lastRunAt) return true; // never ran — immediately due
|
||||
|
||||
try {
|
||||
// Find the most recent occurrence at-or-before `now`, not the
|
||||
// occurrence right after lastRunAt — if lastRunAt is old, that
|
||||
// occurrence would be ancient too and always fall outside the
|
||||
// grace window, blocking every future fire.
|
||||
const interval = CronExpressionParser.parse(expression, { currentDate: now });
|
||||
const prevRun = interval.prev().toDate();
|
||||
|
||||
// Already ran at-or-after this occurrence → skip.
|
||||
if (new Date(lastRunAt).getTime() >= prevRun.getTime()) return false;
|
||||
|
||||
// Within grace → fire. Outside grace → missed, skip.
|
||||
return now.getTime() <= prevRun.getTime() + GRACE_MS;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function isWindowDue(startTime: string, endTime: string, lastRunAt: string | null): boolean {
|
||||
const now = new Date();
|
||||
const [startHour, startMin] = startTime.split(':').map(Number);
|
||||
const [endHour, endMin] = endTime.split(':').map(Number);
|
||||
const startMinutes = startHour * 60 + startMin;
|
||||
const endMinutes = endHour * 60 + endMin;
|
||||
const nowMinutes = now.getHours() * 60 + now.getMinutes();
|
||||
if (nowMinutes < startMinutes || nowMinutes > endMinutes) return false;
|
||||
|
||||
if (!lastRunAt) return true;
|
||||
|
||||
const cycleStart = new Date(now);
|
||||
cycleStart.setHours(startHour, startMin, 0, 0);
|
||||
if (new Date(lastRunAt).getTime() > cycleStart.getTime()) return false;
|
||||
return true;
|
||||
}
|
||||
// Helpers moved to `packages/core/src/schedule/utils.ts` and shared with the
|
||||
// bg-task scheduler. This shim keeps existing imports working; remove after
|
||||
// the next release once nothing imports from this path.
|
||||
export { dueTimedTrigger, backoffRemainingMs, RETRY_BACKOFF_MS } from '../../schedule/utils.js';
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import { PrefixLogger } from '@x/shared';
|
|||
import * as workspace from '../../workspace/workspace.js';
|
||||
import { fetchLiveNote } from './fileops.js';
|
||||
import { runLiveNoteAgent } from './runner.js';
|
||||
import { backoffRemainingMs, dueTimedTrigger } from './schedule-utils.js';
|
||||
import { backoffRemainingMs, dueTimedTrigger } from '../../schedule/utils.js';
|
||||
|
||||
const log = new PrefixLogger('LiveNote:Scheduler');
|
||||
const POLL_INTERVAL_MS = 15_000; // 15 seconds
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import { WorkDir } from '../config/config.js';
|
|||
import { GoogleClientFactory } from './google-client-factory.js';
|
||||
import { serviceLogger } from '../services/service_logger.js';
|
||||
import { limitEventItems } from './limit_event_items.js';
|
||||
import { createEvent } from './live-note/events.js';
|
||||
import { createEvent } from '../events/producer.js';
|
||||
|
||||
const MAX_EVENTS_IN_DIGEST = 50;
|
||||
const MAX_DESCRIPTION_CHARS = 500;
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import { WorkDir } from '../config/config.js';
|
|||
import { GoogleClientFactory } from './google-client-factory.js';
|
||||
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
|
||||
import { limitEventItems } from './limit_event_items.js';
|
||||
import { createEvent } from './live-note/events.js';
|
||||
import { createEvent } from '../events/producer.js';
|
||||
|
||||
// Configuration
|
||||
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
|
||||
|
|
|
|||
|
|
@ -86,3 +86,13 @@ export async function getMeetingNotesModel(): Promise<string> {
|
|||
const cfg = await container.resolve<IModelConfigRepo>("modelConfigRepo").getConfig();
|
||||
return cfg.meetingNotesModel ?? cfg.model;
|
||||
}
|
||||
|
||||
/**
|
||||
* Model used by the background-task agent + routing classifier. Currently
|
||||
* mirrors `getLiveNoteAgentModel()` — both surfaces want a fast, reliable
|
||||
* agent model. Split into its own getter so a future per-feature override
|
||||
* doesn't require touching all call sites.
|
||||
*/
|
||||
export async function getBackgroundTaskAgentModel(): Promise<string> {
|
||||
return getLiveNoteAgentModel();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,6 +39,10 @@ export type CreateRunRepoOptions = {
|
|||
subUseCase?: string;
|
||||
};
|
||||
|
||||
function runLogPath(runId: string): string {
|
||||
return path.join(WorkDir, 'runs', `${runId}.jsonl`);
|
||||
}
|
||||
|
||||
export interface IRunsRepo {
|
||||
create(options: CreateRunRepoOptions): Promise<z.infer<typeof Run>>;
|
||||
fetch(id: string): Promise<z.infer<typeof Run>>;
|
||||
|
|
@ -68,7 +72,7 @@ export class FSRunsRepo implements IRunsRepo {
|
|||
idGenerator: IMonotonicallyIncreasingIdGenerator;
|
||||
}) {
|
||||
this.idGenerator = idGenerator;
|
||||
// ensure runs directory exists
|
||||
// ensure default runs directory exists
|
||||
fsp.mkdir(path.join(WorkDir, 'runs'), { recursive: true });
|
||||
}
|
||||
|
||||
|
|
@ -186,7 +190,7 @@ export class FSRunsRepo implements IRunsRepo {
|
|||
|
||||
async appendEvents(runId: string, events: z.infer<typeof RunEvent>[]): Promise<void> {
|
||||
await fsp.appendFile(
|
||||
path.join(WorkDir, 'runs', `${runId}.jsonl`),
|
||||
runLogPath(runId),
|
||||
events.map(event => JSON.stringify(event)).join("\n") + "\n"
|
||||
);
|
||||
}
|
||||
|
|
@ -219,7 +223,7 @@ export class FSRunsRepo implements IRunsRepo {
|
|||
}
|
||||
|
||||
async fetch(id: string): Promise<z.infer<typeof Run>> {
|
||||
const contents = await fsp.readFile(path.join(WorkDir, 'runs', `${id}.jsonl`), 'utf8');
|
||||
const contents = await fsp.readFile(runLogPath(id), 'utf8');
|
||||
// Parse with the lenient schema so legacy start events (no model/provider) load.
|
||||
const rawEvents = contents.split('\n')
|
||||
.filter(line => line.trim() !== '')
|
||||
|
|
@ -314,7 +318,6 @@ export class FSRunsRepo implements IRunsRepo {
|
|||
}
|
||||
|
||||
async delete(id: string): Promise<void> {
|
||||
const filePath = path.join(WorkDir, 'runs', `${id}.jsonl`);
|
||||
await fsp.unlink(filePath);
|
||||
await fsp.unlink(runLogPath(id));
|
||||
}
|
||||
}
|
||||
|
|
@ -19,10 +19,13 @@ export async function createRun(opts: z.infer<typeof CreateRunOptions>): Promise
|
|||
|
||||
// Resolve model+provider once at creation: opts > agent declaration > defaults.
|
||||
// Both fields are plain strings (provider is a name, looked up at runtime).
|
||||
// Use `||` (not `??`) so an empty-string override — what an LLM tool call
|
||||
// sometimes synthesizes for "I'm not setting this" — falls through to the
|
||||
// next link in the chain instead of being treated as a real value.
|
||||
const agent = await loadAgent(opts.agentId);
|
||||
const defaults = await getDefaultModelAndProvider();
|
||||
const model = opts.model ?? agent.model ?? defaults.model;
|
||||
const provider = opts.provider ?? agent.provider ?? defaults.provider;
|
||||
const model = opts.model || agent.model || defaults.model;
|
||||
const provider = opts.provider || agent.provider || defaults.provider;
|
||||
const useCase = opts.useCase ?? "copilot_chat";
|
||||
|
||||
const run = await repo.create({
|
||||
|
|
|
|||
92
apps/x/packages/core/src/schedule/utils.ts
Normal file
92
apps/x/packages/core/src/schedule/utils.ts
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
import { CronExpressionParser } from 'cron-parser';
|
||||
import type { Triggers } from '@x/shared/dist/live-note.js';
|
||||
|
||||
const GRACE_MS = 2 * 60 * 1000; // 2 minutes
|
||||
export const RETRY_BACKOFF_MS = 5 * 60 * 1000; // 5 minutes
|
||||
|
||||
/**
|
||||
* Decide whether a `triggers` block has any timed sub-trigger (cron or window)
|
||||
* whose cycle is currently ready to fire. Pure cycle check — does NOT consider
|
||||
* backoff. Used by both the live-note scheduler and the bg-task scheduler.
|
||||
*
|
||||
* - Cycle accounting (cron prev-occurrence, window once-per-day) is anchored
|
||||
* on `lastRunAt` — which is bumped only on *successful* completions. So a
|
||||
* failed run leaves the cycle unfired and this returns the matched trigger
|
||||
* again on the next tick (caller is expected to gate on backoff separately).
|
||||
* - `cronExpr` enforces a 2-minute grace window — if the scheduled time was
|
||||
* more than 2 minutes ago, it's a miss and skipped (avoids replay storms
|
||||
* after the app was offline).
|
||||
* - `windows` are forgiving: each window fires at most once per day per
|
||||
* successful run, anywhere inside its time-of-day band. Cycles anchored at
|
||||
* `startTime`. Adjacent windows sharing an endpoint (e.g. 08–12 and 12–15)
|
||||
* each still fire on the same day.
|
||||
*
|
||||
* Returns the source ('cron' | 'window') or null if no cycle is ready.
|
||||
*/
|
||||
export function dueTimedTrigger(
|
||||
triggers: Triggers | undefined,
|
||||
lastRunAt: string | null,
|
||||
): 'cron' | 'window' | null {
|
||||
if (!triggers) return null;
|
||||
|
||||
if (triggers.cronExpr && isCronDue(triggers.cronExpr, lastRunAt)) return 'cron';
|
||||
|
||||
if (triggers.windows) {
|
||||
for (const w of triggers.windows) {
|
||||
if (isWindowDue(w.startTime, w.endTime, lastRunAt)) return 'window';
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Backoff check — has there been an attempt within `RETRY_BACKOFF_MS`?
|
||||
* Returns the milliseconds remaining until the backoff lifts (positive) or 0
|
||||
* if not in backoff. Caller logs the remaining time in human form.
|
||||
*/
|
||||
export function backoffRemainingMs(lastAttemptAt: string | null): number {
|
||||
if (!lastAttemptAt) return 0;
|
||||
const sinceAttempt = Date.now() - new Date(lastAttemptAt).getTime();
|
||||
if (sinceAttempt < 0 || sinceAttempt >= RETRY_BACKOFF_MS) return 0;
|
||||
return RETRY_BACKOFF_MS - sinceAttempt;
|
||||
}
|
||||
|
||||
function isCronDue(expression: string, lastRunAt: string | null): boolean {
|
||||
const now = new Date();
|
||||
if (!lastRunAt) return true; // never ran — immediately due
|
||||
|
||||
try {
|
||||
// Find the most recent occurrence at-or-before `now`, not the
|
||||
// occurrence right after lastRunAt — if lastRunAt is old, that
|
||||
// occurrence would be ancient too and always fall outside the
|
||||
// grace window, blocking every future fire.
|
||||
const interval = CronExpressionParser.parse(expression, { currentDate: now });
|
||||
const prevRun = interval.prev().toDate();
|
||||
|
||||
// Already ran at-or-after this occurrence → skip.
|
||||
if (new Date(lastRunAt).getTime() >= prevRun.getTime()) return false;
|
||||
|
||||
// Within grace → fire. Outside grace → missed, skip.
|
||||
return now.getTime() <= prevRun.getTime() + GRACE_MS;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function isWindowDue(startTime: string, endTime: string, lastRunAt: string | null): boolean {
|
||||
const now = new Date();
|
||||
const [startHour, startMin] = startTime.split(':').map(Number);
|
||||
const [endHour, endMin] = endTime.split(':').map(Number);
|
||||
const startMinutes = startHour * 60 + startMin;
|
||||
const endMinutes = endHour * 60 + endMin;
|
||||
const nowMinutes = now.getHours() * 60 + now.getMinutes();
|
||||
if (nowMinutes < startMinutes || nowMinutes > endMinutes) return false;
|
||||
|
||||
if (!lastRunAt) return true;
|
||||
|
||||
const cycleStart = new Date(now);
|
||||
cycleStart.setHours(startHour, startMin, 0, 0);
|
||||
if (new Date(lastRunAt).getTime() > cycleStart.getTime()) return false;
|
||||
return true;
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue