diff --git a/apps/cli/src/application/assistant/README.md b/apps/cli/src/application/assistant/README.md index ebcbf861..4c776c26 100644 --- a/apps/cli/src/application/assistant/README.md +++ b/apps/cli/src/application/assistant/README.md @@ -3,4 +3,5 @@ Rowboat Copilot (demo) - Entry point: `npm run copilot` (runs `src/x.ts` after building) - Natural language interface to list/create/update/delete workflow JSON under `.rowboat/workflows` - Uses existing zod schemas for validation; errors bubble up plainly for easy debugging +- Maintains conversational memory within a session and replies in natural language (run `npm run copilot -- --debug` or set `COPILOT_DEBUG=1` to also print the parsed JSON command and its outcome) - Data folders ensured automatically: `.rowboat/workflows`, `.rowboat/agents`, `.rowboat/mcp` diff --git a/apps/cli/src/application/assistant/USAGE.md b/apps/cli/src/application/assistant/USAGE.md index 21a25779..0181a696 100644 --- a/apps/cli/src/application/assistant/USAGE.md +++ b/apps/cli/src/application/assistant/USAGE.md @@ -10,3 +10,7 @@ Example prompts once running: - `create a workflow demo that calls function get_date` - `add an agent step default_assistant to demo` - `delete the demo workflow` + +While the session is open, the copilot keeps conversational context, so you can ask follow-ups such as “what was the first thing I asked?” or “add that step again”. Responses are natural-language summaries of the structured actions it performs. + +Need to inspect the underlying JSON command and result? Run in debug mode with `npm run copilot -- --debug` (or set `COPILOT_DEBUG=1`) to print the parsed command and its raw outcome after each reply. 
diff --git a/apps/cli/src/application/assistant/chat.ts b/apps/cli/src/application/assistant/chat.ts index d6de810d..e96a23ee 100644 --- a/apps/cli/src/application/assistant/chat.ts +++ b/apps/cli/src/application/assistant/chat.ts @@ -1,80 +1,149 @@ import readline from "readline"; -import { z } from "zod"; import { openai } from "@ai-sdk/openai"; -import { generateObject } from "ai"; -import { Workflow } from "../../application/entities/workflow.js"; -import { listWorkflows, getWorkflow, upsertWorkflow, deleteWorkflow } from "./workflows/service.js"; +import { generateObject, streamText } from "ai"; +import type { CoreMessage } from "ai"; +import { + ChatCommand, + ChatCommandT, + CommandOutcome, + executeCommand, +} from "./commands.js"; -const ChatCommand = z.object({ - action: z.enum([ - "help", - "list_workflows", - "get_workflow", - "create_workflow", - "update_workflow", - "delete_workflow", - "unknown", - ]), - id: z.string().optional(), - updates: Workflow.partial().optional(), -}); - -type ChatCommandT = z.infer; +type ConversationMessage = { + role: "user" | "assistant"; + content: string; +}; const systemPrompt = ` -You are a CLI assistant that converts the user's natural language into a JSON command for managing workflows. +You are a general-purpose CLI copilot that converts the user's natural language into structured commands the Rowboat assistant runtime can execute, and you can also hold a regular conversation when no command fits. Rules: - Only output JSON matching the provided schema. No extra commentary. -- Choose the most appropriate action from: help, list_workflows, get_workflow, create_workflow, update_workflow, delete_workflow, unknown. -- For actions that need an id (get/update/delete/create), set "id" to the workflow identifier (e.g. "example_workflow"). -- For create/update, include only provided fields in "updates". If not provided, omit. 
+- Select the most appropriate action from: help, general_chat, list_workflows, get_workflow, describe_workflows, create_workflow, update_workflow, delete_workflow, list_agents, get_agent, create_agent, update_agent, delete_agent, list_mcp_servers, add_mcp_server, remove_mcp_server, run_workflow, unknown. +- Use describe_workflows with { scope: "all" } to show every workflow, or provide specific ids when the user names particular workflows (including pronouns like "them" or "those" referring to previously listed workflows). +- For actions that need an id (workflow/agent), set "id" to the identifier (e.g. "example_workflow"). +- For create/update actions, only include provided fields in "updates". - Workflow shape reminder: { name: string, description: string, steps: Step[] } where Step is either { type: "function", id: string } or { type: "agent", id: string }. +- Agent shape reminder: { name: string, model: string, description: string, instructions: string }. +- MCP server shape reminder: { name: string, url: string }. - If the request is ambiguous, set action to "unknown". +- If the user is just chatting or asking for general help or explanations, use action "general_chat" with their full prompt in "query". `; -async function interpret(input: string): Promise { - const { object } = await generateObject({ - model: openai("gpt-4.1"), - system: systemPrompt, - prompt: input, - schema: ChatCommand, - }); - return object; +const responseSystemPrompt = ` +You are Skipper, the Rowboat CLI copilot. You maintain an ongoing conversation, remember prior questions, run commands when requested, and give helpful free-form answers when a general reply is appropriate. + +Guidelines: +- Respond in natural language with short, helpful paragraphs or bullet lists when useful. +- Summarise command results plainly (lists, confirmations, errors) and mention next steps when appropriate. 
+- If a command could not be inferred (action "unknown"), clarify what additional detail is needed or answer the query directly using the conversation history when possible. +- Use the conversation history to answer memory questions (for example "what was the first question I asked?"). +- Avoid repeating the raw JSON command or result unless explicitly asked; focus on what the outcome means. +- Deliver everything requested in one response. Do not say you'll follow up later—include all available details right away. +- For general_chat actions, respond directly to the user's query with the best answer you can provide. +`; + +function buildMessageHistory(history: ConversationMessage[]): CoreMessage[] { + return history.map((message) => ({ + role: message.role, + content: message.content, + })); } -async function execute(cmd: ChatCommandT): Promise { - switch (cmd.action) { - case "help": - return { - usage: [ - "Examples:", - "- list workflows", - "- show workflow example_workflow", - "- create workflow demo with one step calling function get_date", - "- update workflow demo: add agent step default_assistant", - "- delete workflow demo", - ], - }; - case "list_workflows": - return { items: listWorkflows() }; - case "get_workflow": - if (!cmd.id) return { error: "id required" }; - return getWorkflow(cmd.id) ?? null; - case "create_workflow": - if (!cmd.id) return { error: "id required" }; - return upsertWorkflow(cmd.id, { ...(cmd.updates ?? {}) }); - case "update_workflow": - if (!cmd.id) return { error: "id required" }; - return upsertWorkflow(cmd.id, { ...(cmd.updates ?? {}) }); - case "delete_workflow": - if (!cmd.id) return { error: "id required" }; - return { deleted: deleteWorkflow(cmd.id) }; - case "unknown": - return { error: "Could not determine intent. Try again or ask for help." 
}; +async function interpret(input: string, history: ConversationMessage[]): Promise { + const stopSpinner = startSpinner("Analyzing…", { persist: false }); + const conversation: CoreMessage[] = [ + { role: "system", content: systemPrompt }, + ...buildMessageHistory(history), + { role: "user", content: input }, + ]; + + try { + const { object } = await generateObject({ + model: openai("gpt-4.1"), + messages: conversation, + schema: ChatCommand, + }); + return object; + } finally { + stopSpinner(); } } +function startSpinner( + label: string, + options: { persist?: boolean } = {} +): (finalMessage?: string) => void { + const { persist = true } = options; + const frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴"]; + let index = 0; + const render = () => { + const frame = frames[index]; + index = (index + 1) % frames.length; + process.stdout.write(`\r${frame} ${label}`); + }; + render(); + const timer = setInterval(render, 80); + return (finalMessage?: string) => { + clearInterval(timer); + const doneFrame = frames[(index + frames.length - 1) % frames.length]; + const message = finalMessage ?? "done"; + const clearWidth = doneFrame.length + label.length + (persist ? 
message.length + 3 : 2); + const clear = " ".repeat(clearWidth); + process.stdout.write(`\r${clear}`); + if (persist) { + process.stdout.write(`\r${doneFrame} ${label} ${message}\n`); + } else { + process.stdout.write("\r"); + } + }; +} + +async function renderAssistantResponse( + input: string, + cmd: ChatCommandT, + outcome: CommandOutcome, + history: ConversationMessage[] +): Promise { + const condensedCommand = JSON.stringify(cmd, null, 2); + const condensedResult = JSON.stringify(outcome, null, 2); + + const { textStream } = await streamText({ + model: openai("gpt-4.1"), + messages: [ + { role: "system", content: responseSystemPrompt }, + ...buildMessageHistory(history), + { + role: "user", + content: [ + `Most recent request: ${input}`, + `Interpreter output:\n${condensedCommand}`, + `Command result:\n${condensedResult}`, + ].join("\n\n"), + }, + ], + }); + + let final = ""; + for await (const textChunk of textStream as AsyncIterable) { + const chunk = + typeof textChunk === "string" + ? textChunk + : typeof (textChunk as { value?: string }).value === "string" + ? (textChunk as { value?: string }).value ?? "" + : ""; + if (!chunk) continue; + process.stdout.write(chunk); + final += chunk; + } + + if (!final.endsWith("\n")) { + process.stdout.write("\n"); + } + + return final.trim(); +} + export async function startCopilot(): Promise { if (!process.env.OPENAI_API_KEY) { console.error("OPENAI_API_KEY is not set. 
Please export it to use chat."); @@ -85,16 +154,38 @@ export async function startCopilot(): Promise { const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); console.log("Rowboat Copilot (type 'exit' to quit)"); + const debugMode = process.argv.includes("--debug") || process.env.COPILOT_DEBUG === "1"; + const conversationHistory: ConversationMessage[] = []; + const ask = () => rl.question("> ", async (line) => { if (!line || line.trim().toLowerCase() === "exit") { rl.close(); return; } try { - const cmd = await interpret(line); - console.log("\n=== Parsed Command ===\n" + JSON.stringify(cmd, null, 2)); - const result = await execute(cmd); - console.log("\n=== Result ===\n" + JSON.stringify(result, null, 2) + "\n"); + const trimmed = line.trim(); + const cmd = await interpret(trimmed, conversationHistory); + let outcome: CommandOutcome; + try { + outcome = await executeCommand(cmd); + } finally { + // no-op + } + + const historyWithLatestUser: ConversationMessage[] = [ + ...conversationHistory, + { role: "user", content: trimmed }, + ]; + const assistantReply = await renderAssistantResponse(trimmed, cmd, outcome, historyWithLatestUser); + console.log(""); + + if (debugMode) { + console.log("=== Parsed Command ===\n" + JSON.stringify(cmd, null, 2)); + console.log("\n=== Outcome ===\n" + JSON.stringify(outcome, null, 2) + "\n"); + } + + conversationHistory.push({ role: "user", content: trimmed }); + conversationHistory.push({ role: "assistant", content: assistantReply }); } catch (err) { console.error("Error:", (err as Error).message); } diff --git a/apps/cli/src/application/assistant/commands.ts b/apps/cli/src/application/assistant/commands.ts new file mode 100644 index 00000000..0e8d97c8 --- /dev/null +++ b/apps/cli/src/application/assistant/commands.ts @@ -0,0 +1,501 @@ +import { z } from "zod"; +import { + listWorkflows, + getWorkflow, + upsertWorkflow, + deleteWorkflow, +} from "./workflows/service.js"; +import { + listAgents, + 
getAgent, + upsertAgent, + deleteAgent, +} from "./agents/service.js"; +import { + readMcpConfig, + writeMcpConfig, +} from "./mcp/service.js"; +import { Agent } from "../entities/agent.js"; +import { Workflow } from "../entities/workflow.js"; + +export const ChatCommand = z.object({ + action: z.enum([ + "help", + "general_chat", + "list_workflows", + "get_workflow", + "describe_workflows", + "create_workflow", + "update_workflow", + "delete_workflow", + "list_agents", + "get_agent", + "create_agent", + "update_agent", + "delete_agent", + "list_mcp_servers", + "add_mcp_server", + "remove_mcp_server", + "run_workflow", + "unknown", + ]), + id: z.string().optional(), + query: z.string().optional(), + updates: Workflow.partial().optional(), + server: z + .object({ + name: z.string(), + url: z.string(), + }) + .optional(), + name: z.string().optional(), + clarification: z.string().optional(), + ids: z.array(z.string()).optional(), + scope: z.enum(["all"]).optional(), +}); + +export type ChatCommandT = z.infer; + +export type CommandStatus = "ok" | "error"; + +export interface CommandOutcome { + status: CommandStatus; + headline: string; + details?: string; + list?: string[]; + data?: unknown; +} + +function asCommandOutcome( + outcome: Omit & { status?: CommandStatus } +): CommandOutcome { + return { + status: outcome.status ?? "ok", + headline: outcome.headline, + details: outcome.details, + list: outcome.list, + data: outcome.data, + }; +} + +function normalizeKey(value: string): string { + return value.toLowerCase().replace(/[^a-z0-9]/g, ""); +} + +function levenshtein(a: string, b: string): number { + if (a === b) return 0; + if (a.length === 0) return b.length; + if (b.length === 0) return a.length; + + const matrix: number[][] = Array.from({ length: a.length + 1 }, (_, i) => + Array.from({ length: b.length + 1 }, (_, j) => (i === 0 ? j : j === 0 ? 
i : 0)) + ); + + for (let i = 1; i <= a.length; i++) { + for (let j = 1; j <= b.length; j++) { + const cost = a[i - 1] === b[j - 1] ? 0 : 1; + matrix[i][j] = Math.min( + matrix[i - 1][j] + 1, // deletion + matrix[i][j - 1] + 1, // insertion + matrix[i - 1][j - 1] + cost // substitution + ); + } + } + + return matrix[a.length][b.length]; +} + +function resolveWorkflowId( + input: string, + existing: string[] +): { id?: string; suggestion?: string } { + const exact = existing.find((candidate) => candidate === input); + if (exact) return { id: exact }; + + const normalizedInput = normalizeKey(input); + const normalizedMap = new Map(); + for (const candidate of existing) { + const key = normalizeKey(candidate); + if (!normalizedMap.has(key)) normalizedMap.set(key, candidate); + } + const normalizedMatch = normalizedMap.get(normalizedInput); + if (normalizedMatch) return { id: normalizedMatch }; + + const ranked = existing + .map((candidate) => ({ + id: candidate, + distance: levenshtein(normalizeKey(candidate), normalizedInput), + })) + .sort((a, b) => a.distance - b.distance); + + const best = ranked[0]; + if (best && best.distance <= 2) { + return { id: best.id }; + } + + return { suggestion: best?.id }; +} + +export async function executeCommand(cmd: ChatCommandT): Promise { + switch (cmd.action) { + case "help": + return asCommandOutcome({ + headline: "Try asking for workflows, agents, or MCP servers.", + list: [ + "list workflows", + "show workflow example_workflow", + "show all workflows in detail", + "create workflow demo that calls function get_date", + "list agents", + "add mcp server staging at http://localhost:8800", + ], + }); + case "list_workflows": { + const items = listWorkflows(); + return asCommandOutcome({ + headline: + items.length === 0 + ? "No workflows saved yet." + : `Found ${items.length} workflow${items.length === 1 ? 
"" : "s"}.`, + list: items, + data: { items }, + }); + } + case "get_workflow": { + if (!cmd.id) { + return asCommandOutcome({ + status: "error", + headline: "Workflow id required.", + details: "Provide the workflow name you want to inspect.", + }); + } + const allWorkflows = listWorkflows(); + const { id: resolvedId, suggestion } = resolveWorkflowId(cmd.id, allWorkflows); + if (!resolvedId) { + return asCommandOutcome({ + status: "error", + headline: `Workflow "${cmd.id}" was not found.`, + details: suggestion ? `Did you mean "${suggestion}"?` : undefined, + }); + } + const workflow = getWorkflow(resolvedId); + if (!workflow) { + return asCommandOutcome({ + status: "error", + headline: `Workflow "${resolvedId}" could not be loaded.`, + }); + } + return asCommandOutcome({ + headline: `Loaded workflow "${resolvedId}".`, + details: workflow.description || "No description set.", + data: workflow, + list: workflow.steps.map((step, index) => `${index + 1}. ${step.type} → ${step.id}`), + }); + } + case "describe_workflows": { + const allWorkflows = listWorkflows(); + const explicitIds = cmd.ids?.map((value) => value.trim()).filter((value) => value.length > 0) ?? []; + const targetIds = + explicitIds.length > 0 ? Array.from(new Set(explicitIds)) : cmd.scope === "all" ? [...allWorkflows] : []; + + if (targetIds.length === 0) { + return asCommandOutcome({ + status: "error", + headline: "No workflows specified.", + details: + explicitIds.length === 0 && cmd.scope !== "all" + ? "Provide workflow ids or set scope to \"all\"." 
+ : "No workflows found to describe.", + }); + } + + const described: Array<{ id: string; workflow: z.infer }> = []; + const missing: string[] = []; + const suggestions: string[] = []; + const seen = new Set(); + + if (explicitIds.length === 0 && cmd.scope === "all") { + for (const id of allWorkflows) { + const workflow = getWorkflow(id); + if (workflow && !seen.has(id)) { + seen.add(id); + described.push({ id, workflow }); + } + } + } else { + for (const requestedId of targetIds) { + const { id: resolvedId, suggestion } = resolveWorkflowId(requestedId, allWorkflows); + if (!resolvedId) { + missing.push(requestedId); + if (suggestion) suggestions.push(`${requestedId} → ${suggestion}`); + continue; + } + if (seen.has(resolvedId)) continue; + seen.add(resolvedId); + const workflow = getWorkflow(resolvedId); + if (workflow) { + described.push({ id: resolvedId, workflow }); + } else { + missing.push(requestedId); + } + } + } + + if (described.length === 0) { + return asCommandOutcome({ + status: "error", + headline: "No workflows found.", + details: `Checked: ${targetIds.join(", ")}`, + }); + } + + const list = described.map(({ workflow }) => { + const description = workflow.description ? workflow.description : "No description set."; + const steps = workflow.steps.map((step, index) => `${index + 1}. ${step.type} → ${step.id}`).join("; "); + return `${workflow.name}: ${description} Steps: ${steps || "None"}.`; + }); + + const details = + missing.length > 0 + ? `Missing workflows: ${missing.join(", ")}.${suggestions.length > 0 ? ` Closest matches: ${suggestions.join(", ")}.` : ""}` + : suggestions.length > 0 + ? `Closest matches: ${suggestions.join(", ")}.` + : undefined; + + return asCommandOutcome({ + headline: `Showing ${described.length} workflow${described.length === 1 ? 
"" : "s"}.`, + details, + list, + data: { + workflows: described.map(({ workflow }) => workflow), + missing, + }, + }); + } + case "general_chat": + if (!cmd.query) { + return asCommandOutcome({ + status: "error", + headline: "Need the question to answer.", + details: "Repeat your request so I can help.", + }); + } + return asCommandOutcome({ + headline: "General assistance requested.", + details: cmd.query, + data: { query: cmd.query }, + }); + case "create_workflow": { + if (!cmd.id) { + return asCommandOutcome({ + status: "error", + headline: "Workflow id required.", + details: "Name the workflow you want to create.", + }); + } + const created = upsertWorkflow(cmd.id, { ...(cmd.updates ?? {}) }); + return asCommandOutcome({ + headline: `Workflow "${cmd.id}" saved.`, + data: created, + }); + } + case "update_workflow": { + if (!cmd.id) { + return asCommandOutcome({ + status: "error", + headline: "Workflow id required.", + details: "Name the workflow you want to update.", + }); + } + const updated = upsertWorkflow(cmd.id, { ...(cmd.updates ?? {}) }); + return asCommandOutcome({ + headline: `Workflow "${cmd.id}" updated.`, + data: updated, + }); + } + case "delete_workflow": { + if (!cmd.id) { + return asCommandOutcome({ + status: "error", + headline: "Workflow id required.", + details: "Name the workflow you want to delete.", + }); + } + const deleted = deleteWorkflow(cmd.id); + return asCommandOutcome({ + headline: deleted + ? `Workflow "${cmd.id}" deleted.` + : `Workflow "${cmd.id}" did not exist.`, + data: { deleted }, + }); + } + case "list_agents": { + const items = listAgents(); + return asCommandOutcome({ + headline: + items.length === 0 + ? "No agents saved yet." + : `Found ${items.length} agent${items.length === 1 ? 
"" : "s"}.`, + list: items, + data: { items }, + }); + } + case "get_agent": { + if (!cmd.id) { + return asCommandOutcome({ + status: "error", + headline: "Agent id required.", + details: "Provide the agent name you want to inspect.", + }); + } + const agent = getAgent(cmd.id); + if (!agent) { + return asCommandOutcome({ + status: "error", + headline: `Agent "${cmd.id}" was not found.`, + }); + } + return asCommandOutcome({ + headline: `Loaded agent "${cmd.id}".`, + details: agent.description || "No description set.", + data: agent, + }); + } + case "create_agent": { + if (!cmd.id) { + return asCommandOutcome({ + status: "error", + headline: "Agent id required.", + details: "Name the agent you want to create.", + }); + } + const created = upsertAgent(cmd.id, { ...(cmd.updates ?? {}) }); + return asCommandOutcome({ + headline: `Agent "${cmd.id}" saved.`, + data: created, + }); + } + case "update_agent": { + if (!cmd.id) { + return asCommandOutcome({ + status: "error", + headline: "Agent id required.", + details: "Name the agent you want to update.", + }); + } + const updated = upsertAgent(cmd.id, { ...(cmd.updates ?? {}) }); + return asCommandOutcome({ + headline: `Agent "${cmd.id}" updated.`, + data: updated, + }); + } + case "delete_agent": { + if (!cmd.id) { + return asCommandOutcome({ + status: "error", + headline: "Agent id required.", + details: "Name the agent you want to delete.", + }); + } + const deleted = deleteAgent(cmd.id); + return asCommandOutcome({ + headline: deleted + ? `Agent "${cmd.id}" deleted.` + : `Agent "${cmd.id}" did not exist.`, + data: { deleted }, + }); + } + case "list_mcp_servers": { + const config = readMcpConfig(); + const servers = config.mcpServers; + return asCommandOutcome({ + headline: + servers.length === 0 + ? "No MCP servers configured." + : `Found ${servers.length} MCP server${servers.length === 1 ? 
"" : "s"}.`, + list: servers.map((server) => `${server.name} → ${server.url}`), + data: servers, + }); + } + case "add_mcp_server": { + const serverConfig = cmd.server; + if (!serverConfig) { + return asCommandOutcome({ + status: "error", + headline: "Server details required.", + details: "Provide a name and url for the MCP server.", + }); + } + const config = readMcpConfig(); + const withoutExisting = config.mcpServers.filter( + (server) => server.name !== serverConfig.name + ); + const updated = { + mcpServers: [...withoutExisting, { ...serverConfig }], + }; + writeMcpConfig(updated); + return asCommandOutcome({ + headline: `MCP server "${serverConfig.name}" saved.`, + data: updated.mcpServers, + }); + } + case "remove_mcp_server": { + const name = cmd.name; + if (!name) { + return asCommandOutcome({ + status: "error", + headline: "Server name required.", + details: "Tell me which MCP server to remove.", + }); + } + const config = readMcpConfig(); + const remaining = config.mcpServers.filter( + (server) => server.name !== name + ); + const removed = remaining.length !== config.mcpServers.length; + writeMcpConfig({ mcpServers: remaining }); + return asCommandOutcome({ + headline: removed + ? 
`MCP server "${name}" removed.` + : `MCP server "${name}" was not registered.`, + data: remaining, + }); + } + case "run_workflow": { + if (!cmd.id) { + return asCommandOutcome({ + status: "error", + headline: "Workflow id required.", + details: "Name the workflow you want to run.", + }); + } + const workflow = getWorkflow(cmd.id); + if (!workflow) { + return asCommandOutcome({ + status: "error", + headline: `Workflow "${cmd.id}" was not found.`, + }); + } + if (workflow.steps.length === 0) { + return asCommandOutcome({ + headline: `Workflow "${cmd.id}" is empty.`, + details: "Add function or agent steps before running.", + data: workflow, + }); + } + return asCommandOutcome({ + headline: `Workflow "${cmd.id}" is ready.`, + details: + "Running from the copilot will be available once the runtime bridge is connected.", + list: workflow.steps.map((step, index) => `${index + 1}. ${step.type} → ${step.id}`), + data: workflow, + }); + } + case "unknown": + return asCommandOutcome({ + status: "error", + headline: "I need more detail before taking action.", + details: cmd.clarification ?? "Try rephrasing or be more specific about the workflow, agent, or MCP server.", + }); + } +}