structured ask human and permissions refactor

This commit is contained in:
Ramnique Singh 2025-11-18 19:27:11 +05:30
parent 28488d5fd1
commit 7d4484e7c0
5 changed files with 447 additions and 307 deletions

View file

@ -1,60 +1,68 @@
import { z } from "zod";
import { LlmStepStreamEvent } from "./llm-step-events.js";
import { Message } from "./message.js";
import { Message, ToolCallPart } from "./message.js";
import { Agent } from "./agent.js";
import z from "zod";
const BaseRunEvent = z.object({
ts: z.iso.datetime().optional(),
subflow: z.array(z.string()),
});
export const RunStartEvent = BaseRunEvent.extend({
export const StartEvent = BaseRunEvent.extend({
type: z.literal("start"),
runId: z.string(),
agent: z.string(),
agentName: z.string(),
});
export const RunStepStartEvent = BaseRunEvent.extend({
type: z.literal("step-start"),
export const SpawnSubFlowEvent = BaseRunEvent.extend({
type: z.literal("spawn-subflow"),
agentName: z.string(),
toolCallId: z.string(),
});
export const RunStreamEvent = BaseRunEvent.extend({
type: z.literal("stream-event"),
export const LlmStreamEvent = BaseRunEvent.extend({
type: z.literal("llm-stream-event"),
event: LlmStepStreamEvent,
});
export const RunMessageEvent = BaseRunEvent.extend({
export const MessageEvent = BaseRunEvent.extend({
type: z.literal("message"),
message: Message,
});
export const RunToolInvocationEvent = BaseRunEvent.extend({
export const ToolInvocationEvent = BaseRunEvent.extend({
type: z.literal("tool-invocation"),
toolName: z.string(),
input: z.string(),
});
export const RunToolResultEvent = BaseRunEvent.extend({
export const ToolResultEvent = BaseRunEvent.extend({
type: z.literal("tool-result"),
toolName: z.string(),
result: z.any(),
});
export const RunStepEndEvent = BaseRunEvent.extend({
type: z.literal("step-end"),
});
export const RunEndEvent = BaseRunEvent.extend({
type: z.literal("end"),
});
export const RunPauseEvent = BaseRunEvent.extend({
type: z.literal("pause-for-human-input"),
export const AskHumanRequestEvent = BaseRunEvent.extend({
type: z.literal("ask-human-request"),
toolCallId: z.string(),
question: z.string(),
query: z.string(),
});
export const RunResumeEvent = BaseRunEvent.extend({
type: z.literal("resume"),
export const AskHumanResponseEvent = BaseRunEvent.extend({
type: z.literal("ask-human-response"),
toolCallId: z.string(),
response: z.string(),
});
export const ToolPermissionRequestEvent = BaseRunEvent.extend({
type: z.literal("tool-permission-request"),
toolCall: ToolCallPart,
});
export const ToolPermissionResponseEvent = BaseRunEvent.extend({
type: z.literal("tool-permission-response"),
toolCallId: z.string(),
response: z.enum(["approve", "deny"]),
});
export const RunErrorEvent = BaseRunEvent.extend({
@ -63,15 +71,15 @@ export const RunErrorEvent = BaseRunEvent.extend({
});
export const RunEvent = z.union([
RunStartEvent,
RunStepStartEvent,
RunStreamEvent,
RunMessageEvent,
RunToolInvocationEvent,
RunToolResultEvent,
RunStepEndEvent,
RunEndEvent,
RunPauseEvent,
RunResumeEvent,
StartEvent,
SpawnSubFlowEvent,
LlmStreamEvent,
MessageEvent,
ToolInvocationEvent,
ToolResultEvent,
AskHumanRequestEvent,
AskHumanResponseEvent,
ToolPermissionRequestEvent,
ToolPermissionResponseEvent,
RunErrorEvent,
]);

View file

@ -3,7 +3,6 @@ import fs from "fs";
import path from "path";
import { ModelConfig, WorkDir } from "../config/config.js";
import { Agent, ToolAttachment } from "../entities/agent.js";
import { createInterface, Interface } from "node:readline/promises";
import { AssistantContentPart, AssistantMessage, Message, MessageList, ToolCallPart, ToolMessage, UserMessage } from "../entities/message.js";
import { runIdGenerator } from "./run-id-gen.js";
import { LanguageModel, stepCountIs, streamText, tool, Tool, ToolSet } from "ai";
@ -11,8 +10,9 @@ import { z } from "zod";
import { getProvider } from "./models.js";
import { LlmStepStreamEvent } from "../entities/llm-step-events.js";
import { execTool } from "./exec-tool.js";
import { RunEvent } from "../entities/run-events.js";
import { AskHumanRequestEvent, RunEvent, ToolPermissionRequestEvent, ToolPermissionResponseEvent } from "../entities/run-events.js";
import { BuiltinTools } from "./builtin-tools.js";
import { CopilotAgent } from "../assistant/agent.js";
export async function mapAgentTool(t: z.infer<typeof ToolAttachment>): Promise<Tool> {
switch (t.type) {
@ -75,7 +75,7 @@ export class RunLogger {
}
log(event: z.infer<typeof RunEvent>) {
if (event.type !== "stream-event") {
if (event.type !== "llm-stream-event") {
this.fileHandle.write(JSON.stringify(event) + "\n");
}
}
@ -161,6 +161,9 @@ function normaliseAskHumanToolCall(message: z.infer<typeof AssistantMessage>) {
}
export async function loadAgent(id: string): Promise<z.infer<typeof Agent>> {
if (id === "copilot") {
return CopilotAgent;
}
const agentPath = path.join(WorkDir, "agents", `${id}.json`);
const agent = fs.readFileSync(agentPath, "utf8");
return Agent.parse(JSON.parse(agent));
@ -230,14 +233,7 @@ export function convertFromMessages(messages: z.infer<typeof Message>[]): ModelM
return result;
}
export async function* streamAgentTurn(opts: {
agent: z.infer<typeof Agent>;
messages: z.infer<typeof MessageList>;
}): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
const { agent, messages } = opts;
// set up tools
async function buildTools(agent: z.infer<typeof Agent>): Promise<ToolSet> {
const tools: ToolSet = {};
for (const [name, tool] of Object.entries(agent.tools ?? {})) {
try {
@ -247,105 +243,340 @@ export async function* streamAgentTurn(opts: {
continue;
}
}
return tools;
}
// set up
export class AgentState {
logger: RunLogger | null = null;
runId: string | null = null;
agent: z.infer<typeof Agent> | null = null;
agentName: string;
messages: z.infer<typeof MessageList> = [];
lastAssistantMsg: z.infer<typeof AssistantMessage> | null = null;
subflowStates: Record<string, AgentState> = {};
toolCallIdMap: Record<string, z.infer<typeof ToolCallPart>> = {};
pendingToolCalls: Record<string, true> = {};
pendingToolPermissionRequests: Record<string, z.infer<typeof ToolPermissionRequestEvent>> = {};
pendingAskHumanRequests: Record<string, z.infer<typeof AskHumanRequestEvent>> = {};
allowedToolCallIds: Record<string, true> = {};
deniedToolCallIds: Record<string, true> = {};
constructor(agentName: string, runId?: string) {
this.agentName = agentName;
this.runId = runId || runIdGenerator.next();
this.logger = new RunLogger(this.runId);
if (!runId) {
this.logger.log({
type: "start",
runId: this.runId,
agentName: this.agentName,
subflow: [],
});
}
}
getPendingPermissions(): z.infer<typeof ToolPermissionRequestEvent>[] {
const response: z.infer<typeof ToolPermissionRequestEvent>[] = [];
for (const [id, subflowState] of Object.entries(this.subflowStates)) {
for (const perm of subflowState.getPendingPermissions()) {
response.push({
...perm,
subflow: [id, ...perm.subflow],
});
}
}
for (const perm of Object.values(this.pendingToolPermissionRequests)) {
response.push({
...perm,
subflow: [],
});
}
return response;
}
getPendingAskHumans(): z.infer<typeof AskHumanRequestEvent>[] {
const response: z.infer<typeof AskHumanRequestEvent>[] = [];
for (const [id, subflowState] of Object.entries(this.subflowStates)) {
for (const ask of subflowState.getPendingAskHumans()) {
response.push({
...ask,
subflow: [id, ...ask.subflow],
});
}
}
for (const ask of Object.values(this.pendingAskHumanRequests)) {
response.push({
...ask,
subflow: [],
});
}
return response;
}
finalResponse(): string {
if (!this.lastAssistantMsg) {
return '';
}
if (typeof this.lastAssistantMsg.content === "string") {
return this.lastAssistantMsg.content;
}
return this.lastAssistantMsg.content.reduce((acc, part) => {
if (part.type === "text") {
return acc + part.text;
}
return acc;
}, "");
}
ingest(event: z.infer<typeof RunEvent>) {
if (event.subflow.length > 0) {
const { subflow, ...rest } = event;
this.subflowStates[subflow[0]].ingest({
...rest,
subflow: subflow.slice(1),
});
return;
}
switch (event.type) {
case "message":
this.messages.push(event.message);
if (event.message.content instanceof Array) {
for (const part of event.message.content) {
if (part.type === "tool-call") {
this.toolCallIdMap[part.toolCallId] = part;
this.pendingToolCalls[part.toolCallId] = true;
}
}
}
if (event.message.role === "tool") {
const message = event.message as z.infer<typeof ToolMessage>;
delete this.pendingToolCalls[message.toolCallId];
}
if (event.message.role === "assistant") {
this.lastAssistantMsg = event.message;
}
break;
case "spawn-subflow":
this.subflowStates[event.toolCallId] = new AgentState(event.agentName);
break;
case "tool-permission-request":
this.pendingToolPermissionRequests[event.toolCall.toolCallId] = event;
break;
case "tool-permission-response":
switch (event.response) {
case "approve":
this.allowedToolCallIds[event.toolCallId] = true;
break;
case "deny":
this.deniedToolCallIds[event.toolCallId] = true;
break;
}
delete this.pendingToolPermissionRequests[event.toolCallId];
break;
case "ask-human-request":
this.pendingAskHumanRequests[event.toolCallId] = event;
break;
case "ask-human-response":
// console.error('im here', this.agentName, this.runId, event.subflow);
const ogEvent = this.pendingAskHumanRequests[event.toolCallId];
this.messages.push({
role: "tool",
content: JSON.stringify({
userResponse: event.response,
}),
toolCallId: ogEvent.toolCallId,
toolName: this.toolCallIdMap[ogEvent.toolCallId]!.toolName,
});
delete this.pendingAskHumanRequests[ogEvent.toolCallId];
break;
}
}
ingestAndLog(event: z.infer<typeof RunEvent>) {
this.ingest(event);
this.logger!.log(event);
}
*ingestAndLogAndYield(event: z.infer<typeof RunEvent>): Generator<z.infer<typeof RunEvent>, void, unknown> {
this.ingestAndLog(event);
yield event;
}
}
export async function* streamAgent(state: AgentState): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
// set up agent
const agent = await loadAgent(state.agentName);
// set up tools
const tools = await buildTools(agent);
// set up provider + model
const provider = getProvider(agent.provider);
const model = provider(agent.model || ModelConfig.defaults.model);
let loopCounter = 0;
// run one turn
while (true) {
// console.error(`loop counter: ${loopCounter++}`)
// if last response is from assistant and text, so exit
const lastMessage = state.messages[state.messages.length - 1];
if (lastMessage
&& lastMessage.role === "assistant"
&& (typeof lastMessage.content === "string"
|| !lastMessage.content.some(part => part.type === "tool-call")
)
) {
// console.error("Nothing to do, exiting (a.)")
return;
}
// execute any pending tool calls
for (const toolCallId of Object.keys(state.pendingToolCalls)) {
const toolCall = state.toolCallIdMap[toolCallId];
// if ask-human, skip
if (toolCall.toolName === "ask-human") {
continue;
}
// if tool has been denied, deny
if (state.deniedToolCallIds[toolCallId]) {
yield* state.ingestAndLogAndYield({
type: "message",
message: {
role: "tool",
content: "Unable to execute this tool: Permission was denied.",
toolCallId: toolCallId,
toolName: toolCall.toolName,
},
subflow: [],
});
continue;
}
// if permission is pending on this tool call, allow execution
if (state.pendingToolPermissionRequests[toolCallId]) {
continue;
}
// execute approved tool
yield* state.ingestAndLogAndYield({
type: "tool-invocation",
toolName: toolCall.toolName,
input: JSON.stringify(toolCall.arguments),
subflow: [],
});
let result: any = null;
if (agent.tools![toolCall.toolName].type === "agent") {
let subflowState = state.subflowStates[toolCallId];
for await (const event of streamAgent(subflowState)) {
yield* state.ingestAndLogAndYield({
...event,
subflow: [toolCallId, ...event.subflow],
});
}
if (!subflowState.getPendingAskHumans().length && !subflowState.getPendingPermissions().length) {
result = subflowState.finalResponse();
}
} else {
result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments);
}
if (result) {
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(result),
toolCallId: toolCall.toolCallId,
toolName: toolCall.toolName,
};
yield* state.ingestAndLogAndYield({
type: "tool-result",
toolName: toolCall.toolName,
result: result,
subflow: [],
});
yield* state.ingestAndLogAndYield({
type: "message",
message: resultMsg,
subflow: [],
});
}
}
// if pending state, exit
if (state.getPendingAskHumans().length || state.getPendingPermissions().length) {
// console.error("pending asks or permissions, exiting (b.)")
return;
}
// if current message state isn't runnable, exit
if (state.messages.length === 0 || state.messages[state.messages.length - 1].role === "assistant") {
// console.error("current message state isn't runnable, exiting (c.)")
return;
}
// run one LLM turn.
// stream agent response and build message
const messageBuilder = new StreamStepMessageBuilder();
for await (const event of streamLlm(
model,
messages,
state.messages,
agent.instructions,
tools,
)) {
messageBuilder.ingest(event);
yield {
type: "stream-event",
yield* state.ingestAndLogAndYield({
type: "llm-stream-event",
event: event,
};
}
// build and emit final message from agent response
const msg = messageBuilder.get();
normaliseAskHumanToolCall(msg);
messages.push(msg);
yield {
type: "message",
message: msg,
};
// handle tool calls
const mappedToolCalls: z.infer<typeof MappedToolCall>[] = [];
let msgToolCallParts: z.infer<typeof ToolCallPart>[] = [];
if (msg.content instanceof Array) {
msgToolCallParts = msg.content.filter(part => part.type === "tool-call");
}
const hasToolCalls = msgToolCallParts.length > 0;
// validate and map tool calls
for (const part of msgToolCallParts) {
const agentTool = tools[part.toolName];
if (!agentTool) {
throw new Error(`Tool ${part.toolName} not found`);
}
mappedToolCalls.push({
toolCall: part,
agentTool: agent.tools![part.toolName],
subflow: [],
});
}
// first, handle tool calls other than ask-human
for (const call of mappedToolCalls) {
if (call.toolCall.toolName === "ask-human") {
continue;
// build and emit final message from agent response
const message = messageBuilder.get();
yield* state.ingestAndLogAndYield({
type: "message",
message,
subflow: [],
});
// if there were any ask-human calls, emit those events
if (message.content instanceof Array) {
for (const part of message.content) {
if (part.type === "tool-call") {
const underlyingTool = agent.tools![part.toolName];
if (underlyingTool.type === "builtin" && underlyingTool.name === "ask-human") {
yield* state.ingestAndLogAndYield({
type: "ask-human-request",
toolCallId: part.toolCallId,
query: part.arguments.question,
subflow: [],
});
}
if (underlyingTool.type === "builtin" && underlyingTool.name === "executeCommand") {
yield *state.ingestAndLogAndYield({
type: "tool-permission-request",
toolCall: part,
subflow: [],
});
}
if (underlyingTool.type === "agent" && underlyingTool.name) {
yield* state.ingestAndLogAndYield({
type: "spawn-subflow",
agentName: underlyingTool.name,
toolCallId: part.toolCallId,
subflow: [],
});
yield* state.ingestAndLogAndYield({
type: "message",
message: {
role: "user",
content: part.arguments.message,
},
subflow: [part.toolCallId],
});
}
}
}
const { agentTool, toolCall } = call;
yield {
type: "tool-invocation",
toolName: toolCall.toolName,
input: JSON.stringify(toolCall.arguments),
};
const result = await execTool(agentTool, toolCall.arguments);
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(result),
toolCallId: toolCall.toolCallId,
toolName: toolCall.toolName,
};
messages.push(resultMsg);
yield {
type: "tool-result",
toolName: toolCall.toolName,
result: result,
};
yield {
type: "message",
message: resultMsg,
};
}
// then, handle ask-human (only first one)
const askHumanCall = mappedToolCalls.filter(call => call.toolCall.toolName === "ask-human")[0];
if (askHumanCall) {
yield {
type: "pause-for-human-input",
toolCallId: askHumanCall.toolCall.toolCallId,
question: askHumanCall.toolCall.arguments.question as string,
};
return;
}
// if the agent response had tool calls, replay this agent
if (hasToolCalls) {
continue;
}
// otherwise, break
return;
}
}

View file

@ -6,10 +6,7 @@ import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import { Client } from "@modelcontextprotocol/sdk/client";
import { AssistantMessage } from "../entities/message.js";
import { BuiltinTools } from "./builtin-tools.js";
import { loadAgent, streamAgentTurn } from "./agent.js";
import { app } from "@/app.js";
async function execMcpTool(agentTool: z.infer<typeof ToolAttachment> & { type: "mcp" }, input: any): Promise<any> {
// load mcp configuration from the tool
@ -54,49 +51,10 @@ async function execMcpTool(agentTool: z.infer<typeof ToolAttachment> & { type: "
return result;
}
async function execAgentTool(agentTool: z.infer<typeof ToolAttachment> & { type: "agent" }, input: any): Promise<any> {
let lastMsg: z.infer<typeof AssistantMessage> | null = null;
const agent = await loadAgent(agentTool.name);
for await (const event of streamAgentTurn({
agent,
messages: [{
role: "user",
content: JSON.stringify(input),
}],
})) {
if (event.type === "message" && event.message.role === "assistant") {
lastMsg = event.message;
}
if (event.type === "pause-for-human-input") {
return `I need more information from a human in order to continue. I should use the ask-human tool to ask the user for a response on the question below. Once the user comes back with an answer, call this tool again with the answer embedded in the original input that you used to call this tool the first time.
Question: ${event.question}`;
}
if (event.type === "error") {
throw new Error(event.error);
}
}
if (!lastMsg) {
throw new Error("No message received from agent");
}
if (typeof lastMsg.content === "string") {
return lastMsg.content;
}
return lastMsg.content.reduce((acc, part) => {
if (part.type === "text") {
acc += part.text;
}
return acc;
}, "");
}
export async function execTool(agentTool: z.infer<typeof ToolAttachment>, input: any): Promise<any> {
switch (agentTool.type) {
case "mcp":
return execMcpTool(agentTool, input);
case "agent":
return execAgentTool(agentTool, input);
case "builtin":
const builtinTool = BuiltinTools[agentTool.name];
if (!builtinTool || !builtinTool.execute) {

View file

@ -28,14 +28,10 @@ export class StreamRenderer {
render(event: z.infer<typeof RunEvent>) {
switch (event.type) {
case "start": {
this.onStart(event.agent, event.runId);
this.onStart(event.agentName, event.runId);
break;
}
case "step-start": {
this.onStepStart();
break;
}
case "stream-event": {
case "llm-stream-event": {
this.renderLlmEvent(event.event);
break;
}
@ -51,22 +47,10 @@ export class StreamRenderer {
this.onStepToolResult(event.toolName, event.result);
break;
}
case "step-end": {
this.onStepEnd();
break;
}
case "end": {
this.onEnd();
break;
}
case "error": {
this.onError(event.error);
break;
}
case "pause-for-human-input": {
this.onPauseForHumanInput(event.toolCallId, event.question);
break;
}
}
}
@ -99,10 +83,9 @@ export class StreamRenderer {
}
}
private onStart(agent: string, runId: string) {
private onStart(agentName: string, runId: string) {
this.write("\n");
this.write(this.bold(this.cyan(`╭─ Agent: ${agent}`)));
this.write(this.dim(` │ run ${runId}`));
this.write(this.bold(`▶ Agent ${agentName} (run ${runId})`));
this.write("\n");
this.write(this.dim(`╰─────────────────────────────────────────────────\n`));
}