Merge branch 'dev' into feat/today-minimal-polish

This commit is contained in:
gagan 2026-05-06 19:38:09 +05:30 committed by GitHub
commit eeb99320fc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
80 changed files with 3655 additions and 1617 deletions

View file

@ -37,6 +37,7 @@
"openid-client": "^6.8.1",
"papaparse": "^5.5.3",
"pdf-parse": "^2.4.5",
"posthog-node": "^4.18.0",
"react": "^19.2.3",
"xlsx": "^0.18.5",
"yaml": "^2.8.2",

View file

@ -164,7 +164,11 @@ async function runAgent(
try {
// Create a new run via core (resolves agent + default model+provider).
const run = await createRun({ agentId: agentName });
const run = await createRun({
agentId: agentName,
useCase: 'copilot_chat',
subUseCase: 'scheduled',
});
console.log(`[AgentRunner] Created run ${run.id} for agent ${agentName}`);
// Add the starting message as a user message

View file

@ -26,6 +26,8 @@ import { IRunsLock } from "../runs/lock.js";
import { IAbortRegistry } from "../runs/abort-registry.js";
import { PrefixLogger } from "@x/shared";
import { parse } from "yaml";
import { captureLlmUsage } from "../analytics/usage.js";
import { enterUseCase, type UseCase } from "../analytics/use_case.js";
import { getRaw as getNoteCreationRaw } from "../knowledge/note_creation.js";
import { getRaw as getLabelingAgentRaw } from "../knowledge/labeling_agent.js";
import { getRaw as getNoteTaggingAgentRaw } from "../knowledge/note_tagging_agent.js";
@ -650,6 +652,8 @@ export class AgentState {
agentName: string | null = null;
runModel: string | null = null;
runProvider: string | null = null;
runUseCase: UseCase | null = null;
runSubUseCase: string | null = null;
messages: z.infer<typeof MessageList> = [];
lastAssistantMsg: z.infer<typeof AssistantMessage> | null = null;
subflowStates: Record<string, AgentState> = {};
@ -765,6 +769,8 @@ export class AgentState {
this.agentName = event.agentName;
this.runModel = event.model;
this.runProvider = event.provider;
this.runUseCase = event.useCase ?? null;
this.runSubUseCase = event.subUseCase ?? null;
break;
case "spawn-subflow":
// Seed the subflow state with its agent so downstream loadAgent works.
@ -775,6 +781,8 @@ export class AgentState {
this.subflowStates[event.toolCallId].agentName = event.agentName;
this.subflowStates[event.toolCallId].runModel = this.runModel;
this.subflowStates[event.toolCallId].runProvider = this.runProvider;
this.subflowStates[event.toolCallId].runUseCase = this.runUseCase;
this.subflowStates[event.toolCallId].runSubUseCase = this.runSubUseCase;
break;
case "message":
this.messages.push(event.message);
@ -881,6 +889,14 @@ export async function* streamAgent({
const model = provider.languageModel(modelId);
logger.log(`using model: ${modelId} (provider: ${state.runProvider})`);
// Install use-case context for tool-internal LLM calls (e.g. parseFile)
// so they can tag their `llm_usage` events with the parent run's category.
enterUseCase({
useCase: state.runUseCase ?? "copilot_chat",
...(state.runSubUseCase ? { subUseCase: state.runSubUseCase } : {}),
...(state.agentName ? { agentName: state.agentName } : {}),
});
let loopCounter = 0;
let voiceInput = false;
let voiceOutput: 'summary' | 'full' | null = null;
@ -1114,6 +1130,13 @@ export async function* streamAgent({
instructionsWithDateTime,
tools,
signal,
{
useCase: state.runUseCase ?? "copilot_chat",
...(state.runSubUseCase ? { subUseCase: state.runSubUseCase } : {}),
agentName: state.agentName ?? undefined,
modelId,
providerName: state.runProvider!,
},
)) {
messageBuilder.ingest(event);
yield* processEvent({
@ -1201,12 +1224,21 @@ export async function* streamAgent({
}
}
interface StreamLlmAnalytics {
useCase: UseCase;
subUseCase?: string;
agentName?: string;
modelId: string;
providerName: string;
}
async function* streamLlm(
model: LanguageModel,
messages: z.infer<typeof MessageList>,
instructions: string,
tools: ToolSet,
signal?: AbortSignal,
analytics?: StreamLlmAnalytics,
): AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown> {
const converted = convertFromMessages(messages);
console.log(`! SENDING payload to model: `, JSON.stringify(converted))
@ -1277,6 +1309,16 @@ async function* streamLlm(
};
break;
case "finish-step":
if (analytics) {
captureLlmUsage({
useCase: analytics.useCase,
...(analytics.subUseCase ? { subUseCase: analytics.subUseCase } : {}),
...(analytics.agentName ? { agentName: analytics.agentName } : {}),
model: analytics.modelId,
provider: analytics.providerName,
usage: event.usage,
});
}
yield {
type: "finish-step",
usage: event.usage,

View file

@ -1,6 +1,35 @@
import { bus } from "../runs/bus.js";
import { fetchRun } from "../runs/runs.js";
type RunRecord = Awaited<ReturnType<typeof fetchRun>>;
function extractRunErrors(run: RunRecord): string[] {
return run.log.flatMap((event) => event.type === "error" ? [event.error] : []);
}
export class RunFailedError extends Error {
readonly runId: string;
readonly errors: string[];
constructor(runId: string, errors: string[]) {
const firstError = errors.find(Boolean) ?? null;
super(firstError ? `Run ${runId} failed: ${firstError}` : `Run ${runId} failed`);
this.name = "RunFailedError";
this.runId = runId;
this.errors = errors;
}
}
export function getErrorDetails(error: unknown): string {
if (error instanceof RunFailedError) {
return error.errors.join("\n\n");
}
if (error instanceof Error) {
return error.message;
}
return String(error);
}
/**
* Extract the assistant's final text response from a run's log.
* @param runId
@ -28,13 +57,28 @@ export async function extractAgentResponse(runId: string): Promise<string | null
/**
* Wait for a run to complete by listening for run-processing-end event
*/
export async function waitForRunCompletion(runId: string): Promise<void> {
return new Promise(async (resolve) => {
const unsubscribe = await bus.subscribe('*', async (event) => {
if (event.type === 'run-processing-end' && event.runId === runId) {
unsubscribe();
resolve();
}
});
export async function waitForRunCompletion(
runId: string,
opts: { throwOnError?: boolean } = {},
): Promise<RunRecord> {
return new Promise((resolve, reject) => {
void (async () => {
const unsubscribe = await bus.subscribe('*', async (event) => {
if (event.type === 'run-processing-end' && event.runId === runId) {
unsubscribe();
try {
const run = await fetchRun(runId);
const errors = extractRunErrors(run);
if (opts.throwOnError && errors.length > 0) {
reject(new RunFailedError(runId, errors));
return;
}
resolve(run);
} catch (error) {
reject(error);
}
}
});
})().catch(reject);
});
}
}

View file

@ -0,0 +1,23 @@
import { isSignedIn } from '../account/account.js';
import { getBillingInfo } from '../billing/billing.js';
import { identify } from './posthog.js';
/**
* If the user has rowboat OAuth tokens, fetch their billing info and
* call posthog.identify(). Idempotent safe to call on every app start.
* Catches all errors so analytics never blocks app launch.
*/
export async function identifyIfSignedIn(): Promise<void> {
try {
if (!(await isSignedIn())) return;
const billing = await getBillingInfo();
if (!billing.userId) return;
identify(billing.userId, {
...(billing.userEmail ? { email: billing.userEmail } : {}),
plan: billing.subscriptionPlan,
status: billing.subscriptionStatus,
});
} catch (err) {
console.error('[Analytics] startup identify failed:', err);
}
}

View file

@ -0,0 +1,37 @@
import fs from 'node:fs';
import path from 'node:path';
import { randomUUID } from 'node:crypto';
import { WorkDir } from '../config/config.js';
const INSTALLATION_PATH = path.join(WorkDir, 'config', 'installation.json');
let cached: string | null = null;
export function getInstallationId(): string {
if (cached) return cached;
try {
if (fs.existsSync(INSTALLATION_PATH)) {
const raw = fs.readFileSync(INSTALLATION_PATH, 'utf-8');
const parsed = JSON.parse(raw) as { installationId?: string };
if (parsed.installationId && typeof parsed.installationId === 'string') {
cached = parsed.installationId;
return cached;
}
}
} catch (err) {
console.error('[Analytics] Failed to read installation.json:', err);
}
const id = randomUUID();
try {
const dir = path.dirname(INSTALLATION_PATH);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
fs.writeFileSync(INSTALLATION_PATH, JSON.stringify({ installationId: id }, null, 2));
} catch (err) {
console.error('[Analytics] Failed to write installation.json:', err);
}
cached = id;
return id;
}

View file

@ -0,0 +1,90 @@
import { PostHog } from 'posthog-node';
import { getInstallationId } from './installation.js';
import { API_URL } from '../config/env.js';
// Build-time injected via esbuild `define` (apps/main/bundle.mjs).
// In dev/tsc, fall back to process.env so local runs work too.
const POSTHOG_KEY = process.env.POSTHOG_KEY ?? process.env.VITE_PUBLIC_POSTHOG_KEY ?? '';
const POSTHOG_HOST = process.env.POSTHOG_HOST ?? process.env.VITE_PUBLIC_POSTHOG_HOST ?? 'https://us.i.posthog.com';
let client: PostHog | null = null;
let initAttempted = false;
let identifiedUserId: string | null = null;
function getClient(): PostHog | null {
if (initAttempted) return client;
initAttempted = true;
if (!POSTHOG_KEY) {
console.log('[Analytics] POSTHOG_KEY not set; analytics disabled');
return null;
}
try {
client = new PostHog(POSTHOG_KEY, {
host: POSTHOG_HOST,
flushAt: 20,
flushInterval: 10_000,
});
// Tag the install with api_url as a person property up-front,
// so anonymous users are also segmentable by environment (api_url
// distinguishes prod / staging / custom — meaning is assigned in PostHog).
client.identify({
distinctId: getInstallationId(),
properties: { api_url: API_URL },
});
} catch (err) {
console.error('[Analytics] Failed to init PostHog:', err);
client = null;
}
return client;
}
function activeDistinctId(): string {
return identifiedUserId ?? getInstallationId();
}
export function capture(event: string, properties?: Record<string, unknown>): void {
const ph = getClient();
if (!ph) return;
try {
ph.capture({
distinctId: activeDistinctId(),
event,
properties,
});
} catch (err) {
console.error('[Analytics] capture failed:', err);
}
}
export function identify(userId: string, properties?: Record<string, unknown>): void {
const ph = getClient();
if (!ph) return;
try {
// Alias the anonymous installation ID to the rowboat user ID so historical
// anonymous events are linked to the identified user.
ph.alias({ distinctId: userId, alias: getInstallationId() });
ph.identify({
distinctId: userId,
properties: {
...properties,
api_url: API_URL,
},
});
identifiedUserId = userId;
} catch (err) {
console.error('[Analytics] identify failed:', err);
}
}
export function reset(): void {
identifiedUserId = null;
}
export async function shutdown(): Promise<void> {
if (!client) return;
try {
await client.shutdown();
} catch (err) {
console.error('[Analytics] shutdown failed:', err);
}
}

View file

@ -0,0 +1,38 @@
import { capture } from './posthog.js';
import type { UseCase } from './use_case.js';
// Shape compatible with ai-sdk v5 `LanguageModelUsage`.
// All fields are optional because providers report subsets.
export interface LlmUsageInput {
inputTokens?: number;
outputTokens?: number;
totalTokens?: number;
reasoningTokens?: number;
cachedInputTokens?: number;
}
export interface CaptureLlmUsageArgs {
useCase: UseCase;
subUseCase?: string;
agentName?: string;
model: string;
provider: string;
usage: LlmUsageInput | undefined;
}
export function captureLlmUsage(args: CaptureLlmUsageArgs): void {
const usage = args.usage ?? {};
const properties: Record<string, unknown> = {
use_case: args.useCase,
model: args.model,
provider: args.provider,
input_tokens: usage.inputTokens ?? 0,
output_tokens: usage.outputTokens ?? 0,
total_tokens: usage.totalTokens ?? (usage.inputTokens ?? 0) + (usage.outputTokens ?? 0),
};
if (args.subUseCase) properties.sub_use_case = args.subUseCase;
if (args.agentName) properties.agent_name = args.agentName;
if (usage.cachedInputTokens != null) properties.cached_input_tokens = usage.cachedInputTokens;
if (usage.reasoningTokens != null) properties.reasoning_tokens = usage.reasoningTokens;
capture('llm_usage', properties);
}

View file

@ -0,0 +1,28 @@
import { AsyncLocalStorage } from 'node:async_hooks';
export type UseCase = 'copilot_chat' | 'track_block' | 'meeting_note' | 'knowledge_sync';
export interface UseCaseContext {
useCase: UseCase;
subUseCase?: string;
agentName?: string;
}
const storage = new AsyncLocalStorage<UseCaseContext>();
export function withUseCase<T>(ctx: UseCaseContext, fn: () => T): T {
return storage.run(ctx, fn);
}
/**
* Permanently install a use-case context for the current async chain.
* Use inside generator functions where wrapping with `withUseCase()` doesn't
* compose. Child async work (e.g. tool execution) will inherit it.
*/
export function enterUseCase(ctx: UseCaseContext): void {
storage.enterWith(ctx);
}
export function getCurrentUseCase(): UseCaseContext | undefined {
return storage.getStore();
}

View file

@ -85,6 +85,8 @@ ${thirdPartyBlock}**Meeting Prep:** When users ask you to prepare for a meeting,
**Tracks (Auto-Updating Note Blocks):** When users ask you to **track**, **monitor**, **watch**, or **keep an eye on** something in a note or say things like "every morning tell me X", "show the current Y in this note", "pin live updates of Z here" load the \`tracks\` skill first. Also load it when a user presses Cmd+K with a note open and requests auto-refreshing content at the cursor. Track blocks are YAML-fenced scheduled blocks whose output is rewritten on each run — useful for weather, news, prices, status pages, and personal dashboards.
**Browser Control:** When users ask you to open a website, browse in-app, search the web in the embedded browser, or interact with a live webpage inside Rowboat, load the \`browser-control\` skill first. It explains the \`read-page -> indexed action -> refreshed page\` workflow for the browser pane.
**Notifications:** When you need to send a desktop notification completion alert after a long task, time-sensitive update, or a clickable result that lands the user on a specific note/view load the \`notify-user\` skill first. It documents the \`notify-user\` tool and the \`rowboat://\` deep links you can attach to it.
## Learning About the User (save-to-memory)

View file

@ -9,7 +9,15 @@ export interface RuntimeContext {
}
export function getExecutionShell(platform: NodeJS.Platform = process.platform): string {
return platform === 'win32' ? (process.env.ComSpec || 'cmd.exe') : '/bin/sh';
if (platform === 'win32') {
return process.env.ComSpec || 'cmd.exe';
}
if (process.env.SHELL) {
return process.env.SHELL;
}
return platform === 'darwin' ? '/bin/zsh' : '/bin/sh';
}
export function getRuntimeContext(platform: NodeJS.Platform = process.platform): RuntimeContext {

View file

@ -1,555 +0,0 @@
export const skill = String.raw`
# Background Agents
Load this skill whenever a user wants to inspect, create, edit, or schedule background agents inside the Rowboat workspace.
## Core Concepts
**IMPORTANT**: In the CLI, there are NO separate "workflow" files. Everything is an agent.
- **All definitions live in ` + "`agents/*.md`" + `** - Markdown files with YAML frontmatter
- Agents configure a model, tools (in frontmatter), and instructions (in the body)
- Tools can be: builtin (like ` + "`executeCommand`" + `), MCP integrations, or **other agents**
- **"Workflows" are just agents that orchestrate other agents** by having them as tools
- **Background agents run on schedules** defined in ` + "`config/agent-schedule.json`" + ` within the workspace root
## How multi-agent workflows work
1. **Create an orchestrator agent** that has other agents in its ` + "`tools`" + `
2. **Schedule the orchestrator** in agent-schedule.json (see Scheduling section below)
3. The orchestrator calls other agents as tools when needed
4. Data flows through tool call parameters and responses
## Scheduling Background Agents
Background agents run automatically based on schedules defined in ` + "`config/agent-schedule.json`" + ` in the workspace root.
### Schedule Configuration File
` + "```json" + `
{
"agents": {
"agent_name": {
"schedule": { ... },
"enabled": true
}
}
}
` + "```" + `
### Schedule Types
**IMPORTANT: All times are in local time** (the timezone of the machine running Rowboat).
**1. Cron Schedule** - Runs at exact times defined by cron expression
` + "```json" + `
{
"schedule": {
"type": "cron",
"expression": "0 8 * * *"
},
"enabled": true
}
` + "```" + `
Common cron expressions:
- ` + "`*/5 * * * *`" + ` - Every 5 minutes
- ` + "`0 8 * * *`" + ` - Every day at 8am
- ` + "`0 9 * * 1`" + ` - Every Monday at 9am
- ` + "`0 0 1 * *`" + ` - First day of every month at midnight
**2. Window Schedule** - Runs once during a time window
` + "```json" + `
{
"schedule": {
"type": "window",
"cron": "0 0 * * *",
"startTime": "08:00",
"endTime": "10:00"
},
"enabled": true
}
` + "```" + `
The agent will run once at a random time within the window. Use this when you want flexibility (e.g., "sometime in the morning" rather than "exactly at 8am").
**3. Once Schedule** - Runs exactly once at a specific time
` + "```json" + `
{
"schedule": {
"type": "once",
"runAt": "2024-02-05T10:30:00"
},
"enabled": true
}
` + "```" + `
Use this for one-time tasks like migrations or setup scripts. The ` + "`runAt`" + ` is in local time (no Z suffix).
### Starting Message
You can specify a ` + "`startingMessage`" + ` that gets sent to the agent when it starts. If not provided, defaults to ` + "`\"go\"`" + `.
` + "```json" + `
{
"schedule": { "type": "cron", "expression": "0 8 * * *" },
"enabled": true,
"startingMessage": "Please summarize my emails from the last 24 hours"
}
` + "```" + `
### Description
You can add a ` + "`description`" + ` field to describe what the agent does. This is displayed in the UI.
` + "```json" + `
{
"schedule": { "type": "cron", "expression": "0 8 * * *" },
"enabled": true,
"description": "Summarizes emails and calendar events every morning"
}
` + "```" + `
### Complete Schedule Example
` + "```json" + `
{
"agents": {
"daily_digest": {
"schedule": {
"type": "cron",
"expression": "0 8 * * *"
},
"enabled": true,
"description": "Daily email and calendar summary",
"startingMessage": "Summarize my emails and calendar for today"
},
"morning_briefing": {
"schedule": {
"type": "window",
"cron": "0 0 * * *",
"startTime": "07:00",
"endTime": "09:00"
},
"enabled": true,
"description": "Morning news and updates briefing"
},
"one_time_setup": {
"schedule": {
"type": "once",
"runAt": "2024-12-01T12:00:00"
},
"enabled": true,
"description": "One-time data migration task"
}
}
}
` + "```" + `
### Schedule State (Read-Only)
**IMPORTANT: Do NOT modify ` + "`agent-schedule-state.json`" + `** - it is managed automatically by the background runner.
The runner automatically tracks execution state in ` + "`config/agent-schedule-state.json`" + ` in the workspace root:
- ` + "`status`" + `: scheduled, running, finished, failed, triggered (for once-schedules)
- ` + "`lastRunAt`" + `: When the agent last ran
- ` + "`nextRunAt`" + `: When the agent will run next
- ` + "`lastError`" + `: Error message if the last run failed
- ` + "`runCount`" + `: Total number of runs
When you add an agent to ` + "`agent-schedule.json`" + `, the runner will automatically create and manage its state entry. You only need to edit ` + "`agent-schedule.json`" + `.
## Agent File Format
Agent files are **Markdown files with YAML frontmatter**. The frontmatter contains configuration (model, tools), and the body contains the instructions.
### Basic Structure
` + "```markdown" + `
---
model: gpt-5.1
tools:
tool_key:
type: builtin
name: tool_name
---
# Instructions
Your detailed instructions go here in Markdown format.
` + "```" + `
### Frontmatter Fields
- ` + "`model`" + `: (OPTIONAL) Model to use (e.g., 'gpt-5.1', 'claude-sonnet-4-5')
- ` + "`provider`" + `: (OPTIONAL) Provider alias from models.json
- ` + "`tools`" + `: (OPTIONAL) Object containing tool definitions
### Instructions (Body)
The Markdown body after the frontmatter contains the agent's instructions. Use standard Markdown formatting.
### Naming Rules
- Agent filename determines the agent name (without .md extension)
- Example: ` + "`summariser_agent.md`" + ` creates an agent named "summariser_agent"
- Use lowercase with underscores for multi-word names
- No spaces or special characters in names
- **The agent name in agent-schedule.json must match the filename** (without .md)
### Agent Format Example
` + "```markdown" + `
---
model: gpt-5.1
tools:
search:
type: mcp
name: firecrawl_search
description: Search the web
mcpServerName: firecrawl
inputSchema:
type: object
properties:
query:
type: string
description: Search query
required:
- query
---
# Web Search Agent
You are a web search agent. When asked a question:
1. Use the search tool to find relevant information
2. Summarize the results clearly
3. Cite your sources
Be concise and accurate.
` + "```" + `
## Tool Types & Schemas
Tools in agents must follow one of three types. Each has specific required fields.
### 1. Builtin Tools
Internal Rowboat tools (executeCommand, file operations, MCP queries, etc.)
**YAML Schema:**
` + "```yaml" + `
tool_key:
type: builtin
name: tool_name
` + "```" + `
**Required fields:**
- ` + "`type`" + `: Must be "builtin"
- ` + "`name`" + `: Builtin tool name (e.g., "executeCommand", "workspace-readFile")
**Example:**
` + "```yaml" + `
bash:
type: builtin
name: executeCommand
` + "```" + `
**Available builtin tools:**
- ` + "`executeCommand`" + ` - Execute shell commands
- ` + "`workspace-readFile`" + `, ` + "`workspace-writeFile`" + `, ` + "`workspace-remove`" + ` - File operations
- ` + "`workspace-readdir`" + `, ` + "`workspace-exists`" + `, ` + "`workspace-stat`" + ` - Directory operations
- ` + "`workspace-mkdir`" + `, ` + "`workspace-rename`" + `, ` + "`workspace-copy`" + ` - File/directory management
- ` + "`analyzeAgent`" + ` - Analyze agent structure
- ` + "`addMcpServer`" + `, ` + "`listMcpServers`" + `, ` + "`listMcpTools`" + ` - MCP management
- ` + "`loadSkill`" + ` - Load skill guidance
### 2. MCP Tools
Tools from external MCP servers (APIs, databases, web scraping, etc.)
**YAML Schema:**
` + "```yaml" + `
tool_key:
type: mcp
name: tool_name_from_server
description: What the tool does
mcpServerName: server_name_from_config
inputSchema:
type: object
properties:
param:
type: string
description: Parameter description
required:
- param
` + "```" + `
**Required fields:**
- ` + "`type`" + `: Must be "mcp"
- ` + "`name`" + `: Exact tool name from MCP server
- ` + "`description`" + `: What the tool does (helps agent understand when to use it)
- ` + "`mcpServerName`" + `: Server name from config/mcp.json
- ` + "`inputSchema`" + `: Full JSON Schema object for tool parameters
**Example:**
` + "```yaml" + `
search:
type: mcp
name: firecrawl_search
description: Search the web
mcpServerName: firecrawl
inputSchema:
type: object
properties:
query:
type: string
description: Search query
required:
- query
` + "```" + `
**Important:**
- Use ` + "`listMcpTools`" + ` to get the exact inputSchema from the server
- Copy the schema exactlydon't modify property types or structure
- Only include ` + "`required`" + ` array if parameters are mandatory
### 3. Agent Tools (for chaining agents)
Reference other agents as tools to build multi-agent workflows
**YAML Schema:**
` + "```yaml" + `
tool_key:
type: agent
name: target_agent_name
` + "```" + `
**Required fields:**
- ` + "`type`" + `: Must be "agent"
- ` + "`name`" + `: Name of the target agent (must exist in agents/ directory)
**Example:**
` + "```yaml" + `
summariser:
type: agent
name: summariser_agent
` + "```" + `
**How it works:**
- Use ` + "`type: agent`" + ` to call other agents as tools
- The target agent will be invoked with the parameters you pass
- Results are returned as tool output
- This is how you build multi-agent workflows
- The referenced agent file must exist (e.g., ` + "`agents/summariser_agent.md`" + `)
## Complete Multi-Agent Workflow Example
**Email digest workflow** - This is all done through agents calling other agents:
**1. Task-specific agent** (` + "`agents/email_reader.md`" + `):
` + "```markdown" + `
---
model: gpt-5.1
tools:
read_file:
type: builtin
name: workspace-readFile
list_dir:
type: builtin
name: workspace-readdir
---
# Email Reader Agent
Read emails from the gmail_sync folder and extract key information.
Look for unread or recent emails and summarize the sender, subject, and key points.
Don't ask for human input.
` + "```" + `
**2. Agent that delegates to other agents** (` + "`agents/daily_summary.md`" + `):
` + "```markdown" + `
---
model: gpt-5.1
tools:
email_reader:
type: agent
name: email_reader
write_file:
type: builtin
name: workspace-writeFile
---
# Daily Summary Agent
1. Use the email_reader tool to get email summaries
2. Create a consolidated daily digest
3. Save the digest to ~/Desktop/daily_digest.md
Don't ask for human input.
` + "```" + `
Note: The output path (` + "`~/Desktop/daily_digest.md`" + `) is hardcoded in the instructions. When creating agents that output files, always ask the user where they want files saved and include the full path in the agent instructions.
**3. Orchestrator agent** (` + "`agents/morning_briefing.md`" + `):
` + "```markdown" + `
---
model: gpt-5.1
tools:
daily_summary:
type: agent
name: daily_summary
search:
type: mcp
name: search
mcpServerName: exa
description: Search the web for news
inputSchema:
type: object
properties:
query:
type: string
description: Search query
---
# Morning Briefing Workflow
Create a morning briefing:
1. Get email digest using daily_summary
2. Search for relevant news using the search tool
3. Compile a comprehensive morning briefing
Execute these steps in sequence. Don't ask for human input.
` + "```" + `
**4. Schedule the workflow** in ` + "`config/agent-schedule.json`" + `:
` + "```json" + `
{
"agents": {
"morning_briefing": {
"schedule": {
"type": "cron",
"expression": "0 7 * * *"
},
"enabled": true,
"startingMessage": "Create my morning briefing for today"
}
}
}
` + "```" + `
This schedules the morning briefing workflow to run every day at 7am local time.
## Naming and organization rules
- **All agents live in ` + "`agents/*.md`" + `** - Markdown files with YAML frontmatter
- Agent filename (without .md) becomes the agent name
- When referencing an agent as a tool, use its filename without extension
- When scheduling an agent, use its filename without extension in agent-schedule.json
- Use relative paths (no \${BASE_DIR} prefixes) when giving examples to users
## Best practices for background agents
1. **Single responsibility**: Each agent should do one specific thing well
2. **Clear delegation**: Agent instructions should explicitly say when to call other agents
3. **Autonomous operation**: Add "Don't ask for human input" for background agents
4. **Data passing**: Make it clear what data to extract and pass between agents
5. **Tool naming**: Use descriptive tool keys (e.g., "summariser", "fetch_data", "analyze")
6. **Orchestration**: Create a top-level agent that coordinates the workflow
7. **Scheduling**: Use appropriate schedule types - cron for recurring, window for flexible timing, once for one-time tasks
8. **Error handling**: Background agents should handle errors gracefully since there's no human to intervene
9. **Avoid executeCommand**: Do NOT attach ` + "`executeCommand`" + ` to background agents as it poses security risks when running unattended. Instead, use the specific builtin tools needed (` + "`workspace-readFile`" + `, ` + "`workspace-writeFile`" + `, etc.) or MCP tools for external integrations
10. **File output paths**: When creating an agent that outputs files, ASK the user where the file should be stored (default to Desktop: ` + "`~/Desktop`" + `). Then hardcode the full output path in the agent's instructions so it knows exactly where to write files. Example instruction: "Save the output to /Users/username/Desktop/daily_report.md"
## Validation & Best Practices
### CRITICAL: Schema Compliance
- Agent files MUST be valid Markdown with YAML frontmatter
- Agent filename (without .md) becomes the agent name
- Tools in frontmatter MUST have valid ` + "`type`" + ` ("builtin", "mcp", or "agent")
- MCP tools MUST have all required fields: name, description, mcpServerName, inputSchema
- Agent tools MUST reference existing agent files
- Invalid agents will fail to load and prevent workflow execution
### File Creation/Update Process
1. When creating an agent, use ` + "`workspace-writeFile`" + ` with valid Markdown + YAML frontmatter
2. When updating an agent, read it first with ` + "`workspace-readFile`" + `, modify, then use ` + "`workspace-writeFile`" + `
3. Validate YAML syntax in frontmatter before writingmalformed YAML breaks the agent
4. **Quote strings containing colons** (e.g., ` + "`description: \"Default: 8\"`" + ` not ` + "`description: Default: 8`" + `)
5. Test agent loading after creation/update by using ` + "`analyzeAgent`" + `
### Common Validation Errors to Avoid
**WRONG - Missing frontmatter delimiters:**
` + "```markdown" + `
model: gpt-5.1
# My Agent
Instructions here
` + "```" + `
**WRONG - Invalid YAML indentation:**
` + "```markdown" + `
---
tools:
bash:
type: builtin
---
` + "```" + `
(bash should be indented under tools)
**WRONG - Invalid tool type:**
` + "```yaml" + `
tools:
tool1:
type: custom
name: something
` + "```" + `
(type must be builtin, mcp, or agent)
**WRONG - Unquoted strings containing colons:**
` + "```yaml" + `
tools:
search:
description: Number of results (default: 8)
` + "```" + `
(Strings with colons must be quoted: ` + "`description: \"Number of results (default: 8)\"`" + `)
**WRONG - MCP tool missing required fields:**
` + "```yaml" + `
tools:
search:
type: mcp
name: firecrawl_search
` + "```" + `
(Missing: description, mcpServerName, inputSchema)
**CORRECT - Minimal valid agent** (` + "`agents/simple_agent.md`" + `):
` + "```markdown" + `
---
model: gpt-5.1
---
# Simple Agent
Do simple tasks as instructed.
` + "```" + `
**CORRECT - Agent with MCP tool** (` + "`agents/search_agent.md`" + `):
` + "```markdown" + `
---
model: gpt-5.1
tools:
search:
type: mcp
name: firecrawl_search
description: Search the web
mcpServerName: firecrawl
inputSchema:
type: object
properties:
query:
type: string
---
# Search Agent
Use the search tool to find information on the web.
` + "```" + `
## Capabilities checklist
1. Explore ` + "`agents/`" + ` directory to understand existing agents before editing
2. Read existing agents with ` + "`workspace-readFile`" + ` before making changes
3. Validate YAML frontmatter syntax before creating/updating agents
4. Use ` + "`analyzeAgent`" + ` to verify agent structure after creation/update
5. When creating multi-agent workflows, create an orchestrator agent
6. Add other agents as tools with ` + "`type: agent`" + ` for chaining
7. Use ` + "`listMcpServers`" + ` and ` + "`listMcpTools`" + ` when adding MCP integrations
8. Configure schedules in ` + "`config/agent-schedule.json`" + ` (ONLY edit this file, NOT the state file)
9. Confirm work done and outline next steps once changes are complete
`;
export default skill;

View file

@ -14,8 +14,10 @@ Use this skill when the user asks you to open a website, browse in-app, search t
- page ` + "`url`" + ` and ` + "`title`" + `
- visible page text
- interactable elements with numbered ` + "`index`" + ` values
4. Prefer acting on those numbered indices with ` + "`click`" + ` / ` + "`type`" + ` / ` + "`press`" + `.
5. After each action, read the returned page snapshot before deciding the next step.
- ` + "`suggestedSkills`" + ` site-specific and interaction-specific skill hints for the current page
4. **Always inspect ` + "`suggestedSkills`" + ` before acting.** If any skill in the list matches what the user asked for (site or task), call ` + "`load-browser-skill({ id: \"<id>\" })`" + ` *first*, read it in full, then plan your actions. These skills encode selectors, timing, and gotchas that would otherwise cost you several failed attempts to rediscover. If no skill matches, proceed — but do not skip this check.
5. Prefer acting on those numbered indices with ` + "`click`" + ` / ` + "`type`" + ` / ` + "`press`" + `.
6. After each action, read the returned page snapshot before deciding the next step including re-checking ` + "`suggestedSkills`" + ` if the navigation landed you on a new domain.
## Actions
@ -92,12 +94,23 @@ Wait for the page to settle, useful after async UI changes.
Parameters:
- ` + "`ms`" + `: milliseconds to wait (optional)
## Companion Tools
### load-browser-skill
Rowboat caches a library of browser skills (from ` + "`browser-use/browser-harness`" + `) indexed by both **domain** (github, linkedin, amazon, booking, ) and **interaction type** within a domain (e.g. ` + "`github/repo-actions`" + `, ` + "`github/scraping`" + `, ` + "`arxiv-bulk/*`" + `). Whenever ` + "`browser-control`" + ` returns a ` + "`suggestedSkills`" + ` array which it does on ` + "`navigate`" + `, ` + "`new-tab`" + `, and ` + "`read-page`" + ` treat it as a required reading step, not optional. Pick the entry that matches the current task (domain match first, then the interaction-specific variant if one exists) and call ` + "`load-browser-skill({ id: \"<id>\" })`" + ` before attempting the action.
You can also proactively call ` + "`load-browser-skill({ action: \"list\", site: \"<site>\" })`" + ` when you know you're about to work on a site, to see what skills exist even if ` + "`suggestedSkills`" + ` is empty (e.g. before navigating).
These skills are written against a Python harness, so treat them as **reference knowledge**. Reuse the selectors, timing, and sequencing, but adapt them to Rowboat's structured browser actions. **Do not look for or call ` + "`http-fetch`" + `.** If a browser-harness recipe suggests ` + "`js(...)`" + ` or ` + "`http_get(...)`" + ` style shortcuts, treat those as non-portable and fall back to reading and interacting with the page itself.
## Important Rules
- Prefer ` + "`read-page`" + ` before interacting.
- Prefer element ` + "`index`" + ` over CSS selectors.
- If the tool says the snapshot is stale, call ` + "`read-page`" + ` again.
- After navigation, clicking, typing, pressing, or scrolling, use the returned page snapshot instead of assuming the page state.
- **Always check ` + "`suggestedSkills`" + ` after ` + "`navigate`" + `, ` + "`new-tab`" + `, or ` + "`read-page`" + `, and load the matching domain or interaction skill before acting.** Skipping this step is the single most common way to waste a dozen failed clicks on a site whose quirks are already documented. If the array is empty, proceed normally but don't skip the check.
- Do not try to use ` + "`http-fetch`" + `. If a browser-harness recipe mentions ` + "`http_get(...)`" + ` or a public API shortcut, adapt it to DOM-based browsing instead.
- Use Rowboat's browser for live interaction. Use web search tools for research where a live session is unnecessary.
- Do not wrap browser URLs or browser pages in ` + "```filepath" + ` blocks. Filepath cards are only for real files on disk, not web pages or browser tabs.
- If you mention a page the browser opened, use plain text for the URL/title instead of trying to create a clickable file card.

View file

@ -7,13 +7,13 @@ import draftEmailsSkill from "./draft-emails/skill.js";
import mcpIntegrationSkill from "./mcp-integration/skill.js";
import meetingPrepSkill from "./meeting-prep/skill.js";
import organizeFilesSkill from "./organize-files/skill.js";
import backgroundAgentsSkill from "./background-agents/skill.js";
import createPresentationsSkill from "./create-presentations/skill.js";
import appNavigationSkill from "./app-navigation/skill.js";
import browserControlSkill from "./browser-control/skill.js";
import composioIntegrationSkill from "./composio-integration/skill.js";
import tracksSkill from "./tracks/skill.js";
import notifyUserSkill from "./notify-user/skill.js";
const CURRENT_DIR = path.dirname(fileURLToPath(import.meta.url));
const CATALOG_PREFIX = "src/application/assistant/skills";
@ -64,12 +64,6 @@ const definitions: SkillDefinition[] = [
summary: "Find, organize, and tidy up files on the user's machine. Move files to folders, clean up Desktop/Downloads, locate specific files.",
content: organizeFilesSkill,
},
{
id: "background-agents",
title: "Background Agents",
summary: "Creating, editing, and scheduling background agents. Configure schedules in agent-schedule.json and build multi-agent workflows.",
content: backgroundAgentsSkill,
},
{
id: "builtin-tools",
title: "Builtin Tools Reference",
@ -112,6 +106,12 @@ const definitions: SkillDefinition[] = [
summary: "Control the embedded browser pane - open sites, inspect page state, and interact with indexed page elements.",
content: browserControlSkill,
},
{
id: "notify-user",
title: "Notify User",
summary: "Send native desktop notifications with optional clickable links — including rowboat:// deep links that open a specific note, chat, or view inside the app.",
content: notifyUserSkill,
},
];
const skillEntries = definitions.map((definition) => ({

View file

@ -0,0 +1,70 @@
export const skill = String.raw`
# Notify User
Load this skill when you need to send a desktop notification to the user e.g. after a long-running task completes, when a track block detects something noteworthy, or when an agent wants to ping the user with a clickable result.
## When to use
- **Use it for**: completion alerts, threshold breaches, status changes, new items the user asked you to watch for, anything time-sensitive.
- **Don't use it for**: routine progress updates, anything the user can already see in the chat, or repeated pings inside a loop (there is no built-in rate limit restraint is on you).
## The tool: \`notify-user\`
Triggers a native macOS notification. The call returns immediately; it does not block waiting for the user to click.
### Parameters
- **\`title\`** (optional, defaults to \`"Rowboat"\`) — bold headline at the top.
- **\`message\`** (required) — body text. Keep it short — macOS truncates after a couple of lines.
- **\`link\`** (optional) — URL to open when the user clicks the notification. Two kinds accepted:
- **\`https://...\` / \`http://...\`** — opens in the default browser
- **\`rowboat://...\`** — opens a view inside Rowboat (see deep links below)
- If omitted, clicking the notification focuses the Rowboat app.
### Examples
Plain alert (no link clicking focuses the app):
\`\`\`json
{
"title": "Backup complete",
"message": "All 142 files synced to iCloud."
}
\`\`\`
External link:
\`\`\`json
{
"title": "New email from Monica",
"message": "Re: Q4 planning — needs your input by Friday",
"link": "https://mail.google.com/mail/u/0/#inbox/abc123"
}
\`\`\`
Deep link into a Rowboat note:
\`\`\`json
{
"message": "Daily brief is ready",
"link": "rowboat://open?type=file&path=knowledge/Daily/2026-04-25.md"
}
\`\`\`
## Deep links: \`rowboat://\`
Use these as the \`link\` parameter to land the user on a specific view in Rowboat instead of an external site. URL-encode paths/names that contain spaces or special characters.
| Target | Format | Example |
|---|---|---|
| Open a file | \`rowboat://open?type=file&path=<workspace-relative path>\` | \`rowboat://open?type=file&path=knowledge/People/Acme.md\` |
| Open chat | \`rowboat://open?type=chat\` (optional \`&runId=<id>\`) | \`rowboat://open?type=chat&runId=abc123\` |
| Knowledge graph | \`rowboat://open?type=graph\` | — |
| Background task view | \`rowboat://open?type=task&name=<task-name>\` | \`rowboat://open?type=task&name=daily-brief\` |
| Suggested topics | \`rowboat://open?type=suggested-topics\` | — |
The \`type=file\` path is workspace-relative (the same path you'd pass to \`workspace-readFile\`).
## Anti-patterns
- **Don't notify per step** of a multi-step task. Notify on completion, not on progress.
- **Don't repeat what's already on screen.** If the result is already in the chat or in a track block the user is viewing, skip the notification.
- **Don't dump the result into \`message\`.** Surface the headline; put the detail behind a deep link or external link.
- **Don't notify silently-failing things either.** If something failed, say so in the message — don't swallow the failure into a generic "done".
`;
export default skill;

View file

@ -349,6 +349,20 @@ In that flow:
6. Keep the surrounding note scaffolding minimal but useful. The track block should be the core of the note.
7. If the target folder is one of the structured knowledge folders (` + "`" + `knowledge/People/` + "`" + `, ` + "`" + `knowledge/Organizations/` + "`" + `, ` + "`" + `knowledge/Projects/` + "`" + `, ` + "`" + `knowledge/Topics/` + "`" + `), mirror the local note style by quickly checking a nearby note or config before writing if needed.
### Background agent setup flow
Sometimes the user arrives from the Background agents panel and wants help creating a new background agent without naming a note yet.
In this flow, treat "background agent" and "track block" as the same feature. The user-facing term can stay "background agent", but the implementation is a track block inside a note. Do **not** claim these are different systems, and do **not** redirect the user toward standalone agent files or ` + "`" + `agent-schedule.json` + "`" + ` unless they explicitly ask for that architecture.
In that flow:
1. On the first turn, **do not create or modify anything yet**. Briefly explain what you can set up, say you will put it in ` + "`" + `knowledge/Tasks/` + "`" + ` by default, and ask what it should monitor plus how often it should run.
2. **Do not** ask the user where the results should live unless they explicitly said they want a different folder or there is a real ambiguity you cannot resolve.
3. If the user clearly confirms later, treat ` + "`" + `knowledge/Tasks/` + "`" + ` as the default target folder.
4. Before creating a new note there, search ` + "`" + `knowledge/Tasks/` + "`" + ` for an existing matching note and update it if one already exists.
5. If ` + "`" + `knowledge/Tasks/` + "`" + ` does not exist, create it as part of setup instead of bouncing back to ask.
6. Keep the surrounding note scaffolding minimal but useful. The track block should be the core of the note.
## The Exact Text to Insert
Write it verbatim like this (including the blank line between fence and target):

View file

@ -0,0 +1,3 @@
export { ensureLoaded, readSkillContent, refreshFromRemote } from './loader.js';
export type { SkillEntry, SkillsIndex, LoaderStatus } from './loader.js';
export { matchSkillsForUrl } from './matcher.js';

View file

@ -0,0 +1,215 @@
import * as path from 'node:path';
import * as fs from 'node:fs/promises';
import { WorkDir } from '../../config/config.js';
const REPO_OWNER = 'browser-use';
const REPO_NAME = 'browser-harness';
const REPO_BRANCH = 'main';
const DOMAIN_SKILLS_PREFIX = 'domain-skills/';
const MANIFEST_TTL_MS = 24 * 60 * 60 * 1000;
const FETCH_TIMEOUT_MS = 20_000;
export type SkillEntry = {
id: string; // e.g. "github/repo-actions"
site: string; // e.g. "github"
fileName: string; // e.g. "repo-actions.md"
title: string; // first H1 from the markdown, or a derived title
path: string; // relative repo path, e.g. "domain-skills/github/repo-actions.md"
localPath: string; // absolute path on disk
};
export type SkillsIndex = {
fetchedAt: number;
treeSha: string;
entries: SkillEntry[];
};
export type LoaderStatus =
| { status: 'ready'; index: SkillsIndex }
| { status: 'stale'; index: SkillsIndex; refreshing: boolean }
| { status: 'empty' }
| { status: 'error'; error: string };
const cacheRoot = () => path.join(WorkDir, 'cache', 'browser-skills');
const skillsDir = () => path.join(cacheRoot(), 'domain-skills');
const manifestPath = () => path.join(cacheRoot(), 'manifest.json');
async function ensureCacheDir(): Promise<void> {
await fs.mkdir(skillsDir(), { recursive: true });
}
async function readManifest(): Promise<SkillsIndex | null> {
try {
const raw = await fs.readFile(manifestPath(), 'utf8');
const parsed = JSON.parse(raw) as SkillsIndex;
if (!parsed.entries || !Array.isArray(parsed.entries)) return null;
return parsed;
} catch {
return null;
}
}
async function writeManifest(index: SkillsIndex): Promise<void> {
await ensureCacheDir();
await fs.writeFile(manifestPath(), JSON.stringify(index, null, 2), 'utf8');
}
function extractTitle(markdown: string, fallback: string): string {
const match = markdown.match(/^#\s+(.+?)\s*$/m);
if (match?.[1]) return match[1].trim();
return fallback;
}
async function fetchWithTimeout(url: string, init?: RequestInit): Promise<Response> {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);
try {
return await fetch(url, {
...init,
signal: controller.signal,
headers: {
'User-Agent': 'rowboat-browser-skills',
Accept: 'application/vnd.github+json',
...(init?.headers ?? {}),
},
});
} finally {
clearTimeout(timer);
}
}
type GithubTreeNode = { path: string; type: string; sha: string };
async function fetchRepoTree(): Promise<{ treeSha: string; skillPaths: string[] }> {
const branchUrl = `https://api.github.com/repos/${REPO_OWNER}/${REPO_NAME}/branches/${REPO_BRANCH}`;
const branchRes = await fetchWithTimeout(branchUrl);
if (!branchRes.ok) {
throw new Error(`GitHub branch fetch failed: ${branchRes.status} ${branchRes.statusText}`);
}
const branch = (await branchRes.json()) as { commit: { commit: { tree: { sha: string } } } };
const treeSha = branch.commit?.commit?.tree?.sha;
if (!treeSha) throw new Error('Could not resolve tree SHA from branch response.');
const treeUrl = `https://api.github.com/repos/${REPO_OWNER}/${REPO_NAME}/git/trees/${treeSha}?recursive=1`;
const treeRes = await fetchWithTimeout(treeUrl);
if (!treeRes.ok) {
throw new Error(`GitHub tree fetch failed: ${treeRes.status} ${treeRes.statusText}`);
}
const tree = (await treeRes.json()) as { tree: GithubTreeNode[]; truncated: boolean };
const skillPaths = tree.tree
.filter((n) => n.type === 'blob' && n.path.startsWith(DOMAIN_SKILLS_PREFIX) && n.path.endsWith('.md'))
.map((n) => n.path);
return { treeSha, skillPaths };
}
async function fetchRawFile(repoPath: string): Promise<string> {
const url = `https://raw.githubusercontent.com/${REPO_OWNER}/${REPO_NAME}/${REPO_BRANCH}/${repoPath}`;
const res = await fetchWithTimeout(url, { headers: { Accept: 'text/plain' } });
if (!res.ok) {
throw new Error(`Raw file fetch failed for ${repoPath}: ${res.status} ${res.statusText}`);
}
return res.text();
}
function parseRepoPath(repoPath: string): { id: string; site: string; fileName: string } | null {
const rel = repoPath.slice(DOMAIN_SKILLS_PREFIX.length);
const parts = rel.split('/');
if (parts.length < 2) return null;
const site = parts[0];
const fileName = parts.slice(1).join('/');
const id = rel.replace(/\.md$/, '');
return { id, site, fileName };
}
export async function refreshFromRemote(): Promise<SkillsIndex> {
await ensureCacheDir();
const { treeSha, skillPaths } = await fetchRepoTree();
const entries: SkillEntry[] = [];
await Promise.all(skillPaths.map(async (repoPath) => {
const parsed = parseRepoPath(repoPath);
if (!parsed) return;
try {
const content = await fetchRawFile(repoPath);
const localRel = path.join(parsed.site, parsed.fileName);
const localPath = path.join(skillsDir(), localRel);
await fs.mkdir(path.dirname(localPath), { recursive: true });
await fs.writeFile(localPath, content, 'utf8');
entries.push({
id: parsed.id,
site: parsed.site,
fileName: parsed.fileName,
title: extractTitle(content, parsed.id),
path: repoPath,
localPath,
});
} catch (err) {
console.warn(`[browser-skills] Failed to fetch ${repoPath}:`, err);
}
}));
entries.sort((a, b) => a.id.localeCompare(b.id));
const index: SkillsIndex = {
fetchedAt: Date.now(),
treeSha,
entries,
};
await writeManifest(index);
return index;
}
let inFlightRefresh: Promise<SkillsIndex> | null = null;
export async function ensureLoaded(options?: { forceRefresh?: boolean }): Promise<LoaderStatus> {
try {
const existing = await readManifest();
const fresh = existing && Date.now() - existing.fetchedAt < MANIFEST_TTL_MS;
if (existing && fresh && !options?.forceRefresh) {
return { status: 'ready', index: existing };
}
if (existing && !options?.forceRefresh) {
if (!inFlightRefresh) {
inFlightRefresh = refreshFromRemote()
.catch((err) => {
console.warn('[browser-skills] Background refresh failed:', err);
return existing;
})
.finally(() => { inFlightRefresh = null; });
}
return { status: 'stale', index: existing, refreshing: true };
}
if (!inFlightRefresh) {
inFlightRefresh = refreshFromRemote().finally(() => { inFlightRefresh = null; });
}
try {
const index = await inFlightRefresh;
return { status: 'ready', index };
} catch (err) {
return { status: 'error', error: err instanceof Error ? err.message : 'Failed to load skills.' };
}
} catch (err) {
return { status: 'error', error: err instanceof Error ? err.message : 'Skill loader failed.' };
}
}
export async function readSkillContent(id: string): Promise<{ ok: true; content: string; entry: SkillEntry } | { ok: false; error: string }> {
const status = await ensureLoaded();
if (status.status === 'error' || status.status === 'empty') {
return { ok: false, error: status.status === 'error' ? status.error : 'No skills cached yet.' };
}
const entry = status.index.entries.find((e) => e.id === id);
if (!entry) return { ok: false, error: `Skill '${id}' not found.` };
try {
const content = await fs.readFile(entry.localPath, 'utf8');
return { ok: true, content, entry };
} catch (err) {
return { ok: false, error: err instanceof Error ? err.message : 'Failed to read skill file.' };
}
}

View file

@ -0,0 +1,56 @@
import type { SkillEntry, SkillsIndex } from './loader.js';
/**
* Map browser-harness `domain-skills/<site>/` folder names to hostname tokens we
* match against the current tab's URL.
*
* Heuristic: for each site folder we generate candidate hostnames like
* "booking-com" -> ["booking-com", "bookingcom", "booking.com"]
* "github" -> ["github", "github.com"]
* "dev-to" -> ["dev-to", "devto", "dev.to"]
* Then we check whether any candidate is a substring of the tab hostname.
*/
function siteCandidates(site: string): string[] {
const candidates = new Set<string>();
candidates.add(site);
candidates.add(site.replace(/-/g, ''));
candidates.add(site.replace(/-/g, '.'));
if (site.endsWith('-com')) {
candidates.add(`${site.slice(0, -4)}.com`);
}
if (site.endsWith('-org')) {
candidates.add(`${site.slice(0, -4)}.org`);
}
if (site.endsWith('-io')) {
candidates.add(`${site.slice(0, -3)}.io`);
}
return Array.from(candidates);
}
function extractHostname(url: string): string | null {
try {
return new URL(url).hostname.toLowerCase();
} catch {
return null;
}
}
export function matchSkillsForUrl(index: SkillsIndex, url: string, limit = 5): SkillEntry[] {
const hostname = extractHostname(url);
if (!hostname) return [];
const bySite = new Map<string, SkillEntry[]>();
for (const entry of index.entries) {
if (!bySite.has(entry.site)) bySite.set(entry.site, []);
bySite.get(entry.site)!.push(entry);
}
const matched: SkillEntry[] = [];
for (const [site, entries] of bySite) {
const candidates = siteCandidates(site);
const hit = candidates.some((c) => hostname === c || hostname.endsWith(`.${c}`) || hostname.includes(c));
if (hit) matched.push(...entries);
}
return matched.slice(0, limit);
}

View file

@ -18,15 +18,19 @@ import { composioAccountsRepo } from "../../composio/repo.js";
import { executeAction as executeComposioAction, isConfigured as isComposioConfigured, searchTools as searchComposioTools } from "../../composio/client.js";
import { CURATED_TOOLKITS, CURATED_TOOLKIT_SLUGS } from "@x/shared/dist/composio.js";
import { BrowserControlInputSchema, type BrowserControlInput } from "@x/shared/dist/browser-control.js";
import { ensureLoaded as ensureBrowserSkillsLoaded, readSkillContent as readBrowserSkillContent, refreshFromRemote as refreshBrowserSkills } from "../browser-skills/index.js";
import type { ToolContext } from "./exec-tool.js";
import { generateText } from "ai";
import { createProvider } from "../../models/models.js";
import { getDefaultModelAndProvider, resolveProviderConfig } from "../../models/defaults.js";
import { captureLlmUsage } from "../../analytics/usage.js";
import { getCurrentUseCase } from "../../analytics/use_case.js";
import { isSignedIn } from "../../account/account.js";
import { getAccessToken } from "../../auth/tokens.js";
import { API_URL } from "../../config/env.js";
import { updateContent, updateTrackBlock } from "../../knowledge/track/fileops.js";
import type { IBrowserControlService } from "../browser-control/service.js";
import type { INotificationService } from "../notification/service.js";
// Parser libraries are loaded dynamically inside parseFile.execute()
// to avoid pulling pdfjs-dist's DOM polyfills into the main bundle.
// Import paths are computed so esbuild cannot statically resolve them.
@ -764,6 +768,16 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
],
});
const ctx = getCurrentUseCase();
captureLlmUsage({
useCase: ctx?.useCase ?? 'copilot_chat',
subUseCase: 'file_parse',
...(ctx?.agentName ? { agentName: ctx.agentName } : {}),
model: modelId,
provider: providerName,
usage: response.usage,
});
return {
success: true,
fileName,
@ -994,6 +1008,71 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
},
},
// ============================================================================
// Browser Skills (browser-use/browser-harness domain-skills cache)
// ============================================================================
'load-browser-skill': {
description: 'Load a site-specific browser skill (from the browser-use/browser-harness domain-skills library) by id. Returns the full markdown content with selectors, gotchas, and recipes for the target site. Call this after browser-control responses surface a matching skill in suggestedSkills. Pass action="list" to see all available skills. Skills are fetched on first use and cached locally; pass action="refresh" to force an update from upstream.',
inputSchema: z.object({
action: z.enum(['load', 'list', 'refresh']).optional().describe('load: fetch a skill by id (default). list: list all cached skills. refresh: re-fetch the library from upstream.'),
id: z.string().optional().describe('Skill id (e.g., "github/repo-actions") — required for load.'),
site: z.string().optional().describe('Filter list results to a single site (e.g., "github").'),
}),
execute: async (input: { action?: 'load' | 'list' | 'refresh'; id?: string; site?: string }) => {
const action = input.action ?? 'load';
try {
if (action === 'refresh') {
const index = await refreshBrowserSkills();
return {
success: true,
message: `Refreshed ${index.entries.length} skill${index.entries.length === 1 ? '' : 's'} from upstream.`,
count: index.entries.length,
treeSha: index.treeSha,
};
}
if (action === 'list') {
const status = await ensureBrowserSkillsLoaded();
if (status.status === 'error') {
return { success: false, error: status.error };
}
if (status.status === 'empty') {
return { success: false, error: 'No browser skills cached yet.' };
}
const entries = status.index.entries
.filter((e) => !input.site || e.site === input.site)
.map((e) => ({ id: e.id, title: e.title, site: e.site }));
return {
success: true,
count: entries.length,
skills: entries,
cacheAgeMs: Date.now() - status.index.fetchedAt,
refreshing: status.status === 'stale' ? status.refreshing : false,
};
}
if (!input.id) {
return { success: false, error: 'id is required for load.' };
}
const result = await readBrowserSkillContent(input.id);
if (!result.ok) {
return { success: false, error: result.error };
}
return {
success: true,
id: result.entry.id,
title: result.entry.title,
site: result.entry.site,
path: result.entry.path,
content: result.content,
};
} catch (err) {
return { success: false, error: err instanceof Error ? err.message : 'Failed to load browser skill.' };
}
},
},
// ============================================================================
// Browser Control
// ============================================================================
@ -1514,4 +1593,44 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
}
},
},
'notify-user': {
description: "Show a native OS notification to the user. Clicking the notification opens the provided link in the default browser, or focuses the Rowboat app if no link is given.",
inputSchema: z.object({
title: z.string().min(1).max(120).optional().describe("Bold headline shown at the top of the notification. Defaults to 'Rowboat'."),
message: z.string().min(1).describe("Body text of the notification."),
link: z.string().url().refine((v) => /^(https?|rowboat):\/\//i.test(v), {
message: "link must be an http(s):// or rowboat:// URL",
}).optional().describe("Optional URL opened when the user clicks the notification. Accepts http(s):// (opens in browser) or rowboat:// (opens a view inside Rowboat — see the notify-user skill for deep-link shapes)."),
actionLabel: z.string().min(1).max(20).optional().describe("Optional label for an inline action button on the notification (e.g. 'Open', 'View', 'Take Notes'). Only shown when `link` is set. Click on the button triggers the same action as clicking the notification body."),
secondaryActions: z.array(z.object({
label: z.string().min(1).max(30),
link: z.string().url().refine((v) => /^(https?|rowboat):\/\//i.test(v), {
message: "secondary action link must be an http(s):// or rowboat:// URL",
}),
})).max(4).optional().describe("Additional action buttons. macOS shows them in the chevron menu next to the primary button (or all inline in Alert style). Each has its own label and link — clicking the button triggers that link, independent of the primary `link`."),
}),
isAvailable: async () => {
try {
return container.resolve<INotificationService>('notificationService').isSupported();
} catch {
return false;
}
},
execute: async ({ title, message, link, actionLabel, secondaryActions }: { title?: string; message: string; link?: string; actionLabel?: string; secondaryActions?: Array<{ label: string; link: string }> }) => {
try {
const service = container.resolve<INotificationService>('notificationService');
if (!service.isSupported()) {
return { success: false, error: 'Notifications are not supported on this system' };
}
service.notify({ title, message, link, actionLabel, secondaryActions });
return { success: true };
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
},
};

View file

@ -8,7 +8,6 @@ const execPromise = promisify(exec);
const COMMAND_SPLIT_REGEX = /(?:\|\||&&|;|\||\n|`|\$\(|\(|\))/;
const ENV_ASSIGNMENT_REGEX = /^[A-Za-z_][A-Za-z0-9_]*=.*/;
const WRAPPER_COMMANDS = new Set(['sudo', 'env', 'time', 'command']);
const EXECUTION_SHELL = getExecutionShell();
function sanitizeToken(token: string): string {
return token.trim().replace(/^['"()]+|['"()]+$/g, '');
@ -84,11 +83,12 @@ export async function executeCommand(
}
): Promise<CommandResult> {
try {
const shell = getExecutionShell();
const { stdout, stderr } = await execPromise(command, {
cwd: options?.cwd,
timeout: options?.timeout,
maxBuffer: options?.maxBuffer || 1024 * 1024, // default 1MB
shell: EXECUTION_SHELL,
shell,
});
return {
@ -161,8 +161,9 @@ export function executeCommandAbortable(
};
}
const shell = getExecutionShell();
const proc = spawn(command, [], {
shell: EXECUTION_SHELL,
shell,
cwd: options?.cwd,
detached: process.platform !== 'win32', // Create process group on Unix
stdio: ['ignore', 'pipe', 'pipe'],
@ -272,11 +273,12 @@ export function executeCommandSync(
}
): CommandResult {
try {
const shell = getExecutionShell();
const stdout = execSync(command, {
cwd: options?.cwd,
timeout: options?.timeout,
encoding: 'utf-8',
shell: EXECUTION_SHELL,
shell,
});
return {

View file

@ -0,0 +1,12 @@
export interface NotifyInput {
title?: string;
message: string;
link?: string;
actionLabel?: string;
secondaryActions?: Array<{ label: string; link: string }>;
}
export interface INotificationService {
isSupported(): boolean;
notify(input: NotifyInput): void;
}

View file

@ -0,0 +1,113 @@
import { API_URL } from "../config/env.js";
import { getAccessToken } from "./tokens.js";
import { OAuthTokens } from "./types.js";
/**
* Client for the rowboat-mode Google OAuth endpoints on the api:
* POST /v1/google-oauth/claim one-shot retrieval of tokens parked by
* the webapp callback under a `state` ticket
* POST /v1/google-oauth/refresh exchange a refresh_token for fresh tokens
* (the secret-requiring step that can't
* happen on the desktop)
*
* Both are called with the user's Rowboat Supabase bearer (via getAccessToken).
*
* The api response shape uses `scope: string` (space-delimited); we convert
* to the desktop's `scopes: string[]`. On refresh, api may omit `scope` and
* `refresh_token` caller-provided existingScopes / refreshToken are
* preserved in those cases (Google rarely rotates refresh tokens).
*/
/** Thrown when the api signals the user must reconnect (Google `invalid_grant`). */
export class ReconnectRequiredError extends Error {
constructor(message: string) {
super(message);
this.name = "ReconnectRequiredError";
}
}
interface ApiTokenResponse {
access_token: string;
refresh_token?: string;
expires_at: number;
scope?: string;
token_type?: string;
}
function toOAuthTokens(
body: ApiTokenResponse,
fallbackRefreshToken: string | null = null,
fallbackScopes?: string[],
): OAuthTokens {
const refresh_token = body.refresh_token ?? fallbackRefreshToken;
const scopes = body.scope
? body.scope.split(" ").filter((s) => s.length > 0)
: fallbackScopes;
return {
access_token: body.access_token,
refresh_token,
expires_at: body.expires_at,
token_type: "Bearer",
scopes,
};
}
async function postWithBearer(path: string, body: unknown): Promise<Response> {
const bearer = await getAccessToken();
return fetch(`${API_URL}${path}`, {
method: "POST",
headers: {
"content-type": "application/json",
authorization: `Bearer ${bearer}`,
},
body: JSON.stringify(body),
});
}
interface ErrorBody {
error?: string;
reconnectRequired?: boolean;
}
async function readError(res: Response): Promise<ErrorBody> {
try {
return (await res.json()) as ErrorBody;
} catch {
return {};
}
}
/** Claim the tokens parked under `state` after the webapp finished its callback. */
export async function claimTokensViaBackend(state: string): Promise<OAuthTokens> {
const res = await postWithBearer("/v1/google-oauth/claim", { session: state });
if (!res.ok) {
const err = await readError(res);
throw new Error(`claim failed: ${res.status} ${err.error ?? ""}`.trim());
}
const body = (await res.json()) as ApiTokenResponse;
return toOAuthTokens(body);
}
/**
* Refresh an access token via the api. Preserves caller's `refreshToken` and
* `existingScopes` when Google omits them on the refresh response.
*/
export async function refreshTokensViaBackend(
refreshToken: string,
existingScopes?: string[],
): Promise<OAuthTokens> {
const res = await postWithBearer("/v1/google-oauth/refresh", { refreshToken });
if (res.status === 409) {
const err = await readError(res);
if (err.reconnectRequired) {
throw new ReconnectRequiredError(err.error ?? "Reconnect required");
}
throw new Error(`refresh failed: 409 ${err.error ?? ""}`.trim());
}
if (!res.ok) {
const err = await readError(res);
throw new Error(`refresh failed: ${res.status} ${err.error ?? ""}`.trim());
}
const body = (await res.json()) as ApiTokenResponse;
return toOAuthTokens(body, refreshToken, existingScopes);
}

View file

@ -8,6 +8,13 @@ const ProviderConnectionSchema = z.object({
tokens: OAuthTokens.nullable().optional(),
clientId: z.string().nullable().optional(),
clientSecret: z.string().nullable().optional(),
/**
* `byok` (default for absent) user provides their own client_id+secret;
* tokens stored locally; refresh handled locally via openid-client.
* `rowboat` signed-in user; client_id+secret never on the desktop;
* tokens stored locally but refresh goes through the api.
*/
mode: z.enum(['byok', 'rowboat']).optional(),
error: z.string().nullable().optional(),
});

View file

@ -49,8 +49,6 @@ async function getAuthHeaders(): Promise<Record<string, string>> {
*/
const ZComposioConfig = z.object({
apiKey: z.string().optional(),
use_composio_for_google: z.boolean().optional(),
use_composio_for_google_calendar: z.boolean().optional(),
});
type ComposioConfig = z.infer<typeof ZComposioConfig>;
@ -106,24 +104,6 @@ export async function isConfigured(): Promise<boolean> {
return !!getApiKey();
}
/**
* Check if Composio should be used for Google services (Gmail, etc.)
*/
export async function useComposioForGoogle(): Promise<boolean> {
if (await isSignedIn()) return true;
const config = loadConfig();
return config.use_composio_for_google === true;
}
/**
* Check if Composio should be used for Google Calendar
*/
export async function useComposioForGoogleCalendar(): Promise<boolean> {
if (await isSignedIn()) return true;
const config = loadConfig();
return config.use_composio_for_google_calendar === true;
}
/**
* Make an API call to Composio
*/

View file

@ -1,2 +1,2 @@
export const API_URL =
process.env.API_URL || 'https://api.x.rowboatlabs.com';
process.env.API_URL || 'https://api.x.rowboatlabs.com';

View file

@ -0,0 +1,51 @@
import { API_URL } from "./env.js";
/**
* Per-process cache of the unauthenticated `GET /v1/config` response from
* the api. The api returns `{ appUrl, supabaseUrl, websocketApiUrl }`
* we use this to discover the webapp host (where the rowboat-mode OAuth
* flow runs) without hardcoding it on the desktop side.
*
* Cached as a Promise so concurrent first-callers all await the same fetch
* (no thundering herd). On failure the cache is cleared so the next call
* can retry.
*/
interface RemoteConfig {
appUrl: string;
supabaseUrl: string;
websocketApiUrl: string;
}
let _cached: Promise<RemoteConfig> | null = null;
async function fetchRemoteConfig(): Promise<RemoteConfig> {
const res = await fetch(`${API_URL}/v1/config`);
if (!res.ok) {
throw new Error(`/v1/config returned ${res.status}`);
}
const body = (await res.json()) as Partial<RemoteConfig>;
if (!body.appUrl) {
throw new Error("/v1/config response missing appUrl");
}
return {
appUrl: body.appUrl,
supabaseUrl: body.supabaseUrl ?? "",
websocketApiUrl: body.websocketApiUrl ?? "",
};
}
export async function getRemoteConfig(): Promise<RemoteConfig> {
if (!_cached) {
_cached = fetchRemoteConfig().catch((err) => {
_cached = null; // allow retry
throw err;
});
}
return _cached;
}
export async function getWebappUrl(): Promise<string> {
const config = await getRemoteConfig();
return config.appUrl;
}

View file

@ -16,6 +16,7 @@ import { FSAgentScheduleRepo, IAgentScheduleRepo } from "../agent-schedule/repo.
import { FSAgentScheduleStateRepo, IAgentScheduleStateRepo } from "../agent-schedule/state-repo.js";
import { FSSlackConfigRepo, ISlackConfigRepo } from "../slack/repo.js";
import type { IBrowserControlService } from "../application/browser-control/service.js";
import type { INotificationService } from "../application/notification/service.js";
const container = createContainer({
injectionMode: InjectionMode.PROXY,
@ -49,3 +50,9 @@ export function registerBrowserControlService(service: IBrowserControlService):
browserControlService: asValue(service),
});
}
export function registerNotificationService(service: INotificationService): void {
container.register({
notificationService: asValue(service),
});
}

View file

@ -4,12 +4,10 @@ import { google } from 'googleapis';
import { WorkDir } from '../config/config.js';
import { createRun, createMessage } from '../runs/runs.js';
import { getKgModel } from '../models/defaults.js';
import { waitForRunCompletion } from '../agents/utils.js';
import { getErrorDetails, waitForRunCompletion } from '../agents/utils.js';
import { serviceLogger } from '../services/service_logger.js';
import { loadUserConfig, updateUserEmail } from '../config/user_config.js';
import { GoogleClientFactory } from './google-client-factory.js';
import { useComposioForGoogle, executeAction } from '../composio/client.js';
import { composioAccountsRepo } from '../composio/repo.js';
import {
loadAgentNotesState,
saveAgentNotesState,
@ -199,30 +197,7 @@ async function ensureUserEmail(): Promise<string | null> {
return existing.email;
}
// Try Composio (used when signed in or composio configured)
try {
if (await useComposioForGoogle()) {
const account = composioAccountsRepo.getAccount('gmail');
if (account && account.status === 'ACTIVE') {
const result = await executeAction('GMAIL_GET_PROFILE', {
connected_account_id: account.id,
user_id: 'rowboat-user',
version: 'latest',
arguments: { user_id: 'me' },
});
const email = (result.data as Record<string, unknown>)?.emailAddress as string | undefined;
if (email) {
updateUserEmail(email);
console.log(`[AgentNotes] Auto-populated user email via Composio: ${email}`);
return email;
}
}
}
} catch (error) {
console.log('[AgentNotes] Could not fetch email via Composio:', error instanceof Error ? error.message : error);
}
// Try direct Google OAuth
// Try direct Google OAuth (covers both BYOK and rowboat modes)
try {
const auth = await GoogleClientFactory.getClient();
if (auth) {
@ -306,9 +281,14 @@ async function processAgentNotes(): Promise<void> {
const timestamp = new Date().toISOString();
const message = `Current timestamp: ${timestamp}\n\nProcess the following source material and update the Agent Notes folder accordingly.\n\n${messageParts.join('\n\n')}`;
const agentRun = await createRun({ agentId: AGENT_ID, model: await getKgModel() });
const agentRun = await createRun({
agentId: AGENT_ID,
model: await getKgModel(),
useCase: 'knowledge_sync',
subUseCase: 'agent_notes',
});
await createMessage(agentRun.id, message);
await waitForRunCompletion(agentRun.id);
await waitForRunCompletion(agentRun.id, { throwOnError: true });
// Mark everything as processed
for (const p of emailPaths) {
@ -346,7 +326,16 @@ async function processAgentNotes(): Promise<void> {
runId: serviceRun.runId,
level: 'error',
message: 'Error processing agent notes',
error: error instanceof Error ? error.message : String(error),
error: getErrorDetails(error),
});
await serviceLogger.log({
type: 'run_complete',
service: serviceRun.service,
runId: serviceRun.runId,
level: 'error',
message: 'Agent notes processing failed',
durationMs: Date.now() - serviceRun.startedAt,
outcome: 'error',
});
}
}

View file

@ -3,7 +3,7 @@ import path from 'path';
import { WorkDir } from '../config/config.js';
import { createRun, createMessage } from '../runs/runs.js';
import { bus } from '../runs/bus.js';
import { waitForRunCompletion } from '../agents/utils.js';
import { getErrorDetails, waitForRunCompletion } from '../agents/utils.js';
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
import {
loadState,
@ -252,6 +252,8 @@ async function createNotesFromBatch(
// Create a run for the note creation agent
const run = await createRun({
agentId: NOTE_CREATION_AGENT,
useCase: 'knowledge_sync',
subUseCase: 'build_graph',
});
const suggestedTopicsContent = readSuggestedTopicsFile();
@ -310,8 +312,11 @@ async function createNotesFromBatch(
await createMessage(run.id, message);
// Wait for the run to complete
await waitForRunCompletion(run.id);
unsubscribe();
try {
await waitForRunCompletion(run.id, { throwOnError: true });
} finally {
unsubscribe();
}
return { runId: run.id, notesCreated, notesModified };
}
@ -426,7 +431,7 @@ async function buildGraphWithFiles(
runId: run.runId,
level: 'error',
message: `Error processing batch ${batchNumber}`,
error: error instanceof Error ? error.message : String(error),
error: getErrorDetails(error),
context: { batchNumber },
});
}
@ -598,7 +603,7 @@ async function processVoiceMemosForKnowledge(): Promise<boolean> {
runId: run.runId,
level: 'error',
message: `Error processing voice memo batch ${batchNumber}`,
error: error instanceof Error ? error.message : String(error),
error: getErrorDetails(error),
context: { batchNumber },
});
}

View file

@ -6,20 +6,44 @@ import { getProviderConfig } from '../auth/providers.js';
import * as oauthClient from '../auth/oauth-client.js';
import type { Configuration } from '../auth/oauth-client.js';
import { OAuthTokens } from '../auth/types.js';
import {
ReconnectRequiredError,
refreshTokensViaBackend,
} from '../auth/google-backend-oauth.js';
type Mode = 'byok' | 'rowboat';
/**
* Factory for creating and managing Google OAuth2Client instances.
* Handles caching, token refresh, and client reuse for Google API SDKs.
*
* Two connection modes share the same `oauth.json` provider entry:
* - `byok` user supplied client_id+secret; refresh runs locally via
* openid-client; OAuth2Client built with creds.
* - `rowboat` signed-in user; client_id+secret never on the desktop;
* refresh goes through the api at /v1/google-oauth/refresh;
* OAuth2Client built without creds and without refresh_token
* (we own all refreshes see note below).
*
* **Auto-refresh disabled in rowboat mode:** google-auth-library's
* OAuth2Client will, on a 401 from a Google API call, try to refresh using
* the refresh_token + client secret it has on hand. In rowboat mode we have
* no secret, so that would 401-loop. We block this by passing only
* access_token + expiry_date in setCredentials (no refresh_token), which
* leaves the library nothing to refresh with. Our proactive expiry check
* in getClient() is the only refresh path.
*/
export class GoogleClientFactory {
private static readonly PROVIDER_NAME = 'google';
private static cache: {
mode: Mode | null;
config: Configuration | null;
client: OAuth2Client | null;
tokens: OAuthTokens | null;
clientId: string | null;
clientSecret: string | null;
} = {
mode: null,
config: null,
client: null,
tokens: null,
@ -27,7 +51,14 @@ export class GoogleClientFactory {
clientSecret: null,
};
private static async resolveCredentials(): Promise<{ clientId: string; clientSecret?: string }> {
/**
* Promise singleton so a burst of getClient() calls during the brief
* expiry window all wait on a single refresh round-trip rather than
* fanning out parallel refreshes.
*/
private static refreshInFlight: Promise<OAuth2Client | null> | null = null;
private static async resolveByokCredentials(): Promise<{ clientId: string; clientSecret?: string }> {
const oauthRepo = container.resolve<IOAuthRepo>('oauthRepo');
const connection = await oauthRepo.read(this.PROVIDER_NAME);
if (!connection.clientId) {
@ -41,80 +72,116 @@ export class GoogleClientFactory {
* Get or create OAuth2Client, reusing cached instance when possible
*/
static async getClient(): Promise<OAuth2Client | null> {
if (this.refreshInFlight) {
return this.refreshInFlight;
}
const oauthRepo = container.resolve<IOAuthRepo>('oauthRepo');
const { tokens } = await oauthRepo.read(this.PROVIDER_NAME);
const connection = await oauthRepo.read(this.PROVIDER_NAME);
const tokens = connection.tokens ?? null;
const mode: Mode = connection.mode ?? 'byok';
if (!tokens) {
this.clearCache();
return null;
}
// Initialize config cache if needed
try {
await this.initializeConfigCache();
} catch (error) {
console.error("[OAuth] Failed to initialize Google OAuth configuration:", error);
// Mode flipped (e.g. user disconnected then reconnected differently) — invalidate.
if (this.cache.mode && this.cache.mode !== mode) {
this.clearCache();
return null;
}
if (!this.cache.config) {
return null;
}
// Check if token is expired
// BYOK needs an openid-client Configuration for local refresh; rowboat doesn't.
if (mode === 'byok') {
try {
await this.initializeConfigCache();
} catch (error) {
console.error('[OAuth] Failed to initialize Google OAuth configuration:', error);
this.clearCache();
return null;
}
if (!this.cache.config) {
return null;
}
}
// Check expiry against the cached tokens. Note: oauthClient.isTokenExpired
// applies a small clock-skew margin so we refresh slightly before real
// expiry — keeps long-running calls from racing the boundary.
if (oauthClient.isTokenExpired(tokens)) {
// Token expired, try to refresh
if (!tokens.refresh_token) {
console.log("[OAuth] Token expired and no refresh token available for Google.");
console.log('[OAuth] Token expired and no refresh token available for Google.');
await oauthRepo.upsert(this.PROVIDER_NAME, { error: 'Missing refresh token. Please reconnect.' });
this.clearCache();
return null;
}
try {
console.log(`[OAuth] Token expired, refreshing access token...`);
const existingScopes = tokens.scopes;
const refreshedTokens = await oauthClient.refreshTokens(
this.cache.config,
tokens.refresh_token,
existingScopes
);
await oauthRepo.upsert(this.PROVIDER_NAME, { tokens: refreshedTokens });
// Update cached tokens and recreate client
this.cache.tokens = refreshedTokens;
if (!this.cache.clientId) {
const creds = await this.resolveCredentials();
this.cache.clientId = creds.clientId;
this.cache.clientSecret = creds.clientSecret ?? null;
}
this.cache.client = this.createClientFromTokens(refreshedTokens, this.cache.clientId, this.cache.clientSecret ?? undefined);
console.log(`[OAuth] Token refreshed successfully`);
return this.cache.client;
} catch (error) {
const message = error instanceof Error ? error.message : 'Failed to refresh token for Google';
await oauthRepo.upsert(this.PROVIDER_NAME, { error: message });
console.error("[OAuth] Failed to refresh token for Google:", error);
this.clearCache();
return null;
}
this.refreshInFlight = this.refreshAndBuild(tokens, mode).finally(() => {
this.refreshInFlight = null;
});
return this.refreshInFlight;
}
// Reuse client if tokens haven't changed
if (this.cache.client && this.cache.tokens && this.cache.tokens.access_token === tokens.access_token) {
if (this.cache.client && this.cache.tokens && this.cache.tokens.access_token === tokens.access_token && this.cache.mode === mode) {
return this.cache.client;
}
// Create new client with current tokens
console.log(`[OAuth] Creating new OAuth2Client instance`);
this.cache.tokens = tokens;
if (!this.cache.clientId) {
const creds = await this.resolveCredentials();
// Build a fresh client for current tokens
return this.buildAndCacheClient(tokens, mode);
}
private static async refreshAndBuild(tokens: OAuthTokens, mode: Mode): Promise<OAuth2Client | null> {
const oauthRepo = container.resolve<IOAuthRepo>('oauthRepo');
try {
console.log(`[OAuth] Token expired, refreshing via ${mode}...`);
const existingScopes = tokens.scopes;
let refreshedTokens: OAuthTokens;
if (mode === 'rowboat') {
refreshedTokens = await refreshTokensViaBackend(tokens.refresh_token!, existingScopes);
} else {
if (!this.cache.config) {
// Should not happen — initializeConfigCache ran above for byok.
throw new Error('Google OAuth config not initialized');
}
refreshedTokens = await oauthClient.refreshTokens(this.cache.config, tokens.refresh_token!, existingScopes);
}
await oauthRepo.upsert(this.PROVIDER_NAME, { tokens: refreshedTokens, error: null });
console.log('[OAuth] Token refreshed successfully');
return this.buildAndCacheClient(refreshedTokens, mode);
} catch (error) {
if (error instanceof ReconnectRequiredError) {
console.log('[OAuth] Reconnect required for Google');
await oauthRepo.upsert(this.PROVIDER_NAME, { error: 'Reconnect Google' });
this.clearCache();
return null;
}
const message = error instanceof Error ? error.message : 'Failed to refresh token for Google';
await oauthRepo.upsert(this.PROVIDER_NAME, { error: message });
console.error('[OAuth] Failed to refresh token for Google:', error);
this.clearCache();
return null;
}
}
private static async buildAndCacheClient(tokens: OAuthTokens, mode: Mode): Promise<OAuth2Client> {
if (mode === 'byok' && !this.cache.clientId) {
const creds = await this.resolveByokCredentials();
this.cache.clientId = creds.clientId;
this.cache.clientSecret = creds.clientSecret ?? null;
}
this.cache.client = this.createClientFromTokens(tokens, this.cache.clientId, this.cache.clientSecret ?? undefined);
return this.cache.client;
const client = mode === 'rowboat'
? this.createRowboatClient(tokens)
: this.createByokClient(tokens, this.cache.clientId!, this.cache.clientSecret ?? undefined);
this.cache.mode = mode;
this.cache.tokens = tokens;
this.cache.client = client;
return client;
}
/**
@ -139,7 +206,8 @@ export class GoogleClientFactory {
* Clear cache (useful for testing or when credentials are revoked)
*/
static clearCache(): void {
console.log(`[OAuth] Clearing Google auth cache`);
console.log('[OAuth] Clearing Google auth cache');
this.cache.mode = null;
this.cache.config = null;
this.cache.client = null;
this.cache.tokens = null;
@ -148,10 +216,10 @@ export class GoogleClientFactory {
}
/**
* Initialize cached configuration (called once)
* Initialize cached configuration for BYOK mode (rowboat doesn't need it).
*/
private static async initializeConfigCache(): Promise<void> {
const { clientId, clientSecret } = await this.resolveCredentials();
const { clientId, clientSecret } = await this.resolveByokCredentials();
if (this.cache.config && this.cache.clientId === clientId && this.cache.clientSecret === (clientSecret ?? null)) {
return; // Already initialized for these credentials
@ -161,13 +229,13 @@ export class GoogleClientFactory {
this.clearCache();
}
console.log(`[OAuth] Initializing Google OAuth configuration...`);
console.log('[OAuth] Initializing Google OAuth configuration...');
const providerConfig = await getProviderConfig(this.PROVIDER_NAME);
if (providerConfig.discovery.mode === 'issuer') {
if (providerConfig.client.mode === 'static') {
// Discover endpoints, use static client ID
console.log(`[OAuth] Discovery mode: issuer with static client ID`);
console.log('[OAuth] Discovery mode: issuer with static client ID');
this.cache.config = await oauthClient.discoverConfiguration(
providerConfig.discovery.issuer,
clientId,
@ -175,7 +243,7 @@ export class GoogleClientFactory {
);
} else {
// DCR mode - need existing registration
console.log(`[OAuth] Discovery mode: issuer with DCR`);
console.log('[OAuth] Discovery mode: issuer with DCR');
const clientRepo = container.resolve<IClientRegistrationRepo>('clientRegistrationRepo');
const existingRegistration = await clientRepo.getClientRegistration(this.PROVIDER_NAME);
@ -194,7 +262,7 @@ export class GoogleClientFactory {
throw new Error('DCR requires discovery mode "issuer", not "static"');
}
console.log(`[OAuth] Using static endpoints (no discovery)`);
console.log('[OAuth] Using static endpoints (no discovery)');
this.cache.config = oauthClient.createStaticConfiguration(
providerConfig.discovery.authorizationEndpoint,
providerConfig.discovery.tokenEndpoint,
@ -206,27 +274,33 @@ export class GoogleClientFactory {
this.cache.clientId = clientId;
this.cache.clientSecret = clientSecret ?? null;
console.log(`[OAuth] Google OAuth configuration initialized`);
console.log('[OAuth] Google OAuth configuration initialized');
}
/**
* Create OAuth2Client from OAuthTokens
*/
private static createClientFromTokens(tokens: OAuthTokens, clientId: string, clientSecret?: string): OAuth2Client {
const client = new OAuth2Client(
clientId,
clientSecret ?? undefined,
undefined // redirect_uri not needed for token usage
);
// Set credentials
/** BYOK OAuth2Client — has client_id + secret + refresh_token. */
private static createByokClient(tokens: OAuthTokens, clientId: string, clientSecret?: string): OAuth2Client {
const client = new OAuth2Client(clientId, clientSecret ?? undefined, undefined);
client.setCredentials({
access_token: tokens.access_token,
refresh_token: tokens.refresh_token || undefined,
expiry_date: tokens.expires_at * 1000, // Convert from seconds to milliseconds
expiry_date: tokens.expires_at * 1000,
scope: tokens.scopes?.join(' ') || undefined,
});
return client;
}
/**
* Rowboat OAuth2Client no client_id/secret, no refresh_token.
* Library auto-refresh is disabled by absence of refresh_token; our
* proactive refresh in getClient() is the only refresh path.
*/
private static createRowboatClient(tokens: OAuthTokens): OAuth2Client {
const client = new OAuth2Client();
client.setCredentials({
access_token: tokens.access_token,
expiry_date: tokens.expires_at * 1000,
scope: tokens.scopes?.join(' ') || undefined,
});
return client;
}
}

View file

@ -10,6 +10,7 @@ import type { IModelConfigRepo } from '../models/repo.js';
import { createProvider } from '../models/models.js';
import { inlineTask } from '@x/shared';
import { extractAgentResponse, waitForRunCompletion } from '../agents/utils.js';
import { captureLlmUsage } from '../analytics/usage.js';
const SYNC_INTERVAL_MS = 15 * 1000; // 15 seconds
const INLINE_TASK_AGENT = 'inline_task_agent';
@ -468,7 +469,12 @@ async function processInlineTasks(): Promise<void> {
console.log(`[InlineTasks] Running task: "${task.instruction.slice(0, 80)}..."`);
try {
const run = await createRun({ agentId: INLINE_TASK_AGENT, model: await getKgModel() });
const run = await createRun({
agentId: INLINE_TASK_AGENT,
model: await getKgModel(),
useCase: 'knowledge_sync',
subUseCase: 'inline_task_run',
});
const message = [
`Execute the following instruction from the note "${relativePath}":`,
@ -548,7 +554,12 @@ export async function processRowboatInstruction(
scheduleLabel: string | null;
response: string | null;
}> {
const run = await createRun({ agentId: INLINE_TASK_AGENT, model: await getKgModel() });
const run = await createRun({
agentId: INLINE_TASK_AGENT,
model: await getKgModel(),
useCase: 'knowledge_sync',
subUseCase: 'inline_task_run',
});
const message = [
`Process the following @rowboat instruction from the note "${notePath}":`,
@ -659,6 +670,14 @@ Respond with ONLY valid JSON: either a schedule object or null. No other text.`;
prompt: instruction,
});
captureLlmUsage({
useCase: 'knowledge_sync',
subUseCase: 'inline_task_classify',
model: config.model,
provider: config.provider.flavor,
usage: result.usage,
});
let text = result.text.trim();
console.log('[classifySchedule] LLM response:', text);
// Strip markdown code fences if the LLM wraps the JSON

View file

@ -4,7 +4,7 @@ import { WorkDir } from '../config/config.js';
import { createRun, createMessage } from '../runs/runs.js';
import { getKgModel } from '../models/defaults.js';
import { bus } from '../runs/bus.js';
import { waitForRunCompletion } from '../agents/utils.js';
import { getErrorDetails, waitForRunCompletion } from '../agents/utils.js';
import { serviceLogger } from '../services/service_logger.js';
import { limitEventItems } from './limit_event_items.js';
import {
@ -73,6 +73,8 @@ async function labelEmailBatch(
const run = await createRun({
agentId: LABELING_AGENT,
model: await getKgModel(),
useCase: 'knowledge_sync',
subUseCase: 'label_emails',
});
let message = `Label the following ${files.length} email files by prepending YAML frontmatter.\n\n`;
@ -110,8 +112,11 @@ async function labelEmailBatch(
});
await createMessage(run.id, message);
await waitForRunCompletion(run.id);
unsubscribe();
try {
await waitForRunCompletion(run.id, { throwOnError: true });
} finally {
unsubscribe();
}
return { runId: run.id, filesEdited };
}
@ -173,6 +178,7 @@ export async function processUnlabeledEmails(concurrency: number = DEFAULT_CONCU
const totalBatches = batches.length;
let totalEdited = 0;
let hadError = false;
let failedBatches = 0;
// Process batches with concurrency limit
for (let i = 0; i < batches.length; i += concurrency) {
@ -207,14 +213,16 @@ export async function processUnlabeledEmails(concurrency: number = DEFAULT_CONCU
return result.filesEdited.size;
} catch (error) {
hadError = true;
failedBatches++;
const errorDetails = getErrorDetails(error);
console.error(`[EmailLabeling] Error processing batch ${batchNumber}:`, error);
await serviceLogger.log({
type: 'error',
service: run.service,
runId: run.runId,
level: 'error',
message: `Error processing batch ${batchNumber}`,
error: error instanceof Error ? error.message : String(error),
message: `Email labeling batch ${batchNumber}/${totalBatches} failed`,
error: errorDetails,
context: { batchNumber },
});
return 0;
@ -236,12 +244,15 @@ export async function processUnlabeledEmails(concurrency: number = DEFAULT_CONCU
service: run.service,
runId: run.runId,
level: hadError ? 'error' : 'info',
message: `Email labeling complete: ${totalEdited} files labeled`,
message: hadError
? `Email labeling finished with errors: ${totalEdited} files labeled`
: `Email labeling complete: ${totalEdited} files labeled`,
durationMs: Date.now() - run.startedAt,
outcome: hadError ? 'error' : 'ok',
summary: {
totalEmails: unlabeled.length,
filesLabeled: totalEdited,
failedBatches,
},
});

View file

@ -807,6 +807,43 @@ The summary should answer: **"Who is this person and why do I know them?"**
**Focus on the relationship, not the communication method.**
## Knowing Vs Meeting
Distinguish between **knowing someone** and **having met or heard from them once**.
- Use **"I know X through Y"** only when there is an actual ongoing relationship
- In that construction, **Y** should be a person, organization, or recurring context such as YC, an investor relationship, a customer relationship, or an ongoing project
- For one-off encounters, use **"I met X at/on/during..."** or lead with what they did, such as **"X reached out about..."**, **"X joined..."**, or **"X was part of..."**
- Do **not** use **"I know X through [an event]"** when the thing is a specific meeting, dinner, conference, demo day, call, or other one-off event
- Events are **when or where I met someone**, not **how I know them**
- If the source only shows a single meeting, a single inbound email, or a one-time introduction, do not imply an ongoing relationship unless the broader context clearly supports it
Examples:
- Incorrect: \`I know him through a YC dinner.\`
- Correct: \`I met him at a YC dinner.\`
- Incorrect: \`I know her through a call about pricing.\`
- Correct: \`She reached out about pricing.\`
- Correct: \`I know her through YC and ongoing investor conversations.\`
## Perspective And Self-Reference
These knowledge notes are written from the **user's first-person perspective**.
- When the user's identity is known, **"I / me / my" refer to the user**
- When the company or team is the actor, use **"we / us / our"** when natural
- Name other participants normally
- **Do not refer to the user by name, email, or in third person inside first-person narration**
- Do not write broken combinations like **"I know him ... that met with Arjun"** when Arjun is the user
- Apply this consistently across **all note types and sections**: summaries, activity entries, timelines, decisions, open items, and any narrative prose
Examples:
- Incorrect: \`I know him as part of the Standard Capital team that met with Arjun and Ramnique.\`
- Correct: \`I know him as part of the Standard Capital team that met with me and Ramnique.\`
- Incorrect: \`Arjun discussed pricing with [[People/Sarah Chen]].\`
- Correct: \`I discussed pricing with [[People/Sarah Chen]].\`
## Activity Summary
One line summarizing this source's relevance to the entity:

View file

@ -26,7 +26,7 @@ const DEFAULT_NOTE_TYPE_DEFINITIONS: NoteTypeDefinition[] = [
**Last update:** {YYYY-MM-DD}
## Summary
{2-3 sentences: Who they are, why you know them, what you're working on together.}
{2-3 sentences: Who they are, whether I know them through an ongoing relationship or met them in a specific encounter, and what we're discussing or working on together if applicable.}
## Connected to
- [[Organizations/{Organization}]] works at
@ -59,7 +59,7 @@ const DEFAULT_NOTE_TYPE_DEFINITIONS: NoteTypeDefinition[] = [
**Last update:** {YYYY-MM-DD}
## Summary
{2-3 sentences: What this org is, what your relationship is.}
{2-3 sentences: What this org is, how I know or work with them.}
## People
- [[People/{Person}]] {role}
@ -93,7 +93,7 @@ const DEFAULT_NOTE_TYPE_DEFINITIONS: NoteTypeDefinition[] = [
**Last update:** {YYYY-MM-DD}
## Summary
{2-3 sentences: What this project is, goal, current state.}
{2-3 sentences: What this project is, the goal, current state, and my/our involvement where relevant.}
## People
- [[People/{Person}]] {role}

View file

@ -0,0 +1,180 @@
import path from "node:path";
import fs from "node:fs/promises";
import type { Dirent } from "node:fs";
import { WorkDir } from "../config/config.js";
import container from "../di/container.js";
import type { INotificationService } from "../application/notification/service.js";
const TICK_INTERVAL_MS = 30_000;
// Notify when an event is between 30s in the past (started just now) and
// 90s in the future (about to start). The window is wider than 60s so we
// don't miss an event if the tick lands slightly off the start time.
const NOTIFY_LEAD_MS = 90_000;
const NOTIFY_GRACE_MS = 30_000;
// Drop state entries older than 24h so the file doesn't grow forever.
const STATE_TTL_MS = 24 * 60 * 60 * 1000;
const CALENDAR_SYNC_DIR = path.join(WorkDir, "calendar_sync");
const STATE_FILE = path.join(WorkDir, "calendar_notifications_state.json");
interface NotificationState {
notifiedEventIds: Record<string, { notifiedAt: string; startTime: string }>;
}
interface CalendarEvent {
id?: string;
summary?: string;
status?: string;
start?: { dateTime?: string; date?: string; timeZone?: string };
end?: { dateTime?: string; date?: string };
attendees?: Array<{ email?: string; self?: boolean; responseStatus?: string }>;
hangoutLink?: string;
conferenceData?: {
entryPoints?: Array<{ entryPointType?: string; uri?: string }>;
};
}
async function loadState(): Promise<NotificationState> {
try {
const raw = await fs.readFile(STATE_FILE, "utf-8");
const parsed = JSON.parse(raw);
if (parsed && typeof parsed === "object" && parsed.notifiedEventIds) {
return parsed as NotificationState;
}
} catch {
// No state file yet, or corrupt — start fresh.
}
return { notifiedEventIds: {} };
}
async function saveState(state: NotificationState): Promise<void> {
// Write to a sibling tmp file then rename so a mid-write crash can't leave
// the JSON corrupt — a corrupt state file would make every event in the
// 90s notify window re-fire on next start.
const tmp = `${STATE_FILE}.tmp`;
await fs.writeFile(tmp, JSON.stringify(state, null, 2), "utf-8");
await fs.rename(tmp, STATE_FILE);
}
function gcState(state: NotificationState): NotificationState {
const cutoff = Date.now() - STATE_TTL_MS;
const fresh: NotificationState["notifiedEventIds"] = {};
for (const [id, entry] of Object.entries(state.notifiedEventIds)) {
const ts = Date.parse(entry.notifiedAt);
if (Number.isFinite(ts) && ts >= cutoff) fresh[id] = entry;
}
return { notifiedEventIds: fresh };
}
function isAllDay(event: CalendarEvent): boolean {
// Google Calendar all-day events have `date` (YYYY-MM-DD) on start, not `dateTime`.
return !event.start?.dateTime;
}
function isDeclinedBySelf(event: CalendarEvent): boolean {
if (!event.attendees) return false;
const self = event.attendees.find((a) => a.self);
return self?.responseStatus === "declined";
}
async function tick(state: NotificationState): Promise<{ state: NotificationState; dirty: boolean }> {
let entries: Dirent[];
try {
entries = await fs.readdir(CALENDAR_SYNC_DIR, { withFileTypes: true });
} catch {
return { state, dirty: false };
}
let service: INotificationService;
try {
service = container.resolve<INotificationService>("notificationService");
} catch {
// Notification service not registered yet (very early startup) — skip this tick.
return { state, dirty: false };
}
if (!service.isSupported()) return { state, dirty: false };
const now = Date.now();
let dirty = false;
for (const entry of entries) {
if (!entry.isFile() || !entry.name.endsWith(".json")) continue;
if (entry.name === "sync_state.json" || entry.name.startsWith("sync_state")) continue;
const eventId = entry.name.replace(/\.json$/, "");
if (state.notifiedEventIds[eventId]) continue;
const filePath = path.join(CALENDAR_SYNC_DIR, entry.name);
let event: CalendarEvent;
try {
event = JSON.parse(await fs.readFile(filePath, "utf-8"));
} catch {
continue;
}
if (event.status === "cancelled") continue;
if (isAllDay(event)) continue;
if (isDeclinedBySelf(event)) continue;
const startStr = event.start?.dateTime;
if (!startStr) continue;
const startMs = Date.parse(startStr);
if (!Number.isFinite(startMs)) continue;
const msUntilStart = startMs - now;
if (msUntilStart > NOTIFY_LEAD_MS) continue;
if (msUntilStart < -NOTIFY_GRACE_MS) continue;
const summary = event.summary?.trim() || "Untitled meeting";
const eid = encodeURIComponent(eventId);
try {
service.notify({
title: "Upcoming meeting",
message: `${summary} starts in 1 minute. Click to join and take notes.`,
// Single labeled button — adding a secondary action would force
// macOS to bundle them into an "Options" dropdown, hiding the
// primary label.
link: `rowboat://action?type=join-and-take-meeting-notes&eventId=${eid}`,
actionLabel: "Join & Notes",
});
console.log(`[CalendarNotify] notified for "${summary}" (${eventId})`);
} catch (err) {
console.error(`[CalendarNotify] notify failed for ${eventId}:`, err);
continue;
}
state.notifiedEventIds[eventId] = {
notifiedAt: new Date().toISOString(),
startTime: startStr,
};
dirty = true;
}
return { state, dirty };
}
export async function init(): Promise<void> {
console.log("[CalendarNotify] starting calendar notification service");
console.log(`[CalendarNotify] tick every ${TICK_INTERVAL_MS / 1000}s`);
let state = gcState(await loadState());
while (true) {
try {
const result = await tick(state);
state = result.state;
if (result.dirty) {
state = gcState(state);
try {
await saveState(state);
} catch (err) {
console.error("[CalendarNotify] failed to save state:", err);
}
}
} catch (err) {
console.error("[CalendarNotify] tick failed:", err);
}
await new Promise((resolve) => setTimeout(resolve, TICK_INTERVAL_MS));
}
}

View file

@ -4,6 +4,7 @@ import { generateText } from 'ai';
import { createProvider } from '../models/models.js';
import { getDefaultModelAndProvider, getMeetingNotesModel, resolveProviderConfig } from '../models/defaults.js';
import { WorkDir } from '../config/config.js';
import { captureLlmUsage } from '../analytics/usage.js';
const CALENDAR_SYNC_DIR = path.join(WorkDir, 'calendar_sync');
@ -157,5 +158,12 @@ export async function summarizeMeeting(transcript: string, meetingStartTime?: st
prompt,
});
captureLlmUsage({
useCase: 'meeting_note',
model: modelId,
provider: providerName,
usage: result.usage,
});
return result.text.trim();
}

View file

@ -5,10 +5,8 @@ import { OAuth2Client } from 'google-auth-library';
import { NodeHtmlMarkdown } from 'node-html-markdown'
import { WorkDir } from '../config/config.js';
import { GoogleClientFactory } from './google-client-factory.js';
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
import { serviceLogger } from '../services/service_logger.js';
import { limitEventItems } from './limit_event_items.js';
import { executeAction, useComposioForGoogleCalendar } from '../composio/client.js';
import { composioAccountsRepo } from '../composio/repo.js';
import { createEvent } from './track/events.js';
const MAX_EVENTS_IN_DIGEST = 50;
@ -138,7 +136,6 @@ async function publishCalendarSyncEvent(
const SYNC_DIR = path.join(WorkDir, 'calendar_sync');
const SYNC_INTERVAL_MS = 5 * 60 * 1000; // Check every 5 minutes
const LOOKBACK_DAYS = 7;
const COMPOSIO_LOOKBACK_DAYS = 7;
const REQUIRED_SCOPES = [
'https://www.googleapis.com/auth/calendar.events.readonly',
'https://www.googleapis.com/auth/drive.readonly'
@ -477,286 +474,17 @@ async function performSync(syncDir: string, lookbackDays: number) {
}
}
// --- Composio-based Sync ---
interface ComposioCalendarState {
last_sync: string; // ISO string
}
function loadComposioState(stateFile: string): ComposioCalendarState | null {
if (fs.existsSync(stateFile)) {
try {
const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
if (data.last_sync) {
return { last_sync: data.last_sync };
}
} catch (e) {
console.error('[Calendar] Failed to load composio state:', e);
}
}
return null;
}
function saveComposioState(stateFile: string, lastSync: string): void {
fs.writeFileSync(stateFile, JSON.stringify({ last_sync: lastSync }, null, 2));
}
/**
* Save a Composio calendar event as JSON (same format used by Google OAuth path).
* The event data from Composio is already structured similarly to Google Calendar API.
*/
function saveComposioEvent(eventData: Record<string, unknown>, syncDir: string): { changed: boolean; isNew: boolean; title: string } {
const eventId = eventData.id as string | undefined;
if (!eventId) return { changed: false, isNew: false, title: 'Unknown' };
const filePath = path.join(syncDir, `${eventId}.json`);
const content = JSON.stringify(eventData, null, 2);
const exists = fs.existsSync(filePath);
try {
if (exists) {
const existing = fs.readFileSync(filePath, 'utf-8');
if (existing === content) {
return { changed: false, isNew: false, title: (eventData.summary as string) || eventId };
}
}
fs.writeFileSync(filePath, content);
return { changed: true, isNew: !exists, title: (eventData.summary as string) || eventId };
} catch (e) {
console.error(`[Calendar] Error saving event ${eventId}:`, e);
return { changed: false, isNew: false, title: (eventData.summary as string) || eventId };
}
}
async function performSyncComposio() {
const STATE_FILE = path.join(SYNC_DIR, 'composio_state.json');
if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true });
const account = composioAccountsRepo.getAccount('googlecalendar');
if (!account || account.status !== 'ACTIVE') {
console.log('[Calendar] Google Calendar not connected via Composio. Skipping sync.');
return;
}
const connectedAccountId = account.id;
// Calculate time window: lookback + 14 days forward
const now = new Date();
const lookbackMs = COMPOSIO_LOOKBACK_DAYS * 24 * 60 * 60 * 1000;
const twoWeeksForwardMs = 14 * 24 * 60 * 60 * 1000;
const timeMin = new Date(now.getTime() - lookbackMs).toISOString();
const timeMax = new Date(now.getTime() + twoWeeksForwardMs).toISOString();
console.log(`[Calendar] Syncing via Composio from ${timeMin} to ${timeMax} (lookback: ${COMPOSIO_LOOKBACK_DAYS} days)...`);
let run: ServiceRunContext | null = null;
const ensureRun = async (): Promise<ServiceRunContext> => {
if (!run) {
run = await serviceLogger.startRun({
service: 'calendar',
message: 'Syncing calendar (Composio)',
trigger: 'timer',
});
}
return run;
};
try {
const currentEventIds = new Set<string>();
let newCount = 0;
let updatedCount = 0;
const changedTitles: string[] = [];
const newEvents: AnyEvent[] = [];
const updatedEvents: AnyEvent[] = [];
let pageToken: string | null = null;
const MAX_PAGES = 20;
for (let page = 0; page < MAX_PAGES; page++) {
// Re-check connection in case user disconnected mid-sync
if (!composioAccountsRepo.isConnected('googlecalendar')) {
console.log('[Calendar] Account disconnected during sync. Stopping.');
return;
}
const args: Record<string, unknown> = {
calendar_id: 'primary',
time_min: timeMin,
time_max: timeMax,
single_events: true,
order_by: 'startTime',
};
if (pageToken) {
args.page_token = pageToken;
}
const result = await executeAction(
'GOOGLECALENDAR_FIND_EVENT',
{
connected_account_id: connectedAccountId,
user_id: 'rowboat-user',
version: 'latest',
arguments: args,
}
);
if (!result.successful || !result.data) {
console.error('[Calendar] Failed to list events via Composio:', result.error);
return;
}
const data = result.data as Record<string, unknown>;
// Composio may return events in different structures
let events: Array<Record<string, unknown>> = [];
if (Array.isArray(data.items)) {
events = data.items as Array<Record<string, unknown>>;
} else if (Array.isArray(data.events)) {
events = data.events as Array<Record<string, unknown>>;
} else if (data.event_data && typeof data.event_data === 'object') {
const nested = data.event_data as Record<string, unknown>;
if (Array.isArray(nested.event_data)) {
events = nested.event_data as Array<Record<string, unknown>>;
} else if (Array.isArray(data.event_data)) {
events = data.event_data as Array<Record<string, unknown>>;
}
} else if (Array.isArray(data)) {
events = data as unknown as Array<Record<string, unknown>>;
}
if (events.length === 0 && page === 0) {
console.log('[Calendar] No events found in this window.');
} else if (events.length > 0) {
console.log(`[Calendar] Page ${page + 1}: found ${events.length} events.`);
for (const event of events) {
const eventId = event.id as string | undefined;
if (eventId) {
const saveResult = saveComposioEvent(event, SYNC_DIR);
currentEventIds.add(eventId);
if (saveResult.changed) {
await ensureRun();
changedTitles.push(saveResult.title);
if (saveResult.isNew) {
newCount++;
newEvents.push(event);
} else {
updatedCount++;
updatedEvents.push(event);
}
}
}
}
}
// Check for next page
const nextToken = data.nextPageToken as string | undefined;
if (nextToken) {
pageToken = nextToken;
console.log(`[Calendar] Fetching next page...`);
} else {
break;
}
}
// Clean up events no longer in the window
const deletedFiles = cleanUpOldFiles(currentEventIds, SYNC_DIR);
let deletedCount = 0;
if (deletedFiles.length > 0) {
await ensureRun();
deletedCount = deletedFiles.length;
}
// Publish a single bundled event capturing all changes from this sync.
await publishCalendarSyncEvent(newEvents, updatedEvents, deletedFiles);
// Log results if any changes were detected (run was started by ensureRun)
if (run) {
const r = run as ServiceRunContext;
const totalChanges = newCount + updatedCount + deletedCount;
const limitedTitles = limitEventItems(changedTitles);
await serviceLogger.log({
type: 'changes_identified',
service: r.service,
runId: r.runId,
level: 'info',
message: `Calendar updates: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`,
counts: {
newEvents: newCount,
updatedEvents: updatedCount,
deletedFiles: deletedCount,
},
items: limitedTitles.items,
truncated: limitedTitles.truncated,
});
await serviceLogger.log({
type: 'run_complete',
service: r.service,
runId: r.runId,
level: 'info',
message: `Calendar sync complete: ${totalChanges} change${totalChanges === 1 ? '' : 's'}`,
durationMs: Date.now() - r.startedAt,
outcome: 'ok',
summary: {
newEvents: newCount,
updatedEvents: updatedCount,
deletedFiles: deletedCount,
},
});
}
// Save state
saveComposioState(STATE_FILE, new Date().toISOString());
console.log(`[Calendar] Composio sync completed. ${newCount} new, ${updatedCount} updated, ${deletedCount} deleted.`);
} catch (error) {
console.error('[Calendar] Error during Composio sync:', error);
const errRun = await ensureRun();
await serviceLogger.log({
type: 'error',
service: errRun.service,
runId: errRun.runId,
level: 'error',
message: 'Calendar sync error',
error: error instanceof Error ? error.message : String(error),
});
await serviceLogger.log({
type: 'run_complete',
service: errRun.service,
runId: errRun.runId,
level: 'error',
message: 'Calendar sync failed',
durationMs: Date.now() - errRun.startedAt,
outcome: 'error',
});
}
}
export async function init() {
console.log("Starting Google Calendar & Notes Sync (TS)...");
console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`);
while (true) {
try {
const composioMode = await useComposioForGoogleCalendar();
if (composioMode) {
const isConnected = composioAccountsRepo.isConnected('googlecalendar');
if (!isConnected) {
console.log('[Calendar] Google Calendar not connected via Composio. Sleeping...');
} else {
await performSyncComposio();
}
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPES);
if (!hasCredentials) {
console.log("Google OAuth credentials not available or missing required Calendar/Drive scopes. Sleeping...");
} else {
// Check if credentials are available with required scopes
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPES);
if (!hasCredentials) {
console.log("Google OAuth credentials not available or missing required Calendar/Drive scopes. Sleeping...");
} else {
// Perform one sync
await performSync(SYNC_DIR, LOOKBACK_DAYS);
}
await performSync(SYNC_DIR, LOOKBACK_DAYS);
}
} catch (error) {
console.error("Error in main loop:", error);

View file

@ -7,8 +7,6 @@ import { WorkDir } from '../config/config.js';
import { GoogleClientFactory } from './google-client-factory.js';
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
import { limitEventItems } from './limit_event_items.js';
import { executeAction, useComposioForGoogle } from '../composio/client.js';
import { composioAccountsRepo } from '../composio/repo.js';
import { createEvent } from './track/events.js';
// Configuration
@ -225,7 +223,7 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri
}
}
function loadState(stateFile: string): { historyId?: string } {
function loadState(stateFile: string): { historyId?: string; last_sync?: string } {
if (fs.existsSync(stateFile)) {
return JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
}
@ -240,9 +238,24 @@ function saveState(historyId: string, stateFile: string) {
}
async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) {
console.log(`Performing full sync of last ${lookbackDays} days...`);
const gmail = google.gmail({ version: 'v1', auth });
// If the state file holds a last_sync timestamp (e.g. left over from a
// prior Composio sync, or from a previous successful native sync that
// we're falling back to after a history.list 404), use that as the
// floor instead of the default lookback. Carries forward Composio's
// last_sync on first migration so we don't refetch the last 7 days.
const state = loadState(stateFile);
let pastDate: Date;
if (state.last_sync) {
pastDate = new Date(state.last_sync);
console.log(`Performing full sync from last_sync=${state.last_sync}...`);
} else {
pastDate = new Date();
pastDate.setDate(pastDate.getDate() - lookbackDays);
console.log(`Performing full sync of last ${lookbackDays} days...`);
}
let run: ServiceRunContext | null = null;
const ensureRun = async () => {
if (!run) {
@ -255,8 +268,6 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str
};
try {
const pastDate = new Date();
pastDate.setDate(pastDate.getDate() - lookbackDays);
const dateQuery = pastDate.toISOString().split('T')[0].replace(/-/g, '/');
// Get History ID
@ -498,386 +509,17 @@ async function performSync() {
}
}
// --- Composio-based Sync ---
const COMPOSIO_LOOKBACK_DAYS = 7;
interface ComposioSyncState {
last_sync: string; // ISO string
}
function loadComposioState(stateFile: string): ComposioSyncState | null {
if (fs.existsSync(stateFile)) {
try {
const data = JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
if (data.last_sync) {
return { last_sync: data.last_sync };
}
} catch (e) {
console.error('[Gmail] Failed to load composio state:', e);
}
}
return null;
}
function saveComposioState(stateFile: string, lastSync: string): void {
fs.writeFileSync(stateFile, JSON.stringify({ last_sync: lastSync }, null, 2));
}
function tryParseDate(dateStr: string): Date | null {
const d = new Date(dateStr);
return isNaN(d.getTime()) ? null : d;
}
interface ParsedMessage {
from: string;
date: string;
subject: string;
body: string;
}
function parseMessageData(messageData: Record<string, unknown>): ParsedMessage {
const headers = messageData.payload && typeof messageData.payload === 'object'
? (messageData.payload as Record<string, unknown>).headers as Array<{ name: string; value: string }> | undefined
: undefined;
const from = headers?.find(h => h.name === 'From')?.value || String(messageData.from || messageData.sender || 'Unknown');
const date = headers?.find(h => h.name === 'Date')?.value || String(messageData.date || messageData.internalDate || 'Unknown');
const subject = headers?.find(h => h.name === 'Subject')?.value || String(messageData.subject || '(No Subject)');
let body = '';
if (messageData.payload && typeof messageData.payload === 'object') {
body = extractBodyFromPayload(messageData.payload as Record<string, unknown>);
}
if (!body) {
if (typeof messageData.body === 'string') {
body = messageData.body;
} else if (typeof messageData.snippet === 'string') {
body = messageData.snippet;
} else if (typeof messageData.text === 'string') {
body = messageData.text;
}
}
if (body && (body.includes('<html') || body.includes('<div') || body.includes('<p'))) {
body = nhm.translate(body);
}
if (body) {
body = body.split('\n').filter((line: string) => !line.trim().startsWith('>')).join('\n');
}
return { from, date, subject, body };
}
function extractBodyFromPayload(payload: Record<string, unknown>): string {
const parts = payload.parts as Array<Record<string, unknown>> | undefined;
if (parts) {
for (const part of parts) {
const mimeType = part.mimeType as string | undefined;
const bodyData = part.body && typeof part.body === 'object'
? (part.body as Record<string, unknown>).data as string | undefined
: undefined;
if ((mimeType === 'text/plain' || mimeType === 'text/html') && bodyData) {
const decoded = Buffer.from(bodyData, 'base64').toString('utf-8');
if (mimeType === 'text/html') {
return nhm.translate(decoded);
}
return decoded;
}
if (part.parts) {
const result = extractBodyFromPayload(part as Record<string, unknown>);
if (result) return result;
}
}
}
const bodyData = payload.body && typeof payload.body === 'object'
? (payload.body as Record<string, unknown>).data as string | undefined
: undefined;
if (bodyData) {
const decoded = Buffer.from(bodyData, 'base64').toString('utf-8');
const mimeType = payload.mimeType as string | undefined;
if (mimeType === 'text/html') {
return nhm.translate(decoded);
}
return decoded;
}
return '';
}
interface ComposioThreadResult {
synced: SyncedThread | null;
newestIsoPlusOne: string | null;
}
async function processThreadComposio(connectedAccountId: string, threadId: string, syncDir: string): Promise<ComposioThreadResult> {
let threadResult;
try {
threadResult = await executeAction(
'GMAIL_FETCH_MESSAGE_BY_THREAD_ID',
{
connected_account_id: connectedAccountId,
user_id: 'rowboat-user',
version: 'latest',
arguments: { thread_id: threadId, user_id: 'me' },
}
);
} catch (error) {
console.warn(`[Gmail] Skipping thread ${threadId} (fetch failed):`, error instanceof Error ? error.message : error);
return { synced: null, newestIsoPlusOne: null };
}
if (!threadResult.successful || !threadResult.data) {
console.error(`[Gmail] Failed to fetch thread ${threadId}:`, threadResult.error);
return { synced: null, newestIsoPlusOne: null };
}
const data = threadResult.data as Record<string, unknown>;
const messages = data.messages as Array<Record<string, unknown>> | undefined;
let newestDate: Date | null = null;
let mdContent: string;
let subjectForLog: string;
if (!messages || messages.length === 0) {
const parsed = parseMessageData(data);
mdContent = `# ${parsed.subject}\n\n` +
`**Thread ID:** ${threadId}\n` +
`**Message Count:** 1\n\n---\n\n` +
`### From: ${parsed.from}\n` +
`**Date:** ${parsed.date}\n\n` +
`${parsed.body}\n\n---\n\n`;
subjectForLog = parsed.subject;
newestDate = tryParseDate(parsed.date);
} else {
const firstParsed = parseMessageData(messages[0]);
mdContent = `# ${firstParsed.subject}\n\n`;
mdContent += `**Thread ID:** ${threadId}\n`;
mdContent += `**Message Count:** ${messages.length}\n\n---\n\n`;
for (const msg of messages) {
const parsed = parseMessageData(msg);
mdContent += `### From: ${parsed.from}\n`;
mdContent += `**Date:** ${parsed.date}\n\n`;
mdContent += `${parsed.body}\n\n`;
mdContent += `---\n\n`;
const msgDate = tryParseDate(parsed.date);
if (msgDate && (!newestDate || msgDate > newestDate)) {
newestDate = msgDate;
}
}
subjectForLog = firstParsed.subject;
}
fs.writeFileSync(path.join(syncDir, `${cleanFilename(threadId)}.md`), mdContent);
console.log(`[Gmail] Synced Thread: ${subjectForLog} (${threadId})`);
const newestIsoPlusOne = newestDate ? new Date(newestDate.getTime() + 1000).toISOString() : null;
return { synced: { threadId, markdown: mdContent }, newestIsoPlusOne };
}
async function performSyncComposio() {
const ATTACHMENTS_DIR = path.join(SYNC_DIR, 'attachments');
const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json');
if (!fs.existsSync(SYNC_DIR)) fs.mkdirSync(SYNC_DIR, { recursive: true });
if (!fs.existsSync(ATTACHMENTS_DIR)) fs.mkdirSync(ATTACHMENTS_DIR, { recursive: true });
const account = composioAccountsRepo.getAccount('gmail');
if (!account || account.status !== 'ACTIVE') {
console.log('[Gmail] Gmail not connected via Composio. Skipping sync.');
return;
}
const connectedAccountId = account.id;
const state = loadComposioState(STATE_FILE);
let afterEpochSeconds: number;
if (state) {
afterEpochSeconds = Math.floor(new Date(state.last_sync).getTime() / 1000);
console.log(`[Gmail] Syncing messages since ${state.last_sync}...`);
} else {
const pastDate = new Date();
pastDate.setDate(pastDate.getDate() - COMPOSIO_LOOKBACK_DAYS);
afterEpochSeconds = Math.floor(pastDate.getTime() / 1000);
console.log(`[Gmail] First sync - fetching last ${COMPOSIO_LOOKBACK_DAYS} days...`);
}
let run: ServiceRunContext | null = null;
const ensureRun = async () => {
if (!run) {
run = await serviceLogger.startRun({
service: 'gmail',
message: 'Syncing Gmail (Composio)',
trigger: 'timer',
});
}
};
try {
const allThreadIds: string[] = [];
let pageToken: string | undefined;
do {
const params: Record<string, unknown> = {
query: `after:${afterEpochSeconds}`,
max_results: 20,
user_id: 'me',
};
if (pageToken) {
params.page_token = pageToken;
}
const result = await executeAction(
'GMAIL_LIST_THREADS',
{
connected_account_id: connectedAccountId,
user_id: 'rowboat-user',
version: 'latest',
arguments: params,
}
);
if (!result.successful || !result.data) {
console.error('[Gmail] Failed to list threads:', result.error);
return;
}
const data = result.data as Record<string, unknown>;
const threads = data.threads as Array<Record<string, unknown>> | undefined;
if (threads && threads.length > 0) {
for (const thread of threads) {
const threadId = thread.id as string | undefined;
if (threadId) {
allThreadIds.push(threadId);
}
}
}
pageToken = data.nextPageToken as string | undefined;
} while (pageToken);
if (allThreadIds.length === 0) {
console.log('[Gmail] No new threads.');
return;
}
console.log(`[Gmail] Found ${allThreadIds.length} threads to sync.`);
await ensureRun();
const limitedThreads = limitEventItems(allThreadIds);
await serviceLogger.log({
type: 'changes_identified',
service: run!.service,
runId: run!.runId,
level: 'info',
message: `Found ${allThreadIds.length} thread${allThreadIds.length === 1 ? '' : 's'} to sync`,
counts: { threads: allThreadIds.length },
items: limitedThreads.items,
truncated: limitedThreads.truncated,
});
// Process oldest first so high-water mark advances chronologically
allThreadIds.reverse();
let highWaterMark: string | null = state?.last_sync ?? null;
let processedCount = 0;
const synced: SyncedThread[] = [];
for (const threadId of allThreadIds) {
// Re-check connection in case user disconnected mid-sync
if (!composioAccountsRepo.isConnected('gmail')) {
console.log('[Gmail] Account disconnected during sync. Stopping.');
break;
}
try {
const result = await processThreadComposio(connectedAccountId, threadId, SYNC_DIR);
processedCount++;
if (result.synced) synced.push(result.synced);
if (result.newestIsoPlusOne) {
if (!highWaterMark || new Date(result.newestIsoPlusOne) > new Date(highWaterMark)) {
highWaterMark = result.newestIsoPlusOne;
}
saveComposioState(STATE_FILE, highWaterMark);
}
} catch (error) {
console.error(`[Gmail] Error processing thread ${threadId}, skipping:`, error);
}
}
await publishGmailSyncEvent(synced);
await serviceLogger.log({
type: 'run_complete',
service: run!.service,
runId: run!.runId,
level: 'info',
message: `Gmail sync complete: ${processedCount}/${allThreadIds.length} thread${allThreadIds.length === 1 ? '' : 's'}`,
durationMs: Date.now() - run!.startedAt,
outcome: 'ok',
summary: { threads: processedCount },
});
console.log(`[Gmail] Sync completed. Processed ${processedCount}/${allThreadIds.length} threads.`);
} catch (error) {
console.error('[Gmail] Error during sync:', error);
await ensureRun();
await serviceLogger.log({
type: 'error',
service: run!.service,
runId: run!.runId,
level: 'error',
message: 'Gmail sync error',
error: error instanceof Error ? error.message : String(error),
});
await serviceLogger.log({
type: 'run_complete',
service: run!.service,
runId: run!.runId,
level: 'error',
message: 'Gmail sync failed',
durationMs: Date.now() - run!.startedAt,
outcome: 'error',
});
}
}
export async function init() {
console.log("Starting Gmail Sync (TS)...");
console.log(`Will sync every ${SYNC_INTERVAL_MS / 1000} seconds.`);
while (true) {
try {
const composioMode = await useComposioForGoogle();
if (composioMode) {
const isConnected = composioAccountsRepo.isConnected('gmail');
if (!isConnected) {
console.log('[Gmail] Gmail not connected via Composio. Sleeping...');
} else {
await performSyncComposio();
}
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE);
if (!hasCredentials) {
console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping...");
} else {
// Check if credentials are available with required scopes
const hasCredentials = await GoogleClientFactory.hasValidCredentials(REQUIRED_SCOPE);
if (!hasCredentials) {
console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping...");
} else {
// Perform one sync
await performSync();
}
await performSync();
}
} catch (error) {
console.error("Error in main loop:", error);

View file

@ -4,7 +4,7 @@ import { WorkDir } from '../config/config.js';
import { createRun, createMessage } from '../runs/runs.js';
import { getKgModel } from '../models/defaults.js';
import { bus } from '../runs/bus.js';
import { waitForRunCompletion } from '../agents/utils.js';
import { getErrorDetails, waitForRunCompletion } from '../agents/utils.js';
import { serviceLogger } from '../services/service_logger.js';
import { limitEventItems } from './limit_event_items.js';
import {
@ -86,6 +86,8 @@ async function tagNoteBatch(
const run = await createRun({
agentId: NOTE_TAGGING_AGENT,
model: await getKgModel(),
useCase: 'knowledge_sync',
subUseCase: 'tag_notes',
});
let message = `Tag the following ${files.length} knowledge notes by prepending YAML frontmatter with appropriate tags.\n\n`;
@ -123,8 +125,11 @@ async function tagNoteBatch(
});
await createMessage(run.id, message);
await waitForRunCompletion(run.id);
unsubscribe();
try {
await waitForRunCompletion(run.id, { throwOnError: true });
} finally {
unsubscribe();
}
return { runId: run.id, filesEdited };
}
@ -167,6 +172,7 @@ export async function processUntaggedNotes(): Promise<void> {
const totalBatches = Math.ceil(untagged.length / BATCH_SIZE);
let totalEdited = 0;
let hadError = false;
let failedBatches = 0;
for (let i = 0; i < untagged.length; i += BATCH_SIZE) {
const batchPaths = untagged.slice(i, i + BATCH_SIZE);
@ -215,14 +221,16 @@ export async function processUntaggedNotes(): Promise<void> {
console.log(`[NoteTagging] Batch ${batchNumber}/${totalBatches} complete, ${result.filesEdited.size} files tagged`);
} catch (error) {
hadError = true;
failedBatches++;
const errorDetails = getErrorDetails(error);
console.error(`[NoteTagging] Error processing batch ${batchNumber}:`, error);
await serviceLogger.log({
type: 'error',
service: run.service,
runId: run.runId,
level: 'error',
message: `Error processing batch ${batchNumber}`,
error: error instanceof Error ? error.message : String(error),
message: `Note tagging batch ${batchNumber}/${totalBatches} failed`,
error: errorDetails,
context: { batchNumber },
});
}
@ -236,12 +244,15 @@ export async function processUntaggedNotes(): Promise<void> {
service: run.service,
runId: run.runId,
level: hadError ? 'error' : 'info',
message: `Note tagging complete: ${totalEdited} notes tagged`,
message: hadError
? `Note tagging finished with errors: ${totalEdited} notes tagged`
: `Note tagging complete: ${totalEdited} notes tagged`,
durationMs: Date.now() - run.startedAt,
outcome: hadError ? 'error' : 'ok',
summary: {
totalNotes: untagged.length,
notesTagged: totalEdited,
failedBatches,
},
});

View file

@ -70,6 +70,122 @@ export async function fetch(filePath: string, trackId: string): Promise<z.infer<
return blocks.find(b => b.track.trackId === trackId) ?? null;
}
type TrackNoteSummary = {
path: string;
trackCount: number;
createdAt: string | null;
lastRunAt: string | null;
isActive: boolean;
};
async function summarizeTrackNote(
filePath: string,
tracks: z.infer<typeof TrackStateSchema>[],
): Promise<TrackNoteSummary | null> {
if (tracks.length === 0) return null;
const stats = await fs.stat(absPath(filePath));
const createdMs = stats.birthtimeMs > 0 ? stats.birthtimeMs : stats.ctimeMs;
let latestRunAt: string | null = null;
let latestRunMs = -1;
for (const { track } of tracks) {
if (!track.lastRunAt) continue;
const candidateMs = Date.parse(track.lastRunAt);
if (Number.isNaN(candidateMs) || candidateMs <= latestRunMs) continue;
latestRunMs = candidateMs;
latestRunAt = track.lastRunAt;
}
return {
path: `knowledge/${filePath}`,
trackCount: tracks.length,
createdAt: createdMs > 0 ? new Date(createdMs).toISOString() : null,
lastRunAt: latestRunAt,
isActive: tracks.every(({ track }) => track.active !== false),
};
}
export async function listNotesWithTracks(): Promise<TrackNoteSummary[]> {
async function walk(relativeDir = ''): Promise<string[]> {
const dirPath = absPath(relativeDir);
try {
const entries = await fs.readdir(dirPath, { withFileTypes: true });
const files: string[] = [];
for (const entry of entries) {
if (entry.name.startsWith('.')) continue;
const childRelPath = relativeDir
? path.posix.join(relativeDir, entry.name)
: entry.name;
if (entry.isDirectory()) {
files.push(...await walk(childRelPath));
continue;
}
if (entry.isFile() && entry.name.toLowerCase().endsWith('.md')) {
files.push(childRelPath);
}
}
return files;
} catch {
return [];
}
}
const markdownFiles = await walk();
const notes = await Promise.all(markdownFiles.map(async (relativePath) => {
try {
const tracks = await fetchAll(relativePath);
return await summarizeTrackNote(relativePath, tracks);
} catch {
return null;
}
}));
return notes
.filter((note): note is TrackNoteSummary => note !== null)
.sort((a, b) => {
const aName = path.basename(a.path, '.md').toLowerCase();
const bName = path.basename(b.path, '.md').toLowerCase();
if (aName !== bName) return aName.localeCompare(bName);
return a.path.localeCompare(b.path);
});
}
export async function setNoteTracksActive(filePath: string, active: boolean): Promise<TrackNoteSummary | null> {
return withFileLock(absPath(filePath), async () => {
const blocks = await fetchAll(filePath);
if (blocks.length === 0) return null;
const alreadyMatches = blocks.every(({ track }) => (track.active !== false) === active);
if (alreadyMatches) {
return summarizeTrackNote(filePath, blocks);
}
const content = await fs.readFile(absPath(filePath), 'utf-8');
const lines = content.split('\n');
const updatedBlocks = blocks
.map((block) => ({
...block,
track: { ...block.track, active },
}))
.sort((a, b) => b.fenceStart - a.fenceStart);
for (const block of updatedBlocks) {
const yaml = stringifyYaml(block.track).trimEnd();
const yamlLines = yaml ? yaml.split('\n') : [];
lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```');
}
await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8');
return summarizeTrackNote(filePath, updatedBlocks);
});
}
/**
* Fetch a track block and return its canonical YAML string (or null if not found).
* Useful for IPC handlers that need to return the fresh YAML without taking a
@ -196,4 +312,4 @@ export async function deleteTrackBlock(filePath: string, trackId: string): Promi
await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8');
});
}
}

View file

@ -3,6 +3,7 @@ import { trackBlock, PrefixLogger } from '@x/shared';
import type { KnowledgeEvent } from '@x/shared/dist/track-block.js';
import { createProvider } from '../../models/models.js';
import { getDefaultModelAndProvider, getTrackBlockModel, resolveProviderConfig } from '../../models/defaults.js';
import { captureLlmUsage } from '../../analytics/usage.js';
const log = new PrefixLogger('TrackRouting');
@ -34,10 +35,14 @@ Rules:
- For each candidate, return BOTH trackId and filePath exactly as given. trackIds are not globally unique.`;
async function resolveModel() {
const model = await getTrackBlockModel();
const modelId = await getTrackBlockModel();
const { provider } = await getDefaultModelAndProvider();
const config = await resolveProviderConfig(provider);
return createProvider(config).languageModel(model);
return {
model: createProvider(config).languageModel(modelId),
modelId,
providerName: provider,
};
}
function buildRoutingPrompt(event: KnowledgeEvent, batch: ParsedTrack[]): string {
@ -84,19 +89,26 @@ export async function findCandidates(
log.log(`Routing event ${event.id} against ${filtered.length} track(s)`);
const model = await resolveModel();
const { model, modelId, providerName } = await resolveModel();
const candidateKeys = new Set<string>();
for (let i = 0; i < filtered.length; i += BATCH_SIZE) {
const batch = filtered.slice(i, i + BATCH_SIZE);
try {
const { object } = await generateObject({
const result = await generateObject({
model,
system: ROUTING_SYSTEM_PROMPT,
prompt: buildRoutingPrompt(event, batch),
schema: trackBlock.Pass1OutputSchema,
});
for (const c of object.candidates) {
captureLlmUsage({
useCase: 'track_block',
subUseCase: 'routing',
model: modelId,
provider: providerName,
usage: result.usage,
});
for (const c of result.object.candidates) {
candidateKeys.add(trackKey(c.trackId, c.filePath));
}
} catch (err) {

View file

@ -263,6 +263,7 @@ You have the full workspace toolkit. Quick reference for common cases:
- **\`parseFile\`, \`LLMParse\`** — parse PDFs, spreadsheets, Word docs if a track aggregates from attached files.
- **\`composio-*\`, \`listMcpTools\`, \`executeMcpTool\`** — user-connected integrations (Gmail, Calendar, etc.). Prefer these when a track needs structured data from a connected service the user has authorized.
- **\`browser-control\`** — only when a required source has no API / search alternative and requires JS rendering.
- **\`notify-user\`** — send a native desktop notification when this run produces something time-sensitive (threshold breach, urgent change, "the thing the user asked you to watch for just happened"). Skip it for routine refreshes — the note itself is the artifact. Load the \`notify-user\` skill via \`loadSkill\` for parameters and \`rowboat://\` deep-link shapes (so the click lands on the right note/view).
# The Knowledge Graph

View file

@ -110,6 +110,8 @@ export async function triggerTrackUpdate(
agentId: 'track-run',
model,
...(track.track.provider ? { provider: track.track.provider } : {}),
useCase: 'track_block',
subUseCase: 'run',
});
// Set lastRunAt and lastRunId immediately (before agent executes) so

View file

@ -0,0 +1,132 @@
import fs from 'fs';
import path from 'path';
import { z } from 'zod';
import { WorkDir } from '../config/config.js';
import { isSignedIn } from '../account/account.js';
import { composioAccountsRepo } from '../composio/repo.js';
import { deleteConnectedAccount } from '../composio/client.js';
import container from '../di/container.js';
import { IOAuthRepo } from '../auth/repo.js';
/**
* One-time migration that moves Composio-connected Gmail/Calendar users
* to the native rowboat-mode Google OAuth flow.
*
* Triggered by the renderer on app launch and after Rowboat sign-in. The
* single guard is `dismissed_at` in the migration state file once set,
* none of the migration's side effects run again. This protects users who
* later re-add Composio Google for non-sync purposes (e.g. a tool that
* happens to use the Gmail toolkit) from having that connection blown
* away on a future launch.
*/
const STATE_FILE = path.join(WorkDir, 'config', 'composio-google-migration.json');
const ZState = z.object({
dismissed_at: z.string().min(1).optional(),
});
type State = z.infer<typeof ZState>;
function loadState(): State {
try {
if (fs.existsSync(STATE_FILE)) {
const raw = fs.readFileSync(STATE_FILE, 'utf-8');
return ZState.parse(JSON.parse(raw));
}
} catch (error) {
console.error('[composio-google-migration] failed to load state:', error);
}
return {};
}
function saveState(state: State): void {
const dir = path.dirname(STATE_FILE);
if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true });
fs.writeFileSync(STATE_FILE, JSON.stringify(state, null, 2));
}
function markDismissed(): void {
saveState({ dismissed_at: new Date().toISOString() });
}
async function disconnectComposioGoogle(): Promise<void> {
for (const slug of ['gmail', 'googlecalendar'] as const) {
const account = composioAccountsRepo.getAccount(slug);
if (!account?.id) continue;
try {
await deleteConnectedAccount(account.id);
console.log(`[composio-google-migration] composio: deleted ${slug} (${account.id})`);
} catch (error) {
// Best-effort — logged but doesn't block the local cleanup.
console.warn(`[composio-google-migration] composio delete failed for ${slug}:`, error);
}
try {
composioAccountsRepo.deleteAccount(slug);
} catch (error) {
console.warn(`[composio-google-migration] local delete failed for ${slug}:`, error);
}
}
}
function cleanupCalendarComposioState(): void {
const file = path.join(WorkDir, 'calendar_sync', 'composio_state.json');
try {
if (fs.existsSync(file)) {
fs.unlinkSync(file);
console.log('[composio-google-migration] removed stale calendar composio_state.json');
}
} catch (error) {
console.warn('[composio-google-migration] failed to remove composio_state.json:', error);
}
}
/**
* Check whether the user qualifies for the migration. If they do, atomically
* mark `dismissed_at`, fire-and-forget the Composio disconnect, and return
* `{shouldShow: true}` so the renderer can show the modal.
*
* Idempotent: subsequent calls return `{shouldShow: false}` once `dismissed_at`
* is set, regardless of whether the modal was actually shown or the user
* completed the OAuth flow.
*/
export async function qualifyAndDisconnectComposioGoogle(): Promise<{ shouldShow: boolean }> {
// Rule 4 — already processed
const state = loadState();
if (state.dismissed_at) {
return { shouldShow: false };
}
// Rule 1 — must be signed in to Rowboat
if (!(await isSignedIn())) {
return { shouldShow: false };
}
// Rule 3 — already on native rowboat-mode Google → silently mark dismissed
// (so we stop re-checking) and bail before touching Composio state.
const oauthRepo = container.resolve<IOAuthRepo>('oauthRepo');
const googleConnection = await oauthRepo.read('google');
if (googleConnection.tokens && googleConnection.mode === 'rowboat') {
markDismissed();
return { shouldShow: false };
}
// Rule 2 — must have at least one Composio Google toolkit connected
const hasGmail = composioAccountsRepo.isConnected('gmail');
const hasCalendar = composioAccountsRepo.isConnected('googlecalendar');
if (!hasGmail && !hasCalendar) {
return { shouldShow: false };
}
// All rules pass. Mark dismissed atomically before any side effects so
// a crash mid-migration leaves us in a deterministic post-migration state.
markDismissed();
// Fire-and-forget: disconnect Composio Google + clean up the stale
// calendar state file. Both are best-effort.
void disconnectComposioGoogle();
cleanupCalendarComposioState();
return { shouldShow: true };
}

View file

@ -6,7 +6,7 @@ import container from "../di/container.js";
const SIGNED_IN_DEFAULT_MODEL = "gpt-5.4";
const SIGNED_IN_DEFAULT_PROVIDER = "rowboat";
const SIGNED_IN_KG_MODEL = "anthropic/claude-haiku-4.5";
const SIGNED_IN_KG_MODEL = "google/gemini-3.1-flash-lite-preview";
const SIGNED_IN_TRACK_BLOCK_MODEL = "anthropic/claude-haiku-4.5";
/**

View file

@ -43,6 +43,8 @@ async function runAgent(agentName: string): Promise<void> {
const run = await createRun({
agentId: agentName,
model: await getKgModel(),
useCase: 'knowledge_sync',
subUseCase: 'pre_built',
});
// Build trigger message with user context

View file

@ -5,7 +5,7 @@ import path from "path";
import fsp from "fs/promises";
import fs from "fs";
import readline from "readline";
import { Run, RunEvent, StartEvent, CreateRunOptions, ListRunsResponse, MessageEvent } from "@x/shared/dist/runs.js";
import { Run, RunEvent, StartEvent, ListRunsResponse, MessageEvent, UseCase } from "@x/shared/dist/runs.js";
import { getDefaultModelAndProvider } from "../models/defaults.js";
/**
@ -24,7 +24,13 @@ const LegacyStartEvent = StartEvent.extend({
});
const ReadRunEvent = RunEvent.or(LegacyStartEvent);
export type CreateRunRepoOptions = Required<z.infer<typeof CreateRunOptions>>;
export type CreateRunRepoOptions = {
agentId: string;
model: string;
provider: string;
useCase: z.infer<typeof UseCase>;
subUseCase?: string;
};
export interface IRunsRepo {
create(options: CreateRunRepoOptions): Promise<z.infer<typeof Run>>;
@ -187,6 +193,8 @@ export class FSRunsRepo implements IRunsRepo {
agentName: options.agentId,
model: options.model,
provider: options.provider,
useCase: options.useCase,
...(options.subUseCase ? { subUseCase: options.subUseCase } : {}),
subflow: [],
ts,
};
@ -197,6 +205,8 @@ export class FSRunsRepo implements IRunsRepo {
agentId: options.agentId,
model: options.model,
provider: options.provider,
useCase: options.useCase,
...(options.subUseCase ? { subUseCase: options.subUseCase } : {}),
log: [start],
};
}
@ -230,6 +240,8 @@ export class FSRunsRepo implements IRunsRepo {
agentId: start.agentName,
model: start.model,
provider: start.provider,
...(start.useCase ? { useCase: start.useCase } : {}),
...(start.subUseCase ? { subUseCase: start.subUseCase } : {}),
log: events,
};
}

View file

@ -23,8 +23,15 @@ export async function createRun(opts: z.infer<typeof CreateRunOptions>): Promise
const defaults = await getDefaultModelAndProvider();
const model = opts.model ?? agent.model ?? defaults.model;
const provider = opts.provider ?? agent.provider ?? defaults.provider;
const useCase = opts.useCase ?? "copilot_chat";
const run = await repo.create({ agentId: opts.agentId, model, provider });
const run = await repo.create({
agentId: opts.agentId,
model,
provider,
useCase,
...(opts.subUseCase ? { subUseCase: opts.subUseCase } : {}),
});
await bus.publish(run.log[0]);
return run;
}

View file

@ -116,6 +116,12 @@ export const BrowserControlInputSchema = z.object({
}
});
export const SuggestedBrowserSkillSchema = z.object({
id: z.string(),
title: z.string(),
path: z.string(),
});
export const BrowserControlResultSchema = z.object({
success: z.boolean(),
action: BrowserControlActionSchema,
@ -123,6 +129,7 @@ export const BrowserControlResultSchema = z.object({
error: z.string().optional(),
browser: BrowserStateSchema,
page: BrowserPageSnapshotSchema.optional(),
suggestedSkills: z.array(SuggestedBrowserSkillSchema).optional(),
});
export type BrowserTabState = z.infer<typeof BrowserTabStateSchema>;
@ -132,3 +139,4 @@ export type BrowserPageSnapshot = z.infer<typeof BrowserPageSnapshotSchema>;
export type BrowserControlAction = z.infer<typeof BrowserControlActionSchema>;
export type BrowserControlInput = z.infer<typeof BrowserControlInputSchema>;
export type BrowserControlResult = z.infer<typeof BrowserControlResultSchema>;
export type SuggestedBrowserSkill = z.infer<typeof SuggestedBrowserSkillSchema>;

View file

@ -25,6 +25,13 @@ const ipcSchemas = {
electron: z.string(),
}),
},
'analytics:bootstrap': {
req: z.null(),
res: z.object({
installationId: z.string(),
apiUrl: z.string(),
}),
},
'workspace:getRoot': {
req: z.null(),
res: z.object({
@ -292,6 +299,28 @@ const ipcSchemas = {
}),
res: z.null(),
},
'app:openUrl': {
req: z.object({
url: z.string(),
}),
res: z.null(),
},
'app:takeMeetingNotes': {
req: z.object({
// Pass the raw calendar event JSON through; renderer adapts to its existing flow.
event: z.unknown(),
// When true, the renderer should also open the meeting URL (Zoom/Meet/etc.)
// in addition to triggering the take-notes flow.
openMeeting: z.boolean().optional(),
}),
res: z.null(),
},
'app:consumePendingDeepLink': {
req: z.null(),
res: z.object({
url: z.string().nullable(),
}),
},
'granola:getConfig': {
req: z.null(),
res: z.object({
@ -400,16 +429,10 @@ const ipcSchemas = {
toolkits: z.array(z.string()),
}),
},
'composio:use-composio-for-google': {
'migration:check-composio-google': {
req: z.null(),
res: z.object({
enabled: z.boolean(),
}),
},
'composio:use-composio-for-google-calendar': {
req: z.null(),
res: z.object({
enabled: z.boolean(),
shouldShow: z.boolean(),
}),
},
'composio:didConnect': {
@ -639,6 +662,35 @@ const ipcSchemas = {
error: z.string().optional(),
}),
},
'track:setNoteActive': {
req: z.object({
path: RelPath,
active: z.boolean(),
}),
res: z.object({
success: z.boolean(),
note: z.object({
path: RelPath,
trackCount: z.number().int().positive(),
createdAt: z.string().nullable(),
lastRunAt: z.string().nullable(),
isActive: z.boolean(),
}).optional(),
error: z.string().optional(),
}),
},
'track:listNotes': {
req: z.null(),
res: z.object({
notes: z.array(z.object({
path: RelPath,
trackCount: z.number().int().positive(),
createdAt: z.string().nullable(),
lastRunAt: z.string().nullable(),
isActive: z.boolean(),
})),
}),
},
// Embedded browser (WebContentsView) channels
'browser:setBounds': {
req: z.object({

View file

@ -21,6 +21,15 @@ export const StartEvent = BaseRunEvent.extend({
agentName: z.string(),
model: z.string(),
provider: z.string(),
// useCase/subUseCase tag the run for analytics. Optional on read so legacy
// run files written before these fields existed still parse cleanly.
useCase: z.enum([
"copilot_chat",
"track_block",
"meeting_note",
"knowledge_sync",
]).optional(),
subUseCase: z.string().optional(),
});
export const SpawnSubFlowEvent = BaseRunEvent.extend({
@ -118,6 +127,13 @@ export const AskHumanResponsePayload = AskHumanResponseEvent.pick({
response: true,
});
export const UseCase = z.enum([
"copilot_chat",
"track_block",
"meeting_note",
"knowledge_sync",
]);
export const Run = z.object({
id: z.string(),
title: z.string().optional(),
@ -125,6 +141,8 @@ export const Run = z.object({
agentId: z.string(),
model: z.string(),
provider: z.string(),
useCase: UseCase.optional(),
subUseCase: z.string().optional(),
log: z.array(RunEvent),
});
@ -142,4 +160,6 @@ export const CreateRunOptions = z.object({
agentId: z.string(),
model: z.string().optional(),
provider: z.string().optional(),
useCase: UseCase.optional(),
subUseCase: z.string().optional(),
});