background agent 2

This commit is contained in:
Arjun 2026-02-04 15:55:42 +05:30
parent eef7e1d508
commit 7791541f30
8 changed files with 431 additions and 47 deletions

View file

@ -12,9 +12,9 @@
"@ai-sdk/anthropic": "^2.0.44",
"@ai-sdk/google": "^2.0.25",
"@ai-sdk/openai": "^2.0.53",
"@composio/core": "^0.6.0",
"@ai-sdk/openai-compatible": "^1.0.27",
"@ai-sdk/provider": "^2.0.0",
"@composio/core": "^0.6.0",
"@google-cloud/local-auth": "^3.0.1",
"@modelcontextprotocol/sdk": "^1.25.1",
"@openrouter/ai-sdk-provider": "^1.2.6",
@ -24,6 +24,7 @@
"ai": "^5.0.102",
"awilix": "^12.0.5",
"chokidar": "^4.0.3",
"cron-parser": "^5.5.0",
"glob": "^13.0.0",
"google-auth-library": "^10.5.0",
"googleapis": "^169.0.0",

View file

@ -0,0 +1,335 @@
import { CronExpressionParser } from "cron-parser";
import container from "../di/container.js";
import { IAgentScheduleRepo } from "./repo.js";
import { IAgentScheduleStateRepo } from "./state-repo.js";
import { IRunsRepo } from "../runs/repo.js";
import { IAgentRuntime } from "../agents/runtime.js";
import { IMonotonicallyIncreasingIdGenerator } from "../application/lib/id-gen.js";
import { AgentScheduleConfig, AgentScheduleEntry } from "@x/shared/dist/agent-schedule.js";
import { AgentScheduleState, AgentScheduleStateEntry } from "@x/shared/dist/agent-schedule-state.js";
import { MessageEvent } from "@x/shared/dist/runs.js";
import z from "zod";
const DEFAULT_STARTING_MESSAGE = "go";
const POLL_INTERVAL_MS = 60 * 1000; // 1 minute
const TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes
/**
* Convert a Date to local ISO 8601 string (without Z suffix).
* Example: "2024-02-05T08:30:00"
*/
function toLocalISOString(date: Date): string {
const pad = (n: number) => n.toString().padStart(2, "0");
return `${date.getFullYear()}-${pad(date.getMonth() + 1)}-${pad(date.getDate())}T${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}`;
}
// --- Wake Signal for Immediate Run Trigger ---
let wakeResolve: (() => void) | null = null;
export function triggerRun(): void {
if (wakeResolve) {
console.log("[AgentRunner] Triggered - waking up immediately");
wakeResolve();
wakeResolve = null;
}
}
function interruptibleSleep(ms: number): Promise<void> {
return new Promise((resolve) => {
const timeout = setTimeout(() => {
wakeResolve = null;
resolve();
}, ms);
wakeResolve = () => {
clearTimeout(timeout);
resolve();
};
});
}
/**
* Calculate the next run time for a schedule.
* Returns ISO datetime string or null if schedule shouldn't run again.
*/
function calculateNextRunAt(
schedule: z.infer<typeof AgentScheduleEntry>["schedule"]
): string | null {
const now = new Date();
switch (schedule.type) {
case "cron": {
try {
const interval = CronExpressionParser.parse(schedule.expression, {
currentDate: now,
});
return toLocalISOString(interval.next().toDate());
} catch (error) {
console.error("[AgentRunner] Invalid cron expression:", schedule.expression, error);
return null;
}
}
case "window": {
try {
// Parse base cron to get the next occurrence date
const interval = CronExpressionParser.parse(schedule.cron, {
currentDate: now,
});
const nextDate = interval.next().toDate();
// Parse start and end times
const [startHour, startMin] = schedule.startTime.split(":").map(Number);
const [endHour, endMin] = schedule.endTime.split(":").map(Number);
// Pick a random time within the window
const startMinutes = startHour * 60 + startMin;
const endMinutes = endHour * 60 + endMin;
const randomMinutes = startMinutes + Math.floor(Math.random() * (endMinutes - startMinutes));
nextDate.setHours(Math.floor(randomMinutes / 60), randomMinutes % 60, 0, 0);
return toLocalISOString(nextDate);
} catch (error) {
console.error("[AgentRunner] Invalid window schedule:", error);
return null;
}
}
case "once": {
// Once schedules don't have a "next" run - they're done after first run
return null;
}
}
}
/**
* Check if an agent should run now based on its schedule and state.
*/
function shouldRunNow(
entry: z.infer<typeof AgentScheduleEntry>,
state: z.infer<typeof AgentScheduleStateEntry> | null
): boolean {
// Don't run if disabled
if (entry.enabled === false) {
return false;
}
// Don't run if already running
if (state?.status === "running") {
return false;
}
// Don't run once-schedules that are already triggered
if (entry.schedule.type === "once" && state?.status === "triggered") {
return false;
}
const now = new Date();
// For once-schedules without state, check if runAt time has passed
if (entry.schedule.type === "once") {
const runAt = new Date(entry.schedule.runAt);
return now >= runAt;
}
// For cron and window schedules, check nextRunAt
if (!state?.nextRunAt) {
// No nextRunAt set - needs to be initialized, so run now
return true;
}
const nextRunAt = new Date(state.nextRunAt);
return now >= nextRunAt;
}
/**
* Run a single agent.
*/
async function runAgent(
agentName: string,
entry: z.infer<typeof AgentScheduleEntry>,
stateRepo: IAgentScheduleStateRepo,
runsRepo: IRunsRepo,
agentRuntime: IAgentRuntime,
idGenerator: IMonotonicallyIncreasingIdGenerator
): Promise<void> {
console.log(`[AgentRunner] Starting agent: ${agentName}`);
const startedAt = toLocalISOString(new Date());
// Update state to running with startedAt timestamp
await stateRepo.updateAgentState(agentName, {
status: "running",
startedAt: startedAt,
});
try {
// Create a new run
const run = await runsRepo.create({ agentId: agentName });
console.log(`[AgentRunner] Created run ${run.id} for agent ${agentName}`);
// Add the starting message as a user message
const startingMessage = entry.startingMessage ?? DEFAULT_STARTING_MESSAGE;
const messageEvent: z.infer<typeof MessageEvent> = {
runId: run.id,
type: "message",
messageId: await idGenerator.next(),
message: {
role: "user",
content: startingMessage,
},
subflow: [],
};
await runsRepo.appendEvents(run.id, [messageEvent]);
console.log(`[AgentRunner] Sent starting message to agent ${agentName}: "${startingMessage}"`);
// Trigger the run
await agentRuntime.trigger(run.id);
// Calculate next run time
const nextRunAt = calculateNextRunAt(entry.schedule);
// Update state to finished (clear startedAt)
const currentState = await stateRepo.getAgentState(agentName);
await stateRepo.updateAgentState(agentName, {
status: entry.schedule.type === "once" ? "triggered" : "finished",
startedAt: null,
lastRunAt: toLocalISOString(new Date()),
nextRunAt: nextRunAt,
lastError: null,
runCount: (currentState?.runCount ?? 0) + 1,
});
console.log(`[AgentRunner] Finished agent: ${agentName}`);
} catch (error) {
console.error(`[AgentRunner] Error running agent ${agentName}:`, error);
// Calculate next run time even on failure (for retry)
const nextRunAt = calculateNextRunAt(entry.schedule);
// Update state to failed (clear startedAt)
const currentState = await stateRepo.getAgentState(agentName);
await stateRepo.updateAgentState(agentName, {
status: "failed",
startedAt: null,
lastRunAt: toLocalISOString(new Date()),
nextRunAt: nextRunAt,
lastError: error instanceof Error ? error.message : String(error),
runCount: (currentState?.runCount ?? 0) + 1,
});
}
}
/**
* Check for timed-out agents and mark them as failed.
*/
async function checkForTimeouts(
state: z.infer<typeof AgentScheduleState>,
config: z.infer<typeof AgentScheduleConfig>,
stateRepo: IAgentScheduleStateRepo
): Promise<void> {
const now = new Date();
for (const [agentName, agentState] of Object.entries(state.agents)) {
if (agentState.status === "running" && agentState.startedAt) {
const startedAt = new Date(agentState.startedAt);
const elapsed = now.getTime() - startedAt.getTime();
if (elapsed > TIMEOUT_MS) {
console.log(`[AgentRunner] Agent ${agentName} timed out after ${Math.round(elapsed / 1000 / 60)} minutes`);
// Get schedule entry for calculating next run
const entry = config.agents[agentName];
const nextRunAt = entry ? calculateNextRunAt(entry.schedule) : null;
await stateRepo.updateAgentState(agentName, {
status: "failed",
startedAt: null,
lastRunAt: toLocalISOString(now),
nextRunAt: nextRunAt,
lastError: `Timed out after ${Math.round(elapsed / 1000 / 60)} minutes`,
runCount: (agentState.runCount ?? 0) + 1,
});
}
}
}
}
/**
* Main polling loop.
*/
async function pollAndRun(): Promise<void> {
const scheduleRepo = container.resolve<IAgentScheduleRepo>("agentScheduleRepo");
const stateRepo = container.resolve<IAgentScheduleStateRepo>("agentScheduleStateRepo");
const runsRepo = container.resolve<IRunsRepo>("runsRepo");
const agentRuntime = container.resolve<IAgentRuntime>("agentRuntime");
const idGenerator = container.resolve<IMonotonicallyIncreasingIdGenerator>("idGenerator");
// Load config and state
let config: z.infer<typeof AgentScheduleConfig>;
let state: z.infer<typeof AgentScheduleState>;
try {
config = await scheduleRepo.getConfig();
state = await stateRepo.getState();
} catch (error) {
console.error("[AgentRunner] Error loading config/state:", error);
return;
}
// Check for timed-out agents first
await checkForTimeouts(state, config, stateRepo);
// Reload state after timeout checks (state may have changed)
try {
state = await stateRepo.getState();
} catch (error) {
console.error("[AgentRunner] Error reloading state:", error);
return;
}
// Check each agent
for (const [agentName, entry] of Object.entries(config.agents)) {
const agentState = state.agents[agentName] ?? null;
// Initialize state if needed (set nextRunAt for new agents)
if (!agentState && entry.schedule.type !== "once") {
const nextRunAt = calculateNextRunAt(entry.schedule);
if (nextRunAt) {
await stateRepo.updateAgentState(agentName, {
status: "scheduled",
startedAt: null,
lastRunAt: null,
nextRunAt: nextRunAt,
lastError: null,
runCount: 0,
});
console.log(`[AgentRunner] Initialized state for ${agentName}, next run at ${nextRunAt}`);
}
continue; // Don't run immediately on first initialization
}
if (shouldRunNow(entry, agentState)) {
// Run agent (don't await - let it run in background)
runAgent(agentName, entry, stateRepo, runsRepo, agentRuntime, idGenerator).catch((error) => {
console.error(`[AgentRunner] Unhandled error in runAgent for ${agentName}:`, error);
});
}
}
}
/**
* Initialize the background agent runner service.
* Polls every minute to check for agents that need to run.
*/
export async function init(): Promise<void> {
console.log("[AgentRunner] Starting background agent runner service");
while (true) {
try {
await pollAndRun();
} catch (error) {
console.error("[AgentRunner] Error in main loop:", error);
}
await interruptibleSleep(POLL_INTERVAL_MS);
}
}

View file

@ -40,6 +40,7 @@ export class FSAgentScheduleStateRepo implements IAgentScheduleStateRepo {
const state = await this.getState();
const existing = state.agents[agentName] ?? {
status: "scheduled" as const,
startedAt: null,
lastRunAt: null,
nextRunAt: null,
lastError: null,

View file

@ -39,6 +39,8 @@ Background agents run automatically based on schedules defined in ` + "`~/.rowbo
### Schedule Types
**IMPORTANT: All times are in local time** (the timezone of the machine running Rowboat).
**1. Cron Schedule** - Runs at exact times defined by cron expression
` + "```json" + `
{
@ -76,13 +78,25 @@ The agent will run once at a random time within the window. Use this when you wa
{
"schedule": {
"type": "once",
"runAt": "2024-02-05T10:30:00Z"
"runAt": "2024-02-05T10:30:00"
},
"enabled": true
}
` + "```" + `
Use this for one-time tasks like migrations or setup scripts.
Use this for one-time tasks like migrations or setup scripts. The ` + "`runAt`" + ` is in local time (no Z suffix).
### Starting Message
You can specify a ` + "`startingMessage`" + ` that gets sent to the agent when it starts. If not provided, defaults to ` + "`\"go\"`" + `.
` + "```json" + `
{
"schedule": { "type": "cron", "expression": "0 8 * * *" },
"enabled": true,
"startingMessage": "Please summarize my emails from the last 24 hours"
}
` + "```" + `
### Complete Schedule Example
@ -94,7 +108,8 @@ Use this for one-time tasks like migrations or setup scripts.
"type": "cron",
"expression": "0 8 * * *"
},
"enabled": true
"enabled": true,
"startingMessage": "Summarize my emails and calendar for today"
},
"morning_briefing": {
"schedule": {
@ -108,7 +123,7 @@ Use this for one-time tasks like migrations or setup scripts.
"one_time_setup": {
"schedule": {
"type": "once",
"runAt": "2024-12-01T12:00:00Z"
"runAt": "2024-12-01T12:00:00"
},
"enabled": true
}
@ -301,89 +316,96 @@ summariser:
## Complete Multi-Agent Workflow Example
**Podcast creation workflow** - This is all done through agents calling other agents:
**Email digest workflow** - This is all done through agents calling other agents:
**1. Task-specific agent** (` + "`agents/summariser_agent.md`" + `):
**1. Task-specific agent** (` + "`agents/email_reader.md`" + `):
` + "```markdown" + `
---
model: gpt-5.1
tools:
bash:
read_file:
type: builtin
name: executeCommand
name: workspace-readFile
list_dir:
type: builtin
name: workspace-readdir
---
# Summariser Agent
# Email Reader Agent
Download and summarise an arxiv paper. Use curl to fetch the PDF.
Output just the GIST in two lines. Don't ask for human input.
Read emails from the gmail_sync folder and extract key information.
Look for unread or recent emails and summarize the sender, subject, and key points.
Don't ask for human input.
` + "```" + `
**2. Agent that delegates to other agents** (` + "`agents/summarise-a-few.md`" + `):
**2. Agent that delegates to other agents** (` + "`agents/daily_summary.md`" + `):
` + "```markdown" + `
---
model: gpt-5.1
tools:
summariser:
email_reader:
type: agent
name: summariser_agent
name: email_reader
write_file:
type: builtin
name: workspace-writeFile
---
# Summarise Multiple Papers
# Daily Summary Agent
Pick 2 interesting papers and summarise each using the summariser tool.
Pass the paper URL to the tool. Don't ask for human input.
1. Use the email_reader tool to get email summaries
2. Create a consolidated daily digest
3. Save the digest to the knowledge base
Don't ask for human input.
` + "```" + `
**3. Orchestrator agent** (` + "`agents/podcast_workflow.md`" + `):
**3. Orchestrator agent** (` + "`agents/morning_briefing.md`" + `):
` + "```markdown" + `
---
model: gpt-5.1
tools:
bash:
type: builtin
name: executeCommand
summarise_papers:
daily_summary:
type: agent
name: summarise-a-few
text_to_speech:
name: daily_summary
search:
type: mcp
name: text_to_speech
mcpServerName: elevenLabs
description: Generate audio from text
name: search
mcpServerName: exa
description: Search the web for news
inputSchema:
type: object
properties:
text:
query:
type: string
description: Text to convert to speech
description: Search query
---
# Podcast Workflow
# Morning Briefing Workflow
Create a podcast from arXiv papers:
Create a morning briefing:
1. Fetch arXiv papers about agents using bash
2. Pick papers and summarise them using summarise_papers
3. Create a podcast transcript
4. Generate audio using text_to_speech
1. Get email digest using daily_summary
2. Search for relevant news using the search tool
3. Compile a comprehensive morning briefing
Execute these steps in sequence.
Execute these steps in sequence. Don't ask for human input.
` + "```" + `
**4. Schedule the workflow** in ` + "`~/.rowboat/config/agent-schedule.json`" + `:
` + "```json" + `
{
"agents": {
"podcast_workflow": {
"morning_briefing": {
"schedule": {
"type": "cron",
"expression": "0 6 * * 1"
"expression": "0 7 * * *"
},
"enabled": true
"enabled": true,
"startingMessage": "Create my morning briefing for today"
}
}
}
` + "```" + `
This schedules the podcast workflow to run every Monday at 6am.
This schedules the morning briefing workflow to run every day at 7am local time.
## Naming and organization rules
- **All agents live in ` + "`agents/*.md`" + `** - Markdown files with YAML frontmatter
@ -401,6 +423,7 @@ This schedules the podcast workflow to run every Monday at 6am.
6. **Orchestration**: Create a top-level agent that coordinates the workflow
7. **Scheduling**: Use appropriate schedule types - cron for recurring, window for flexible timing, once for one-time tasks
8. **Error handling**: Background agents should handle errors gracefully since there's no human to intervene
9. **Avoid executeCommand**: Do NOT attach ` + "`executeCommand`" + ` to background agents as it poses security risks when running unattended. Instead, use the specific builtin tools needed (` + "`workspace-readFile`" + `, ` + "`workspace-writeFile`" + `, etc.) or MCP tools for external integrations
## Validation & Best Practices