diff --git a/apps/rowboat/app/actions/actions.ts b/apps/rowboat/app/actions/actions.ts index e72efe79..2b5114e5 100644 --- a/apps/rowboat/app/actions/actions.ts +++ b/apps/rowboat/app/actions/actions.ts @@ -1,6 +1,4 @@ 'use server'; -import { AgenticAPIInitStreamResponse } from "../lib/types/agents_api_types"; -import { AgenticAPIChatRequest } from "../lib/types/agents_api_types"; import { WebpageCrawlResponse } from "../lib/types/tool_types"; import { webpagesCollection } from "../lib/mongodb"; import { z } from 'zod'; @@ -10,6 +8,8 @@ import { check_query_limit } from "../lib/rate_limiting"; import { QueryLimitError } from "../lib/client_utils"; import { projectAuthCheck } from "./project_actions"; import { authorizeUserAction } from "./billing_actions"; +import { Workflow, WorkflowTool } from "../lib/types/workflow_types"; +import { Message } from "@/app/lib/types/types"; const crawler = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY || '' }); @@ -57,14 +57,18 @@ export async function scrapeWebpage(url: string): Promise): Promise | { billingError: string }> { - await projectAuthCheck(request.projectId); - if (!await check_query_limit(request.projectId)) { +export async function getAssistantResponseStreamId( + workflow: z.infer, + projectTools: z.infer[], + messages: z.infer[], +): Promise<{ streamId: string } | { billingError: string }> { + await projectAuthCheck(workflow.projectId); + if (!await check_query_limit(workflow.projectId)) { throw new QueryLimitError(); } // Check billing authorization - const agentModels = request.agents.reduce((acc, agent) => { + const agentModels = workflow.agents.reduce((acc, agent) => { acc.push(agent.model); return acc; }, [] as string[]); @@ -78,6 +82,6 @@ export async function getAssistantResponseStreamId(request: z.infer { - let client: Client | undefined = undefined; - const baseUrl = new URL(serverUrl); - - // Try to connect using Streamable HTTP transport - try { - client = new Client({ - name: 'streamable-http-client', - version: '1.0.0' - }); - const transport = new StreamableHTTPClientTransport( - new URL(baseUrl) - ); - await client.connect(transport); - console.log(`[MCP] Connected using Streamable HTTP transport to ${serverName}`); - return client; - } catch (error) { - // If that fails with a 4xx error, try the older SSE transport - console.log(`[MCP] Streamable HTTP connection failed, falling back to SSE transport for ${serverName}`); - client = new Client({ - name: 'sse-client', - version: '1.0.0' - }); - const sseTransport = new SSEClientTransport(baseUrl); - await client.connect(sseTransport); - console.log(`[MCP] Connected using SSE transport to ${serverName}`); - return client; - } -} +import { getMcpClient } from "../lib/mcp"; export async function fetchMcpTools(projectId: string): Promise[]> { await projectAuthCheck(projectId); diff --git a/apps/rowboat/app/api/stream-response/[streamId]/route.ts b/apps/rowboat/app/api/stream-response/[streamId]/route.ts index 2f5e7f6d..3e765e66 100644 --- a/apps/rowboat/app/api/stream-response/[streamId]/route.ts +++ b/apps/rowboat/app/api/stream-response/[streamId]/route.ts @@ -1,8 +1,16 @@ import { getCustomerIdForProject, logUsage } from "@/app/lib/billing"; import { USE_BILLING } from "@/app/lib/feature_flags"; import { redisClient } from "@/app/lib/redis"; -import { AgenticAPIChatMessage, AgenticAPIChatRequest, convertFromAgenticAPIChatMessages } from "@/app/lib/types/agents_api_types"; -import { createParser, type EventSourceMessage } from 'eventsource-parser'; +import { Workflow, WorkflowTool } from "@/app/lib/types/workflow_types"; +import { streamResponse } from "@/app/lib/agents"; +import { Message } from "@/app/lib/types/types"; +import { z } from "zod"; + +const PayloadSchema = z.object({ + workflow: Workflow, + projectTools: z.array(WorkflowTool), + messages: z.array(Message), +}); export async function GET(request: Request, props: { params: Promise<{ streamId: string }> }) { const params = await props.params; @@ -13,85 +21,42 @@ export async function GET(request: Request, props: { params: Promise<{ streamId: } // parse the payload - const parsedPayload = AgenticAPIChatRequest.parse(JSON.parse(payload)); + const { workflow, projectTools, messages } = PayloadSchema.parse(JSON.parse(payload)); + console.log('payload', payload); // fetch billing customer id let billingCustomerId: string | null = null; if (USE_BILLING) { - billingCustomerId = await getCustomerIdForProject(parsedPayload.projectId); + billingCustomerId = await getCustomerIdForProject(workflow.projectId); } - // Fetch the upstream SSE stream. - const upstreamResponse = await fetch(`${process.env.AGENTS_API_URL}/chat_stream`, { - method: 'POST', - body: payload, - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${process.env.AGENTS_API_KEY || 'test'}`, - }, - cache: 'no-store', - }); - - // If the upstream request fails, return a 502 Bad Gateway. - if (!upstreamResponse.ok || !upstreamResponse.body) { - return new Response("Error connecting to upstream SSE stream", { status: 502 }); - } - - const reader = upstreamResponse.body.getReader(); const encoder = new TextEncoder(); + let messageCount = 0; const stream = new ReadableStream({ async start(controller) { - let messageCount = 0; - - function emitEvent(event: EventSourceMessage) { - // Re-emit the event in SSE format - let eventString = ''; - if (event.id) eventString += `id: ${event.id}\n`; - if (event.event) eventString += `event: ${event.event}\n`; - if (event.data) eventString += `data: ${event.data}\n`; - eventString += '\n'; - - controller.enqueue(encoder.encode(eventString)); - } - - const parser = createParser({ - onEvent(event: EventSourceMessage) { - if (event.event !== 'message') { - emitEvent(event); - return; - } - - // Parse message - const data = JSON.parse(event.data); - const msg = AgenticAPIChatMessage.parse(data); - const parsedMsg = convertFromAgenticAPIChatMessages([msg])[0]; - - // increment the message count if this is an assistant message - if (parsedMsg.role === 'assistant') { - messageCount++; - } - - // emit the event - emitEvent(event); - } - }); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - // Feed the chunk to the parser - parser.feed(new TextDecoder().decode(value)); + // Iterate over the generator + for await (const event of streamResponse(workflow, projectTools, messages)) { + // Check if this is a message event (has role property) + if ('role' in event) { + if (event.role === 'assistant') { + messageCount++; + } + controller.enqueue(encoder.encode(`event: message\ndata: ${JSON.stringify(event)}\n\n`)); + } else { + controller.enqueue(encoder.encode(`event: done\ndata: ${JSON.stringify(event)}\n\n`)); + } } + controller.close(); + // Log billing usage if (USE_BILLING && billingCustomerId) { await logUsage(billingCustomerId, { type: "agent_messages", amount: messageCount, - }) + }); } } catch (error) { console.error('Error processing stream:', error); diff --git a/apps/rowboat/app/api/v1/[projectId]/chat/route.ts b/apps/rowboat/app/api/v1/[projectId]/chat/route.ts index 2b1c3029..d1b871bb 100644 --- a/apps/rowboat/app/api/v1/[projectId]/chat/route.ts +++ b/apps/rowboat/app/api/v1/[projectId]/chat/route.ts @@ -4,14 +4,13 @@ import { z } from "zod"; import { ObjectId } from "mongodb"; import { authCheck } from "../../utils"; import { ApiRequest, ApiResponse } from "../../../../lib/types/types"; -import { AgenticAPIChatRequest, convertFromAgenticApiToApiMessages, convertFromApiToAgenticApiMessages, convertWorkflowToAgenticAPI } from "../../../../lib/types/agents_api_types"; -import { getAgenticApiResponse } from "../../../../lib/utils"; import { check_query_limit } from "../../../../lib/rate_limiting"; import { PrefixLogger } from "../../../../lib/utils"; import { TestProfile } from "@/app/lib/types/testing_types"; import { fetchProjectMcpTools } from "@/app/lib/project_tools"; import { authorize, getCustomerIdForProject, logUsage } from "@/app/lib/billing"; import { USE_BILLING } from "@/app/lib/feature_flags"; +import { getResponse } from "@/app/lib/agents"; // get next turn / agent response export async function POST( @@ -52,7 +51,6 @@ export async function POST( return Response.json({ error: `Invalid request body: ${result.error.message}` }, { status: 400 }); } const reqMessages = result.data.messages; - const reqState = result.data.state; // fetch published workflow id const project = await projectsCollection.findOne({ @@ -112,34 +110,12 @@ export async function POST( } } - let currentState: unknown = reqState ?? { last_agent_name: workflow.agents[0].name }; - // get assistant response - const { agents, tools, prompts, startAgent } = convertWorkflowToAgenticAPI(workflow, projectTools); - const request: z.infer = { - projectId, - messages: convertFromApiToAgenticApiMessages(reqMessages), - state: currentState, - agents, - tools, - prompts, - startAgent, - testProfile: testProfile ?? undefined, - mcpServers: (project.mcpServers ?? []).map(server => ({ - name: server.name, - serverUrl: server.serverUrl ?? '', - isReady: server.isReady ?? false - })), - toolWebhookUrl: project.webhookUrl ?? '', - }; - - const { messages: agenticMessages, state } = await getAgenticApiResponse(request); - const newMessages = convertFromAgenticApiToApiMessages(agenticMessages); - const newState = state; + const { messages } = await getResponse(workflow, projectTools, reqMessages); // log billing usage if (USE_BILLING && billingCustomerId) { - const agentMessageCount = newMessages.filter(m => m.role === 'assistant').length; + const agentMessageCount = messages.filter(m => m.role === 'assistant').length; await logUsage(billingCustomerId, { type: 'agent_messages', amount: agentMessageCount, @@ -147,8 +123,7 @@ export async function POST( } const responseBody: z.infer = { - messages: newMessages, - state: newState, + messages, }; return Response.json(responseBody); }); diff --git a/apps/rowboat/app/api/widget/v1/chats/[chatId]/turn/route.ts b/apps/rowboat/app/api/widget/v1/chats/[chatId]/turn/route.ts index 620a9e07..47bb0acb 100644 --- a/apps/rowboat/app/api/widget/v1/chats/[chatId]/turn/route.ts +++ b/apps/rowboat/app/api/widget/v1/chats/[chatId]/turn/route.ts @@ -4,16 +4,108 @@ import { agentWorkflowsCollection, projectsCollection, chatsCollection, chatMess import { z } from "zod"; import { ObjectId, WithId } from "mongodb"; import { authCheck } from "../../../utils"; -import { convertFromAgenticAPIChatMessages } from "../../../../../../lib/types/agents_api_types"; -import { convertToAgenticAPIChatMessages } from "../../../../../../lib/types/agents_api_types"; -import { convertWorkflowToAgenticAPI } from "../../../../../../lib/types/agents_api_types"; -import { AgenticAPIChatRequest } from "../../../../../../lib/types/agents_api_types"; -import { getAgenticApiResponse } from "../../../../../../lib/utils"; import { check_query_limit } from "../../../../../../lib/rate_limiting"; import { PrefixLogger } from "../../../../../../lib/utils"; import { fetchProjectMcpTools } from "@/app/lib/project_tools"; import { authorize, getCustomerIdForProject, logUsage } from "@/app/lib/billing"; import { USE_BILLING } from "@/app/lib/feature_flags"; +import { getResponse } from "@/app/lib/agents"; +import { Message, AssistantMessage, AssistantMessageWithToolCalls, ToolMessage } from "@/app/lib/types/types"; + +function convert(messages: z.infer[]): z.infer[] { + const result: z.infer[] = []; + for (const m of messages) { + if (m.role === 'assistant') { + if ('tool_calls' in m) { + result.push({ + role: 'assistant', + content: null, + agentName: m.agenticSender ?? '', + toolCalls: m.tool_calls.map((t: any) => ({ + function: { + name: t.function.name, + arguments: t.function.arguments, + }, + type: 'function', + id: t.id, + })), + }); + } else { + result.push({ + role: 'assistant', + content: m.content, + agentName: m.agenticSender ?? '', + responseType: m.agenticResponseType, + }); + } + } else if (m.role === 'tool') { + result.push({ + role: 'tool', + content: m.content, + toolCallId: m.tool_call_id, + toolName: m.tool_name, + }); + } else if (m.role === 'system') { + result.push({ + role: 'system', + content: m.content, + }); + } else if (m.role === 'user') { + result.push({ + role: 'user', + content: m.content, + }); + } + } + return result; +} + +function convertBack(messages: z.infer[]): z.infer[] { + const result: z.infer[] = []; + for (const m of messages) { + if (m.role === 'assistant') { + if ('toolCalls' in m) { + result.push({ + version: 'v1', + chatId: '', + createdAt: new Date().toISOString(), + role: 'assistant', + agenticSender: m.agentName, + agenticResponseType: 'external', + tool_calls: m.toolCalls.map((t: any) => ({ + function: { + name: t.function.name, + arguments: t.function.arguments, + }, + type: 'function', + id: t.id, + })), + }); + } else { + result.push({ + version: 'v1', + chatId: '', + createdAt: new Date().toISOString(), + role: 'assistant', + content: m.content, + agenticSender: m.agentName, + agenticResponseType: m.responseType, + }); + } + } else if (m.role === 'tool') { + result.push({ + version: 'v1', + chatId: '', + createdAt: new Date().toISOString(), + role: 'tool', + content: m.content, + tool_call_id: m.toolCallId, + tool_name: m.toolName, + }); + } + } + return result; +} // get next turn / agent response export async function POST( @@ -119,47 +211,23 @@ export async function POST( } // get assistant response - const { agents, tools, prompts, startAgent } = convertWorkflowToAgenticAPI(workflow, projectTools); - const unsavedMessages: z.infer[] = [userMessage]; - let state: unknown = chat.agenticState ?? { last_agent_name: startAgent }; + const inMessages: z.infer[] = convert(messages); + inMessages.push(userMessage); - const request: z.infer = { - projectId: session.projectId, - messages: convertToAgenticAPIChatMessages([systemMessage, ...messages, ...unsavedMessages]), - state, - agents, - tools, - prompts, - startAgent, - mcpServers: (projectSettings.mcpServers ?? []).map(server => ({ - name: server.name, - serverUrl: server.serverUrl || '', - isReady: server.isReady - })), - toolWebhookUrl: projectSettings.webhookUrl ?? '', - testProfile: undefined, - }; - logger.log(`Sending agentic request`); - const response = await getAgenticApiResponse(request); - state = response.state; - if (response.messages.length === 0) { - throw new Error("No messages returned from assistant"); - } - const convertedMessages = convertFromAgenticAPIChatMessages(response.messages); - unsavedMessages.push(...convertedMessages.map(m => ({ - ...m, - version: 'v1' as const, - chatId, - createdAt: new Date().toISOString(), - }))); + const { messages: responseMessages } = await getResponse(workflow, projectTools, [systemMessage, ...inMessages]); + const convertedResponseMessages = convertBack(responseMessages); + const unsavedMessages = [ + userMessage, + ...convertedResponseMessages, + ]; logger.log(`Saving ${unsavedMessages.length} new messages and updating chat state`); await chatMessagesCollection.insertMany(unsavedMessages); - await chatsCollection.updateOne({ _id: new ObjectId(chatId) }, { $set: { agenticState: state } }); + await chatsCollection.updateOne({ _id: new ObjectId(chatId) }, { $set: { agenticState: chat.agenticState } }); // log billing usage if (USE_BILLING && billingCustomerId) { - const agentMessageCount = convertedMessages.filter(m => m.role === 'assistant').length; + const agentMessageCount = convertedResponseMessages.filter(m => m.role === 'assistant').length; await logUsage(billingCustomerId, { type: 'agent_messages', amount: agentMessageCount, diff --git a/apps/rowboat/app/lib/agent_instructions.ts b/apps/rowboat/app/lib/agent_instructions.ts new file mode 100644 index 00000000..e2d4b3f3 --- /dev/null +++ b/apps/rowboat/app/lib/agent_instructions.ts @@ -0,0 +1,109 @@ +/** + * Instructions for agents that use RAG (Retrieval Augmented Generation) + */ +export const RAG_INSTRUCTIONS = (ragToolName: string): string => ` +# Instructions about using the article retrieval tool +- Where relevant, use the articles tool: ${ragToolName} to fetch articles with knowledge relevant to the query and use its contents to respond to the user. +- Do not send a separate message first asking the user to wait while you look up information. Immediately fetch the articles and respond to the user with the answer to their query. +- Do not make up information. If the article's contents do not have the answer, give up control of the chat (or transfer to your parent agent, as per your transfer instructions). Do not say anything to the user. +`; + +/** + * Instructions for child agents that are aware of parent agents + * These instructions guide agents that can transfer control to parent agents + */ +export const TRANSFER_PARENT_AWARE_INSTRUCTIONS = (candidateParentsNameDescriptionTools: string): string => ` +# Instructions about using your parent agents +You have the following candidate parent agents that you can transfer the chat to, using the appropriate tool calls for the transfer: +${candidateParentsNameDescriptionTools}. + +## Notes: +- During runtime, you will be provided with a tool call for exactly one of these parent agents that you can use. Use that tool call to transfer the chat to the parent agent in case you are unable to handle the chat (e.g. if it is not in your scope of instructions). +- Transfer the chat to the appropriate agent, based on the chat history and / or the user's request. +- When you transfer the chat to another agent, you should not provide any response to the user. For example, do not say 'Transferring chat to X agent' or anything like that. Just invoke the tool call to transfer to the other agent. +- Do NOT ever mention the existence of other agents. For example, do not say 'Please check with X agent for details regarding processing times.' or anything like that. +- If any other agent transfers the chat to you without responding to the user, it means that they don't know how to help. Do not transfer the chat to back to the same agent in this case. In such cases, you should transfer to the escalation agent using the appropriate tool call. Never ask the user to contact support. +`; + +/** + * Instructions for child agents that give up control to parent agents + * These instructions guide agents that need to relinquish control to parent agents + */ +export const TRANSFER_GIVE_UP_CONTROL_INSTRUCTIONS = (candidateParentsNameDescriptionTools: string): string => ` +# Instructions about giving up chat control +If you are unable to handle the chat (e.g. if it is not in your scope of instructions), you should use the tool call provided to give up control of the chat. +${candidateParentsNameDescriptionTools} + +## Notes: +- When you give up control of the chat, you should not provide any response to the user. Just invoke the tool call to give up control. +`; + +/** + * Instructions for parent agents that need to transfer the chat to other specialized (children) agents + * These instructions guide parent agents in delegating tasks to specialized child agents + */ +export const TRANSFER_CHILDREN_INSTRUCTIONS = (otherAgentNameDescriptionsTools: string): string => ` +# Instructions about using other specialized agents +You have the following specialized agents that you can transfer the chat to, using the appropriate tool calls for the transfer: +${otherAgentNameDescriptionsTools} + +## Notes: +- Transfer the chat to the appropriate agent, based on the chat history and / or the user's request. +- When you transfer the chat to another agent, you should not provide any response to the user. For example, do not say 'Transferring chat to X agent' or anything like that. Just invoke the tool call to transfer to the other agent. +- Do NOT ever mention the existence of other agents. For example, do not say 'Please check with X agent for details regarding processing times.' or anything like that. +- If any other agent transfers the chat to you without responding to the user, it means that they don't know how to help. Do not transfer the chat to back to the same agent in this case. In such cases, you should transfer to the escalation agent using the appropriate tool call. Never ask the user to contact support. +`; + +/** + * Additional instruction for escalation agent when called due to an error + * These instructions are used when other agents are unable to handle the chat + */ +export const ERROR_ESCALATION_AGENT_INSTRUCTIONS = ` +# Context +The rest of the parts of the chatbot were unable to handle the chat. Hence, the chat has been escalated to you. In addition to your other instructions, tell the user that you are having trouble handling the chat - say "I'm having trouble helping with your request. Sorry about that.". Remember you are a part of the chatbot as well. +`; + +/** + * Universal system message formatting + * Template for system-wide context and instructions + */ +export const SYSTEM_MESSAGE = (systemMessage: string): string => ` +# Additional System-Wide Context or Instructions: +${systemMessage} +`; + +/** + * Instructions for non-repeat child transfer + * Critical rules for handling agent transfers and handoffs to prevent circular transfers + */ +export const CHILD_TRANSFER_RELATED_INSTRUCTIONS = ` +# Critical Rules for Agent Transfers and Handoffs + +- SEQUENTIAL TRANSFERS AND RESPONSES: + 1. BEFORE transferring to any agent: + - Plan your complete sequence of needed transfers + - Document which responses you need to collect + + 2. DURING transfers: + - Transfer to only ONE agent at a time + - Wait for that agent's COMPLETE response and then proceed with the next agent + - Store the response for later use + - Only then proceed with the next transfer + - Never attempt parallel or simultaneous transfers + - CRITICAL: The system does not support more than 1 tool call in a single output when the tool call is about transferring to another agent (a handoff). You must only put out 1 transfer related tool call in one output. + + 3. AFTER receiving a response: + - Do not transfer to another agent until you've processed the current response + - If you need to transfer to another agent, wait for your current processing to complete + - Never transfer back to an agent that has already responded + +- COMPLETION REQUIREMENTS: + - Never provide final response until ALL required agents have been consulted + - Never attempt to get multiple responses in parallel + - If a transfer is rejected due to multiple handoffs: + 1. Complete current response processing + 2. Then retry the transfer as next in sequence + 3. Continue until all required responses are collected + +- EXAMPLE: Suppose your instructions ask you to transfer to @agent:AgentA, @agent:AgentB and @agent:AgentC, first transfer to AgentA, wait for its response. Then transfer to AgentB, wait for its response. Then transfer to AgentC, wait for its response. Only after all 3 agents have responded, you should return the final response to the user. +`; \ No newline at end of file diff --git a/apps/rowboat/app/lib/agents.ts b/apps/rowboat/app/lib/agents.ts new file mode 100644 index 00000000..24b1621b --- /dev/null +++ b/apps/rowboat/app/lib/agents.ts @@ -0,0 +1,909 @@ +// External dependencies +import { Agent, AgentInputItem, run, tool, Tool } from "@openai/agents"; +import { RECOMMENDED_PROMPT_PREFIX } from "@openai/agents-core/extensions"; +import { aisdk } from "@openai/agents-extensions"; +import { createOpenAI } from "@ai-sdk/openai"; +import { CoreMessage, embed, generateText } from "ai"; +import { ObjectId } from "mongodb"; +import { z } from "zod"; + +// Internal dependencies +import { embeddingModel } from '../lib/embedding'; +import { getMcpClient } from "./mcp"; +import { dataSourceDocsCollection, dataSourcesCollection } from "./mongodb"; +import { qdrantClient } from '../lib/qdrant'; +import { EmbeddingRecord } from "./types/datasource_types"; +import { ConnectedEntity, sanitizeTextWithMentions, Workflow, WorkflowAgent, WorkflowPrompt, WorkflowTool } from "./types/workflow_types"; +import { CHILD_TRANSFER_RELATED_INSTRUCTIONS, RAG_INSTRUCTIONS } from "./agent_instructions"; +import { PrefixLogger } from "./utils"; +import { Message, AssistantMessage, AssistantMessageWithToolCalls, ToolMessage } from "./types/types"; + +const PROVIDER_API_KEY = process.env.PROVIDER_API_KEY || process.env.OPENAI_API_KEY || ''; +const PROVIDER_BASE_URL = process.env.PROVIDER_BASE_URL || undefined; +const MODEL = process.env.PROVIDER_DEFAULT_MODEL || 'gpt-4o'; + +const openai = createOpenAI({ + apiKey: PROVIDER_API_KEY, + baseURL: PROVIDER_BASE_URL, +}); + +export const ZUsage = z.object({ + tokens: z.object({ + total: z.number(), + prompt: z.number(), + completion: z.number(), + }), +}); + +const ZOutMessage = z.union([ + AssistantMessage, + AssistantMessageWithToolCalls, + ToolMessage, +]); + +// Helper to handle mock tool responses +async function invokeMockTool( + logger: PrefixLogger, + toolName: string, + args: string, + description: string, + mockInstructions: string +): Promise { + logger = logger.child(`invokeMockTool`); + logger.log(`toolName: ${toolName}`); + logger.log(`args: ${args}`); + logger.log(`description: ${description}`); + logger.log(`mockInstructions: ${mockInstructions}`); + + const messages: CoreMessage[] = [{ + role: "system" as const, + content: `You are simulating the execution of a tool called '${toolName}'. Here is the description of the tool: ${description}. Here are the instructions for the mock tool: ${mockInstructions}. Generate a realistic response as if the tool was actually executed with the given parameters.` + }, { + role: "user" as const, + content: `Generate a realistic response for the tool '${toolName}' with these parameters: ${args}. The response should be concise and focused on what the tool would actually return.` + }]; + + const { text } = await generateText({ + model: openai(MODEL), + messages, + }); + logger.log(`generated text: ${text}`); + + return text; +} + +// Helper to handle RAG tool calls +async function invokeRagTool( + logger: PrefixLogger, + projectId: string, + query: string, + sourceIds: string[], + returnType: 'chunks' | 'content', + k: number +): Promise<{ + title: string; + name: string; + content: string; + docId: string; + sourceId: string; +}[]> { + logger = logger.child(`invokeRagTool`); + logger.log(`projectId: ${projectId}`); + logger.log(`query: ${query}`); + logger.log(`sourceIds: ${sourceIds.join(', ')}`); + logger.log(`returnType: ${returnType}`); + logger.log(`k: ${k}`); + + // Create embedding for question + const { embedding } = await embed({ + model: embeddingModel, + value: query, + }); + + // Fetch all data sources for this project + const sources = await dataSourcesCollection.find({ + projectId: projectId, + active: true, + }).toArray(); + const validSourceIds = sources + .filter(s => sourceIds.includes(s._id.toString())) // id should be in sourceIds + .filter(s => s.active) // should be active + .map(s => s._id.toString()); + logger.log(`valid source ids: ${validSourceIds.join(', ')}`); + + // if no sources found, return empty response + if (validSourceIds.length === 0) { + logger.log(`no valid source ids found, returning empty response`); + return []; + } + + // Perform vector search + const qdrantResults = await qdrantClient.query("embeddings", { + query: embedding, + filter: { + must: [ + { key: "projectId", match: { value: projectId } }, + { key: "sourceId", match: { any: validSourceIds } }, + ], + }, + limit: k, + with_payload: true, + }); + logger.log(`found ${qdrantResults.points.length} results`); + + // if return type is chunks, return the chunks + let results = qdrantResults.points.map((point) => { + const { title, name, content, docId, sourceId } = point.payload as z.infer['payload']; + return { + title, + name, + content, + docId, + sourceId, + }; + }); + + if (returnType === 'chunks') { + logger.log(`returning chunks`); + return results; + } + + // otherwise, fetch the doc contents from mongodb + const docs = await dataSourceDocsCollection.find({ + _id: { $in: results.map(r => new ObjectId(r.docId)) }, + }).toArray(); + logger.log(`fetched docs: ${docs.length}`); + + // map the results to the docs + results = results.map(r => { + const doc = docs.find(d => d._id.toString() === r.docId); + return { + ...r, + content: doc?.content || '', + }; + }); + + return results; +} + +// Helper to handle MCP tool calls +async function invokeMcpTool( + logger: PrefixLogger, + projectId: string, + name: string, + input: any, + mcpServerURL: string, + mcpServerName: string +) { + logger = logger.child(`invokeMcpTool`); + logger.log(`projectId: ${projectId}`); + logger.log(`name: ${name}`); + logger.log(`input: ${JSON.stringify(input)}`); + logger.log(`mcpServerURL: ${mcpServerURL}`); + logger.log(`mcpServerName: ${mcpServerName}`); + + const client = await getMcpClient(mcpServerURL, mcpServerName || ''); + const result = await client.callTool({ + name, + arguments: input, + }); + logger.log(`mcp tool result: ${JSON.stringify(result)}`); + await client.close(); + return result; +} + +// Helper to create RAG tool +function createRagTool( + logger: PrefixLogger, + config: z.infer, + projectId: string +): Tool { + if (!config.ragDataSources?.length) { + throw new Error(`data sources not found for agent ${config.name}`); + } + + return tool({ + name: "rag_search", + description: config.description, + parameters: z.object({ + query: z.string().describe("The query to search for") + }), + async execute(input: { query: string }) { + const results = await invokeRagTool( + logger, + projectId, + input.query, + config.ragDataSources || [], + config.ragReturnType || 'chunks', + config.ragK || 3 + ); + return JSON.stringify({ + results, + }); + } + }); +} + +// Helper to create a mock tool +function createMockTool( + logger: PrefixLogger, + config: z.infer, +): Tool { + return tool({ + name: config.name, + description: config.description, + parameters: z.object({ + query: z.string().describe("The query to search for") + }), + async execute(input: { query: string }) { + try { + const result = await invokeMockTool( + logger, + config.name, + JSON.stringify(input), + config.description, + config.mockInstructions || '' + ); + return JSON.stringify({ + result, + }); + } catch (error) { + logger.log(`Error executing mock tool ${config.name}:`, error); + return JSON.stringify({ + error: `Mock tool execution failed: ${error}`, + }); + } + } + }); +} + +// Helper to create an mcp tool +function createMcpTool( + logger: PrefixLogger, + config: z.infer, + projectId: string +): Tool { + const { name, description, parameters, mcpServerName, mcpServerURL } = config; + + return tool({ + name, + description, + strict: false, + parameters: { + type: 'object', + properties: parameters.properties, + required: parameters.required || [], + additionalProperties: true, + }, + async execute(input: any) { + try { + const result = await invokeMcpTool(logger, projectId, name, input, mcpServerURL || '', mcpServerName || ''); + return JSON.stringify({ + result, + }); + } catch (error) { + logger.log(`Error executing mcp tool ${name}:`, error); + return JSON.stringify({ + error: `Tool execution failed: ${error}`, + }); + } + } + }); +} + +// Helper to create an agent +function createAgent( + logger: PrefixLogger, + config: z.infer, + tools: Record, + projectTools: z.infer[], + workflow: z.infer, + promptConfig: Record>, +): { agent: Agent, entities: z.infer[] } { + const agentLogger = logger.child(`createAgent: ${config.name}`); + + // Combine instructions and examples + let instructions = `${RECOMMENDED_PROMPT_PREFIX} + +## Your Name +${config.name} + +## Description +${config.description} + +## Instructions +${config.instructions} + +${config.examples ? ('# Examples\n' + config.examples) : ''} + +${'-'.repeat(100)} + +${CHILD_TRANSFER_RELATED_INSTRUCTIONS} +`; + + let { sanitized, entities } = sanitizeTextWithMentions(instructions, workflow, projectTools); + agentLogger.log(`instructions: ${JSON.stringify(sanitized)}`); + agentLogger.log(`mentions: ${JSON.stringify(entities)}`); + + // // add prompts to instructions + // for (const e of entities) { + // if (e.type === 'prompt') { + // const prompt = promptConfig[e.name]; + // if (prompt) { + // compiledInstructions = compiledInstructions + '\n\n# ' + prompt.name + '\n' + prompt.prompt; + // } + // } + // } + + const agentTools = entities.filter(e => e.type === 'tool').map(e => tools[e.name]).filter(Boolean) as Tool[]; + + // Add RAG tool if needed + if (config.ragDataSources?.length) { + const ragTool = createRagTool(logger, config, workflow.projectId); + agentTools.push(ragTool); + + // update instructions to include RAG instructions + sanitized = sanitized + '\n\n' + ('-'.repeat(100)) + '\n\n' + RAG_INSTRUCTIONS(ragTool.name); + agentLogger.log(`added rag instructions`); + } + + // Create the agent + const agent = new Agent({ + name: config.name, + instructions: sanitized, + tools: agentTools, + model: aisdk(openai(config.model)), + // model: config.model, + modelSettings: { + temperature: 0.0, + } + }); + agentLogger.log(`created agent`); + + return { + agent, + entities, + }; +} + +// Convert messages to agent input items +function convertMsgsInput(messages: z.infer[]): AgentInputItem[] { + const msgs: AgentInputItem[] = []; + + for (const msg of messages) { + if (msg.role === 'assistant' && msg.content) { + msgs.push({ + role: 'assistant', + content: [{ + type: 'output_text', + text: `Sender agent: ${msg.agentName}\nContent: ${msg.content}`, + }], + status: 'completed', + }); + } else if (msg.role === 'user') { + msgs.push({ + role: 'user', + content: msg.content, + }); + } else if (msg.role === 'system') { + msgs.push({ + role: 'system', + content: msg.content, + }); + } + } + + return msgs; +} + +// Helper to determine the next agent name based on control settings +function getNextAgentName( + logger: PrefixLogger, + stack: string[], + agentConfig: Record>, + workflow: z.infer, +): string { + logger = logger.child(`getNextAgentName`); + logger.log(`stack: ${stack.join(', ')}`); + + // get the last agent from the stack + // if stack is empty, use the start agent + const lastAgentName = stack.pop() || workflow.startAgent; + + return lastAgentName; + + // TODO: control-type logic is being ignored for now + // if control type is retain, return last agent + // const lastAgentName = stack.pop() || workflow.startAgent; + // const lastAgentConfig = agentConfig[lastAgentName]; + // if (!lastAgentConfig) { + // logger.log(`last agent ${lastAgentName} not found in agent config, returning start agent: ${workflow.startAgent}`); + // return workflow.startAgent; + // } + // switch (lastAgentConfig.controlType) { + // case 'retain': + // logger.log(`last agent ${lastAgentName} control type is retain, returning last agent: ${lastAgentName}`); + // return lastAgentName; + // case 'relinquish_to_parent': + // const parentAgentName = stack.pop() || workflow.startAgent; + // logger.log(`last agent ${lastAgentName} control type is relinquish_to_parent, returning most recent parent: ${parentAgentName}`); + // return parentAgentName; + // case 'relinquish_to_start': + // logger.log(`last agent ${lastAgentName} control type is relinquish_to_start, returning start agent: ${workflow.startAgent}`); + // return workflow.startAgent; + // } +} + +// Logs an event and then yields it +async function* emitEvent( + logger: PrefixLogger, + event: z.infer | z.infer, +): AsyncIterable | z.infer> { + logger.log(`-> emitting event: ${JSON.stringify(event)}`); + yield event; + return; +} + +// Emits an agent -> agent transfer event +function createTransferEvents( + fromAgent: string, + toAgent: string, +): [z.infer, z.infer] { + const toolCallId = crypto.randomUUID(); + const m1: z.infer = { + role: 'assistant', + content: null, + toolCalls: [{ + id: toolCallId, + type: 'function', + function: { + name: 'transfer_to_agent', + arguments: JSON.stringify({ assistant: toAgent }), + }, + }], + agentName: fromAgent, + }; + + const m2: z.infer = { + role: 'tool', + content: JSON.stringify({ assistant: toAgent }), + toolCallId: toolCallId, + toolName: 'transfer_to_agent', + }; + + return [m1, m2]; +} + +// Tracks agent to agent transfer counts +class AgentTransferCounter { + private calls: Record = {}; + + increment(fromAgent: string, toAgent: string): void { + const key = `${fromAgent}:${toAgent}`; + this.calls[key] = (this.calls[key] || 0) + 1; + } + + get(fromAgent: string, toAgent: string): number { + const key = `${fromAgent}:${toAgent}`; + return this.calls[key] || 0; + } +} + +class UsageTracker { + private usage: { + total: number; + prompt: number; + completion: number; + } = { total: 0, prompt: 0, completion: 0 }; + + increment(total: number, prompt: number, completion: number): void { + this.usage.total += total; + this.usage.prompt += prompt; + this.usage.completion += completion; + } + + get(): { total: number, prompt: number, completion: number } { + return this.usage; + } + + asEvent(): z.infer { + return { + tokens: this.usage, + }; + } +} + +function ensureSystemMessage(logger: PrefixLogger, messages: z.infer[]) { + logger = logger.child(`ensureSystemMessage`); + + // ensure that a system message is set + if (messages.length > 0 && messages[0]?.role !== 'system') { + messages.unshift({ + role: 'system', + content: 'You are a helpful assistant.', + }); + logger.log(`added system message: ${messages[0]?.content}`); + } + + // ensure that system message isn't blank + if (messages.length > 0 && messages[0]?.role === 'system' && !messages[0].content) { + messages[0].content = 'You are a helpful assistant.'; + logger.log(`updated system message: ${messages[0].content}`); + } +} + +function mapConfig(workflow: z.infer, projectTools: z.infer[]): { + agentConfig: Record>; + toolConfig: Record>; + promptConfig: Record>; +} { + const agentConfig: Record> = workflow.agents.reduce((acc, agent) => ({ + ...acc, + [agent.name]: agent + }), {}); + const toolConfig: Record> = [ + ...workflow.tools, + ...projectTools, + ].reduce((acc, tool) => ({ + ...acc, + [tool.name]: tool + }), {}); + const promptConfig: Record> = workflow.prompts.reduce((acc, prompt) => ({ + ...acc, + [prompt.name]: prompt + }), {}); + return { agentConfig, toolConfig, promptConfig }; +} + +async function* emitGreetingTurn(logger: PrefixLogger, workflow: z.infer): AsyncIterable | z.infer> { + // find the greeting prompt + const prompt = workflow.prompts.find(p => p.type === 'greeting')?.prompt || 'How can I help you today?'; + logger.log(`greeting turn: ${prompt}`); + + // emit greeting turn + yield* emitEvent(logger, { + role: 'assistant', + content: prompt, + agentName: workflow.startAgent, + responseType: 'external', + }); + + // emit final usage information + yield* emitEvent(logger, new UsageTracker().asEvent()); +} + +function createAgentCallStack(messages: z.infer[]): string[] { + const stack: string[] = []; + for (const msg of messages) { + if (msg.role === 'assistant' && msg.agentName) { + // skip duplicate entries + if (stack.length > 0 && stack[stack.length - 1] === msg.agentName) { + continue; + } + // add to stack + stack.push(msg.agentName); + } + } + return stack; +} + +function createTools(logger: PrefixLogger, workflow: z.infer, toolConfig: Record>): Record { + const tools: Record = {}; + for (const [toolName, config] of Object.entries(toolConfig)) { + if (config.isMcp) { + tools[toolName] = createMcpTool(logger, config, workflow.projectId); + logger.log(`created mcp tool: ${toolName}`); + } else if (config.mockTool) { + tools[toolName] = createMockTool(logger, config); + logger.log(`created mock tool: ${toolName}`); + } else { + logger.log(`unsupported tool type: ${toolName}`); + } + } + return tools; +} + +function createAgents( + logger: PrefixLogger, + workflow: z.infer, + agentConfig: Record>, + tools: Record, + projectTools: z.infer[], + promptConfig: Record>, +): { agents: Record, mentions: Record[]> } { + const agents: Record = {}; + const mentions: Record[]> = {}; + + // create agents + for (const [agentName, config] of Object.entries(agentConfig)) { + const { agent, entities } = createAgent( + logger, + config, + tools, + projectTools, + workflow, + promptConfig, + ); + agents[agentName] = agent; + mentions[agentName] = entities; + logger.log(`created agent: ${agentName}`); + } + + // set handoffs + for (const [agentName, agent] of Object.entries(agents)) { + const connectedAgentNames = (mentions[agentName] || []).filter(e => e.type === 'agent').map(e => e.name); + agent.handoffs = connectedAgentNames.map(e => agents[e]).filter(Boolean) as Agent[]; + logger.log(`set handoffs for ${agentName}: ${connectedAgentNames.join(',')}`); + } + + return { agents, mentions }; +} + +// Main function to stream an agentic response +// using OpenAI Agents SDK +export async function* streamResponse( + workflow: z.infer, + projectTools: z.infer[], + messages: z.infer[], +): AsyncIterable | z.infer> { + // set up logging + let logger = new PrefixLogger(`agent-loop`) + logger.log('projectId', workflow.projectId); + logger.log('workflow', workflow.name); + + // ensure valid system message + ensureSystemMessage(logger, messages); + + // if there is only a system message, emit greeting turn and return + if (messages.length === 1 && messages[0]?.role === 'system') { + yield* emitGreetingTurn(logger, workflow); + return; + } + + // create map of agent, tool and prompt configs + const { agentConfig, toolConfig, promptConfig } = mapConfig(workflow, projectTools); + + // create agent call stack from input messages + const stack = createAgentCallStack(messages); + + // create tools + const tools = createTools(logger, workflow, toolConfig); + + // create agents + const { agents } = createAgents(logger, workflow, agentConfig, tools, projectTools, promptConfig); + + // track agent to agent calls + const transferCounter = new AgentTransferCounter(); + + // track usage + const usageTracker = new UsageTracker(); + + // get next agent name + let agentName = getNextAgentName(logger, stack, agentConfig, workflow); + + // set up initial state for loop + logger.log('@@ starting agent turn @@'); + let iter = 0; + const turnMsgs: z.infer[] = [...messages]; + + // loop indefinitely + turnLoop: while (true) { + // increment loop counter + iter++; + + // set up logging + const loopLogger = logger.child(`iter-${iter}`); + + // log agent info + loopLogger.log(`agent name: ${agentName}`); + loopLogger.log(`stack: ${stack.join(', ')}`); + if (!agents[agentName]) { + throw new Error(`agent not found in agent config!`); + } + const agent: Agent = agents[agentName]!; + + // convert messages to agents sdk compatible input + const inputs = convertMsgsInput(turnMsgs); + + // run the agent + const result = await run(agent, inputs, { + stream: true, + }); + + // handle streaming events + for await (const event of result) { + const eventLogger = loopLogger.child(event.type); + // eventLogger.log(`----------> event: ${JSON.stringify(event)}`); + + switch (event.type) { + case 'raw_model_stream_event': + if (event.data.type === 'response_done') { + for (const output of event.data.response.output) { + // handle tool call invocation + // except for transfer_to_* tool calls + if (output.type === 'function_call' && !output.name.startsWith('transfer_to')) { + const m: z.infer = { + role: 'assistant', + content: null, + toolCalls: [{ + id: output.callId, + type: 'function', + function: { + name: output.name, + arguments: output.arguments, + }, + }], + agentName: agentName, + }; + + // add message to turn + turnMsgs.push(m); + + // emit event + yield* emitEvent(eventLogger, m); + } + } + + // update usage information + usageTracker.increment( + event.data.response.usage.totalTokens, + event.data.response.usage.inputTokens, + event.data.response.usage.outputTokens + ); + eventLogger.log(`updated usage information: ${JSON.stringify(usageTracker.get())}`); + } + break; + case 'run_item_stream_event': + // handle handoff event + if (event.name === 'handoff_occurred' && event.item.type === 'handoff_output_item') { + // skip if its the same agent + if (agentName === event.item.targetAgent.name) { + eventLogger.log(`skipping handoff to same agent: ${agentName}`); + break; + } + + // emit transfer tool call invocation + const [transferStart, transferComplete] = createTransferEvents(agentName, event.item.targetAgent.name); + + // add messages to turn + turnMsgs.push(transferStart); + turnMsgs.push(transferComplete); + + // emit events + yield* emitEvent(eventLogger, transferStart); + yield* emitEvent(eventLogger, transferComplete); + + // update transfer counter + transferCounter.increment(agentName, event.item.targetAgent.name); + + // add current agent to stack + stack.push(agentName); + + // set this as the new agent name + agentName = event.item.targetAgent.name; + loopLogger.log(`switched to agent: ${agentName}`); + } + + // handle tool call result + if (event.item.type === 'tool_call_output_item' && + event.item.rawItem.type === 'function_call_result' && + event.item.rawItem.status === 'completed' && + event.item.rawItem.output.type === 'text') { + const m: z.infer = { + role: 'tool', + content: event.item.rawItem.output.text, + toolCallId: event.item.rawItem.callId, + toolName: event.item.rawItem.name, + }; + + // add message to turn + turnMsgs.push(m); + + // emit event + yield* emitEvent(eventLogger, m); + } + + // handle model response message output + if (event.item.type === 'message_output_item' && + event.item.rawItem.type === 'message' && + event.item.rawItem.status === 'completed') { + // check response visibility + const isInternal = agentConfig[agentName]?.outputVisibility === 'internal'; + for (const content of event.item.rawItem.content) { + if (content.type === 'output_text') { + // create message + const msg: z.infer = { + role: 'assistant', + content: content.text, + agentName: agentName, + responseType: isInternal ? 'internal' : 'external', + }; + + // add message to turn + turnMsgs.push(msg); + + // emit event + yield* emitEvent(eventLogger, msg); + } + } + + // if this is an internal agent, switch to previous agent + if (isInternal) { + const current = agentName; + agentName = getNextAgentName(logger, stack, agentConfig, workflow); + + // emit transfer tool call invocation + const [transferStart, transferComplete] = createTransferEvents(current, agentName); + + // add messages to turn + turnMsgs.push(transferStart); + turnMsgs.push(transferComplete); + + // emit events + yield* emitEvent(eventLogger, transferStart); + yield* emitEvent(eventLogger, transferComplete); + + // update transfer counter + transferCounter.increment(current, agentName); + + // add current agent to stack + stack.push(current); + + // set this as the new agent name + loopLogger.log(`switched to agent (reason: internal agent put out a message): ${agentName}`); + + // run the turn from the previous agent + continue turnLoop; + } + break; + } + break; + default: + break; + } + } + + // if the last message was a text response by a user-facing agent, complete the turn + // loopLogger.log(`iter end, turnMsgs: ${JSON.stringify(turnMsgs)}, agentName: ${agentName}`); + const lastMessage = turnMsgs[turnMsgs.length - 1]; + if (agentConfig[agentName]?.outputVisibility === 'user_facing' && + lastMessage?.role === 'assistant' && + lastMessage?.content !== null && + lastMessage?.agentName === agentName + ) { + loopLogger.log(`last message was by a user_facing agent, breaking out of parent loop`); + break turnLoop; + } + } + + // emit usage information + yield* emitEvent(logger, usageTracker.asEvent()); +} + +// this is a sync version of streamResponse +export async function getResponse( + workflow: z.infer, + projectTools: z.infer[], + messages: z.infer[], +): Promise<{ + messages: z.infer[], + usage: z.infer, +}> { + const out: z.infer[] = []; + let usage: z.infer = { + tokens: { + total: 0, + prompt: 0, + completion: 0, + }, + }; + for await (const event of streamResponse(workflow, projectTools, messages)) { + if ('role' in event && event.role === 'assistant') { + out.push(event); + } + if ('tokens' in event) { + usage = event; + } + } + return { messages: out, usage }; +} \ No newline at end of file diff --git a/apps/rowboat/app/lib/mcp.ts b/apps/rowboat/app/lib/mcp.ts new file mode 100644 index 00000000..5e943a41 --- /dev/null +++ b/apps/rowboat/app/lib/mcp.ts @@ -0,0 +1,32 @@ +import { Client } from "@modelcontextprotocol/sdk/client/index.js"; +import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; +import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; + +// Helper to get MCP client +export async function getMcpClient(serverUrl: string, serverName: string): Promise { + let client: Client | undefined = undefined; + const baseUrl = new URL(serverUrl); + + // Try to connect using Streamable HTTP transport + try { + client = new Client({ + name: 'streamable-http-client', + version: '1.0.0' + }); + const transport = new StreamableHTTPClientTransport(baseUrl); + await client.connect(transport); + console.log(`[MCP] Connected using Streamable HTTP transport to ${serverName}`); + return client; + } catch (error) { + // If that fails with a 4xx error, try the older SSE transport + console.log(`[MCP] Streamable HTTP connection failed, falling back to SSE transport for ${serverName}`); + client = new Client({ + name: 'sse-client', + version: '1.0.0' + }); + const sseTransport = new SSEClientTransport(baseUrl); + await client.connect(sseTransport); + console.log(`[MCP] Connected using SSE transport to ${serverName}`); + return client; + } +} diff --git a/apps/rowboat/app/lib/types/agents_api_types.ts b/apps/rowboat/app/lib/types/agents_api_types.ts deleted file mode 100644 index bf8f8263..00000000 --- a/apps/rowboat/app/lib/types/agents_api_types.ts +++ /dev/null @@ -1,334 +0,0 @@ -import { z } from "zod"; -import { sanitizeTextWithMentions, Workflow, WorkflowAgent, WorkflowPrompt, WorkflowTool } from "./workflow_types"; -import { apiV1 } from "rowboat-shared"; -import { ApiMessage } from "./types"; -import { TestProfile } from "./testing_types"; -import { MCPServer, MCPServerMinimal } from "./types"; -import { mergeProjectTools } from "./project_types"; - -export const AgenticAPIChatMessage = z.object({ - role: z.union([z.literal('user'), z.literal('assistant'), z.literal('tool'), z.literal('system')]), - content: z.string().nullable(), - tool_calls: z.array(z.object({ - id: z.string(), - function: z.object({ - name: z.string(), - arguments: z.string(), - }), - type: z.literal('function'), - })).nullable(), - tool_call_id: z.string().nullable(), - tool_name: z.string().nullable(), - sender: z.string().nullable(), - response_type: z.union([ - z.literal('internal'), - z.literal('external'), - ]).optional(), -}); - -export const AgenticAPIAgent = WorkflowAgent - .omit({ - disabled: true, - examples: true, - locked: true, - toggleAble: true, - global: true, - }) - .extend({ - tools: z.array(z.string()), - prompts: z.array(z.string()), - connectedAgents: z.array(z.string()), - }); - -export const AgenticAPIPrompt = WorkflowPrompt; - -export const AgenticAPITool = WorkflowTool - .omit({ - autoSubmitMockedResponse: true, - }) - -export const AgenticAPIChatRequest = z.object({ - projectId: z.string(), - messages: z.array(AgenticAPIChatMessage), - state: z.unknown(), - agents: z.array(AgenticAPIAgent), - tools: z.array(AgenticAPITool), - prompts: z.array(WorkflowPrompt), - startAgent: z.string(), - testProfile: TestProfile.optional(), - mcpServers: z.array(MCPServerMinimal), - toolWebhookUrl: z.string(), -}); - -export const AgenticAPIChatResponse = z.object({ - messages: z.array(AgenticAPIChatMessage), - state: z.unknown(), -}); - -export const AgenticAPIInitStreamResponse = z.object({ - streamId: z.string(), -}); - -export function convertWorkflowToAgenticAPI(workflow: z.infer, projectTools: z.infer[]): { - agents: z.infer[]; - tools: z.infer[]; - prompts: z.infer[]; - startAgent: string; -} { - const mergedTools = mergeProjectTools(workflow.tools, projectTools); - - return { - agents: workflow.agents - .filter(agent => !agent.disabled) - .map(agent => { - const compiledInstructions = agent.instructions + - (agent.examples ? '\n\n# Examples\n' + agent.examples : ''); - const { sanitized, entities } = sanitizeTextWithMentions(compiledInstructions, workflow, mergedTools); - - const agenticAgent: z.infer = { - name: agent.name, - type: agent.type, - description: agent.description, - instructions: sanitized, - model: agent.model, - controlType: agent.controlType, - ragDataSources: agent.ragDataSources, - ragK: agent.ragK, - ragReturnType: agent.ragReturnType, - outputVisibility: agent.outputVisibility, - tools: entities.filter(e => e.type == 'tool').map(e => e.name), - prompts: entities.filter(e => e.type == 'prompt').map(e => e.name), - connectedAgents: entities.filter(e => e.type === 'agent').map(e => e.name), - maxCallsPerParentAgent: agent.maxCallsPerParentAgent, - }; - return agenticAgent; - }), - tools: mergedTools, - prompts: workflow.prompts - .map(p => { - const { sanitized } = sanitizeTextWithMentions(p.prompt, workflow, mergedTools); - return { - ...p, - prompt: sanitized, - }; - }), - startAgent: workflow.startAgent, - }; -} - -export function convertToAgenticAPIChatMessages(messages: z.infer[]): z.infer[] { - const converted: z.infer[] = []; - - for (const m of messages) { - const baseMessage: z.infer = { - content: null, - role: m.role, - sender: null, - tool_calls: null, - tool_call_id: null, - tool_name: null, - }; - - switch (m.role) { - case 'system': - converted.push({ - ...baseMessage, - content: m.content, - }); - break; - case 'user': - converted.push({ - ...baseMessage, - content: m.content, - }); - break; - case 'assistant': - if ('tool_calls' in m) { - converted.push({ - ...baseMessage, - tool_calls: m.tool_calls, - sender: m.agenticSender ?? null, - response_type: m.agenticResponseType, - }); - } else { - converted.push({ - ...baseMessage, - content: m.content, - sender: m.agenticSender ?? null, - response_type: m.agenticResponseType, - }); - } - break; - case 'tool': - converted.push({ - ...baseMessage, - content: m.content, - tool_call_id: m.tool_call_id, - tool_name: m.tool_name, - }); - break; - default: - continue; - } - } - - return converted; -} - -export function convertFromAgenticAPIChatMessages(messages: z.infer[]): z.infer[] { - const converted: z.infer[] = []; - - for (const m of messages) { - const baseMessage = { - version: 'v1' as const, - chatId: '', - createdAt: new Date().toISOString(), - }; - switch (m.role) { - case 'user': - converted.push({ - ...baseMessage, - role: 'user', - content: m.content ?? '', - }); - break; - case 'assistant': - if (m.tool_calls) { - // TODO: handle tool calls - converted.push({ - ...baseMessage, - role: 'assistant', - tool_calls: m.tool_calls, - agenticSender: m.sender ?? undefined, - agenticResponseType: m.response_type ?? 'internal', - }); - } else { - converted.push({ - ...baseMessage, - role: 'assistant', - content: m.content ?? '', - agenticSender: m.sender ?? undefined, - agenticResponseType: m.response_type ?? 'internal', - }); - } - break; - case 'tool': - converted.push({ - ...baseMessage, - role: 'tool', - content: m.content ?? '', - tool_call_id: m.tool_call_id ?? '', - tool_name: m.tool_name ?? '', - }); - break; - } - } - return converted; -} - -export function convertFromApiToAgenticApiMessages(messages: z.infer[]): z.infer[] { - return messages.map(m => { - switch (m.role) { - case 'system': - return { - role: 'system', - content: m.content, - tool_calls: null, - tool_call_id: null, - tool_name: null, - sender: null, - }; - case 'user': - return { - role: 'user', - content: m.content, - tool_calls: null, - tool_call_id: null, - tool_name: null, - sender: null, - }; - - case 'assistant': - if ('tool_calls' in m) { - return { - role: 'assistant', - content: m.content ?? null, - tool_calls: m.tool_calls, - tool_call_id: null, - tool_name: null, - sender: m.agenticSender ?? null, - response_type: m.agenticResponseType ?? 'external', - }; - } else { - return { - role: 'assistant', - content: m.content ?? null, - sender: m.agenticSender ?? null, - response_type: m.agenticResponseType ?? 'external', - tool_call_id: null, - tool_calls: null, - tool_name: null, - }; - } - case 'tool': - return { - role: 'tool', - content: m.content ?? null, - tool_calls: null, - tool_call_id: m.tool_call_id ?? null, - tool_name: m.tool_name ?? null, - sender: null, - }; - default: - return { - role: "user", - content: "foo", - tool_calls: null, - tool_call_id: null, - tool_name: null, - sender: null, - }; - } - }); -} - -export function convertFromAgenticApiToApiMessages(messages: z.infer[]): z.infer[] { - const converted: z.infer[] = []; - - for (const m of messages) { - switch (m.role) { - case 'user': - converted.push({ - role: 'user', - content: m.content ?? '', - }); - break; - case 'assistant': - if (m.tool_calls) { - converted.push({ - role: 'assistant', - tool_calls: m.tool_calls, - agenticSender: m.sender ?? undefined, - agenticResponseType: m.response_type ?? 'internal', - }); - } else { - converted.push({ - role: 'assistant', - content: m.content ?? '', - agenticSender: m.sender ?? undefined, - agenticResponseType: m.response_type ?? 'internal', - }); - } - break; - case 'tool': - converted.push({ - role: 'tool', - content: m.content ?? '', - tool_call_id: m.tool_call_id ?? '', - tool_name: m.tool_name ?? '', - }); - break; - } - } - return converted; -} diff --git a/apps/rowboat/app/lib/types/copilot_types.ts b/apps/rowboat/app/lib/types/copilot_types.ts index 22e98f42..33cc09df 100644 --- a/apps/rowboat/app/lib/types/copilot_types.ts +++ b/apps/rowboat/app/lib/types/copilot_types.ts @@ -1,9 +1,7 @@ import { z } from "zod"; import { Workflow } from "./workflow_types"; -import { apiV1 } from "rowboat-shared" -import { AgenticAPIChatMessage } from "./agents_api_types"; -import { convertToAgenticAPIChatMessages } from "./agents_api_types"; import { DataSource } from "./datasource_types"; +import { Message } from "./types"; // Create a filtered version of DataSource for copilot export const CopilotDataSource = z.object({ @@ -70,7 +68,7 @@ export const CopilotApiMessage = z.object({ export const CopilotChatContext = z.union([ z.object({ type: z.literal('chat'), - messages: z.array(apiV1.ChatMessage), + messages: z.array(Message), }), z.object({ type: z.literal('agent'), @@ -88,7 +86,7 @@ export const CopilotChatContext = z.union([ export const CopilotApiChatContext = z.union([ z.object({ type: z.literal('chat'), - messages: z.array(AgenticAPIChatMessage), + messages: z.array(Message), }), z.object({ type: z.literal('agent'), @@ -124,7 +122,7 @@ export function convertToCopilotApiChatContext(context: z.infer = T & { _id: string }; -export function convertToCoreMessages(messages: z.infer[]): CoreMessage[] { - // convert to core messages - const coreMessages: CoreMessage[] = []; - for (const m of messages) { - switch (m.role) { - case 'system': - coreMessages.push({ - role: 'system', - content: m.content, - }); - break; - case 'user': - coreMessages.push({ - role: 'user', - content: m.content, - }); - break; - case 'assistant': - if ('tool_calls' in m) { - const toolCallParts: ToolCallPart[] = m.tool_calls.map((toolCall) => ({ - type: 'tool-call', - toolCallId: toolCall.id, - toolName: toolCall.function.name, - args: JSON.parse(toolCall.function.arguments), - })); - if (m.content) { - coreMessages.push({ - role: 'assistant', - content: [ - { - type: 'text', - text: m.content, - }, - ...toolCallParts, - ] - }); - } else { - coreMessages.push({ - role: 'assistant', - content: toolCallParts, - }); - } - } else { - coreMessages.push({ - role: 'assistant', - content: m.content, - }); - } - break; - case 'tool': - coreMessages.push({ - role: 'tool', - content: [ - { - type: 'tool-result', - toolCallId: m.tool_call_id, - toolName: m.tool_name, - result: JSON.parse(m.content), - } - ] - }); - break; - } - } - return coreMessages; -} - -export const ApiMessage = z.union([ - apiV1.SystemMessage, - apiV1.UserMessage, - apiV1.AssistantMessage, - apiV1.AssistantMessageWithToolCalls, - apiV1.ToolMessage, -]); - export const ApiRequest = z.object({ - messages: z.array(ApiMessage), + messages: z.array(Message), state: z.unknown(), workflowId: z.string().nullable().optional(), testProfileId: z.string().nullable().optional(), }); export const ApiResponse = z.object({ - messages: z.array(ApiMessage), + messages: z.array(Message), state: z.unknown(), }); diff --git a/apps/rowboat/app/lib/types/workflow_types.ts b/apps/rowboat/app/lib/types/workflow_types.ts index 7c596291..bfb0d4dd 100644 --- a/apps/rowboat/app/lib/types/workflow_types.ts +++ b/apps/rowboat/app/lib/types/workflow_types.ts @@ -2,10 +2,10 @@ import { z } from "zod"; export const WorkflowAgent = z.object({ name: z.string(), order: z.number().int().optional(), - type: z.union([ - z.literal('conversation'), - z.literal('post_process'), - z.literal('escalation'), + type: z.enum([ + 'conversation', + 'post_process', + 'escalation', ]), description: z.string(), disabled: z.boolean().default(false).optional(), @@ -16,18 +16,22 @@ export const WorkflowAgent = z.object({ toggleAble: z.boolean().default(true).describe('Whether this agent can be enabled or disabled').optional(), global: z.boolean().default(false).describe('Whether this agent is a global agent, in which case it cannot be connected to other agents').optional(), ragDataSources: z.array(z.string()).optional(), - ragReturnType: z.union([z.literal('chunks'), z.literal('content')]).default('chunks'), + ragReturnType: z.enum(['chunks', 'content']).default('chunks'), ragK: z.number().default(3), - outputVisibility: z.union([z.literal('user_facing'), z.literal('internal')]).default('user_facing').optional(), - controlType: z.union([z.literal('retain'), z.literal('relinquish_to_parent'), z.literal('relinquish_to_start')]).default('retain').describe('Whether this agent retains control after a turn, relinquishes to the parent agent, or relinquishes to the start agent'), + outputVisibility: z.enum(['user_facing', 'internal']).default('user_facing').optional(), + controlType: z.enum([ + 'retain', + 'relinquish_to_parent', + 'relinquish_to_start', + ]).default('retain').describe('Whether this agent retains control after a turn, relinquishes to the parent agent, or relinquishes to the start agent'), maxCallsPerParentAgent: z.number().default(3).describe('Maximum number of times this agent can be called by a parent agent in a single turn').optional(), }); export const WorkflowPrompt = z.object({ name: z.string(), - type: z.union([ - z.literal('base_prompt'), - z.literal('style_prompt'), - z.literal('greeting'), + type: z.enum([ + 'base_prompt', + 'style_prompt', + 'greeting', ]), prompt: z.string(), }); @@ -39,29 +43,9 @@ export const WorkflowTool = z.object({ mockInstructions: z.string().optional(), parameters: z.object({ type: z.literal('object'), - properties: z.record(z.object({ - type: z.string(), - description: z.string(), - enum: z.array(z.any()).optional(), - default: z.any().optional(), - minimum: z.number().optional(), - maximum: z.number().optional(), - items: z.any().optional(), // For array types - format: z.string().optional(), - pattern: z.string().optional(), - minLength: z.number().optional(), - maxLength: z.number().optional(), - minItems: z.number().optional(), - maxItems: z.number().optional(), - uniqueItems: z.boolean().optional(), - multipleOf: z.number().optional(), - examples: z.array(z.any()).optional(), - })), - required: z.array(z.string()).default([]), - }).default({ - type: 'object', - properties: {}, - required: [], + properties: z.record(z.string(), z.any()), + required: z.array(z.string()).optional(), + additionalProperties: z.boolean().optional(), }), isMcp: z.boolean().default(false).optional(), isLibrary: z.boolean().default(false).optional(), @@ -90,7 +74,7 @@ export const WorkflowTemplate = Workflow }); export const ConnectedEntity = z.object({ - type: z.union([z.literal('tool'), z.literal('prompt'), z.literal('agent')]), + type: z.enum(['tool', 'prompt', 'agent']), name: z.string(), }); diff --git a/apps/rowboat/app/lib/utils.ts b/apps/rowboat/app/lib/utils.ts index 0b834e11..13293a10 100644 --- a/apps/rowboat/app/lib/utils.ts +++ b/apps/rowboat/app/lib/utils.ts @@ -1,46 +1,23 @@ -import { AgenticAPIChatResponse, AgenticAPIChatRequest, AgenticAPIChatMessage, AgenticAPIInitStreamResponse } from "./types/agents_api_types"; import { z } from "zod"; import { generateObject } from "ai"; -import { ApiMessage } from "./types/types"; import { openai } from "@ai-sdk/openai"; import { redisClient } from "./redis"; - -export async function getAgenticApiResponse( - request: z.infer, -): Promise<{ - messages: z.infer[], - state: unknown, - rawAPIResponse: unknown, -}> { - // call agentic api - console.log(`sending agentic api request`, JSON.stringify(request)); - const response = await fetch(process.env.AGENTS_API_URL + '/chat', { - method: 'POST', - body: JSON.stringify(request), - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${process.env.AGENTS_API_KEY || 'test'}`, - }, - }); - if (!response.ok) { - console.error('Failed to call agentic api', response); - throw new Error(`Failed to call agentic api: ${response.statusText}`); - } - const responseJson = await response.json(); - console.log(`received agentic api response`, JSON.stringify(responseJson)); - const result: z.infer = responseJson; - return { - messages: result.messages, - state: result.state, - rawAPIResponse: result, - }; -} +import { Workflow, WorkflowTool } from "./types/workflow_types"; +import { Message } from "./types/types"; export async function getAgenticResponseStreamId( - request: z.infer, -): Promise> { + workflow: z.infer, + projectTools: z.infer[], + messages: z.infer[], +): Promise<{ + streamId: string, +}> { // serialize the request - const payload = JSON.stringify(request); + const payload = JSON.stringify({ + workflow, + projectTools, + messages, + }); // create a uuid for the stream const streamId = crypto.randomUUID(); @@ -80,7 +57,7 @@ export class PrefixLogger { } } -export async function mockToolResponse(toolId: string, messages: z.infer[], mockInstructions: string): Promise { +export async function mockToolResponse(toolId: string, messages: z.infer[], mockInstructions: string): Promise { const prompt = `Given below is a chat between a user and a customer support assistant. The assistant has requested a tool call with ID {{toolID}}. diff --git a/apps/rowboat/app/projects/[projectId]/entities/agent_config.tsx b/apps/rowboat/app/projects/[projectId]/entities/agent_config.tsx index 0439d14e..e5b2d3a4 100644 --- a/apps/rowboat/app/projects/[projectId]/entities/agent_config.tsx +++ b/apps/rowboat/app/projects/[projectId]/entities/agent_config.tsx @@ -1,6 +1,5 @@ "use client"; import { WithStringId } from "../../../lib/types/types"; -import { AgenticAPITool } from "../../../lib/types/agents_api_types"; import { WorkflowPrompt, WorkflowAgent, Workflow, WorkflowTool } from "../../../lib/types/workflow_types"; import { DataSource } from "../../../lib/types/datasource_types"; import { z } from "zod"; @@ -56,7 +55,7 @@ export function AgentConfig({ agent: z.infer, usedAgentNames: Set, agents: z.infer[], - tools: z.infer[], + tools: z.infer[], projectTools: z.infer[], prompts: z.infer[], dataSources: WithStringId>[], diff --git a/apps/rowboat/app/projects/[projectId]/playground/app.tsx b/apps/rowboat/app/projects/[projectId]/playground/app.tsx index 443e6c84..7ec7b39d 100644 --- a/apps/rowboat/app/projects/[projectId]/playground/app.tsx +++ b/apps/rowboat/app/projects/[projectId]/playground/app.tsx @@ -1,13 +1,12 @@ 'use client'; import { useState, useCallback, useRef } from "react"; import { z } from "zod"; -import { MCPServer, PlaygroundChat } from "@/app/lib/types/types"; +import { MCPServer, Message, PlaygroundChat } from "@/app/lib/types/types"; import { Workflow, WorkflowTool } from "@/app/lib/types/workflow_types"; import { Chat } from "./components/chat"; import { Panel } from "@/components/common/panel-common"; import { Button } from "@/components/ui/button"; import { Tooltip } from "@heroui/react"; -import { apiV1 } from "rowboat-shared"; import { TestProfile } from "@/app/lib/types/testing_types"; import { WithStringId } from "@/app/lib/types/types"; import { ProfileSelector } from "@/app/projects/[projectId]/test/[[...slug]]/components/selectors/profile-selector"; @@ -31,7 +30,7 @@ export function App({ hidden?: boolean; projectId: string; workflow: z.infer; - messageSubscriber?: (messages: z.infer[]) => void; + messageSubscriber?: (messages: z.infer[]) => void; mcpServerUrls: Array>; toolWebhookUrl: string; isInitialState?: boolean; diff --git a/apps/rowboat/app/projects/[projectId]/playground/components/chat.tsx b/apps/rowboat/app/projects/[projectId]/playground/components/chat.tsx index 44ec9d06..b933540c 100644 --- a/apps/rowboat/app/projects/[projectId]/playground/components/chat.tsx +++ b/apps/rowboat/app/projects/[projectId]/playground/components/chat.tsx @@ -3,14 +3,10 @@ import { useEffect, useRef, useState, useCallback } from "react"; import { getAssistantResponseStreamId } from "@/app/actions/actions"; import { Messages } from "./messages"; import z from "zod"; -import { MCPServer, PlaygroundChat } from "@/app/lib/types/types"; -import { AgenticAPIChatMessage, convertFromAgenticAPIChatMessages, convertToAgenticAPIChatMessages } from "@/app/lib/types/agents_api_types"; -import { convertWorkflowToAgenticAPI } from "@/app/lib/types/agents_api_types"; -import { AgenticAPIChatRequest } from "@/app/lib/types/agents_api_types"; +import { MCPServer, Message, PlaygroundChat, ToolMessage } from "@/app/lib/types/types"; import { Workflow, WorkflowTool } from "@/app/lib/types/workflow_types"; import { ComposeBoxPlayground } from "@/components/common/compose-box-playground"; import { Button } from "@heroui/react"; -import { apiV1 } from "rowboat-shared"; import { TestProfile } from "@/app/lib/types/testing_types"; import { WithStringId } from "@/app/lib/types/types"; import { ProfileContextBox } from "./profile-context-box"; @@ -35,7 +31,7 @@ export function Chat({ chat: z.infer; projectId: string; workflow: z.infer; - messageSubscriber?: (messages: z.infer[]) => void; + messageSubscriber?: (messages: z.infer[]) => void; testProfile?: z.infer | null; onTestProfileChange: (profile: WithStringId> | null) => void; systemMessage: string; @@ -46,16 +42,13 @@ export function Chat({ showDebugMessages?: boolean; projectTools: z.infer[]; }) { - const [messages, setMessages] = useState[]>(chat.messages); + const [messages, setMessages] = useState[]>(chat.messages); const [loadingAssistantResponse, setLoadingAssistantResponse] = useState(false); - const [agenticState, setAgenticState] = useState(chat.agenticState || { - last_agent_name: workflow.startAgent, - }); const [fetchResponseError, setFetchResponseError] = useState(null); const [billingError, setBillingError] = useState(null); const [lastAgenticRequest, setLastAgenticRequest] = useState(null); const [lastAgenticResponse, setLastAgenticResponse] = useState(null); - const [optimisticMessages, setOptimisticMessages] = useState[]>(chat.messages); + const [optimisticMessages, setOptimisticMessages] = useState[]>(chat.messages); const [isLastInteracted, setIsLastInteracted] = useState(false); const getCopyContent = useCallback(() => { @@ -80,20 +73,17 @@ export function Chat({ }, [messages]); // collect published tool call results - const toolCallResults: Record> = {}; + const toolCallResults: Record> = {}; optimisticMessages .filter((message) => message.role == 'tool') .forEach((message) => { - toolCallResults[message.tool_call_id] = message; + toolCallResults[message.toolCallId] = message; }); function handleUserMessage(prompt: string) { - const updatedMessages: z.infer[] = [...messages, { + const updatedMessages: z.infer[] = [...messages, { role: 'user', content: prompt, - version: 'v1', - chatId: '', - createdAt: new Date().toISOString(), }]; setMessages(updatedMessages); setFetchResponseError(null); @@ -103,9 +93,6 @@ export function Chat({ // reset state when workflow changes useEffect(() => { setMessages([]); - setAgenticState({ - last_agent_name: workflow.startAgent, - }); }, [workflow]); // publish messages to subscriber @@ -119,7 +106,7 @@ export function Chat({ useEffect(() => { let ignore = false; let eventSource: EventSource | null = null; - let msgs: z.infer[] = []; + let msgs: z.infer[] = []; async function process() { setLoadingAssistantResponse(true); @@ -129,36 +116,19 @@ export function Chat({ setLastAgenticRequest(null); setLastAgenticResponse(null); - const { agents, tools, prompts, startAgent } = convertWorkflowToAgenticAPI(workflow, projectTools); - const request: z.infer = { - projectId, - messages: convertToAgenticAPIChatMessages([{ - role: 'system', - content: systemMessage || '', - version: 'v1' as const, - chatId: '', - createdAt: new Date().toISOString(), - }, ...messages]), - state: agenticState, - agents, - tools, - prompts, - startAgent, - mcpServers: mcpServerUrls.map(server => ({ - name: server.name, - serverUrl: server.serverUrl || '', - isReady: server.isReady - })), - toolWebhookUrl: toolWebhookUrl, - testProfile: testProfile ?? undefined, - }; - - // Store the full request object - setLastAgenticRequest(request); - let streamId: string | null = null; try { - const response = await getAssistantResponseStreamId(request); + const response = await getAssistantResponseStreamId( + workflow, + projectTools, + [ + { + role: 'system', + content: systemMessage || '', + }, + ...messages, + ], + ); if (ignore) { return; } @@ -190,8 +160,7 @@ export function Chat({ try { const data = JSON.parse(event.data); - const msg = AgenticAPIChatMessage.parse(data); - const parsedMsg = convertFromAgenticAPIChatMessages([msg])[0]; + const parsedMsg = Message.parse(data); msgs.push(parsedMsg); setOptimisticMessages(prev => [...prev, parsedMsg]); } catch (err) { @@ -207,7 +176,6 @@ export function Chat({ } const parsed = JSON.parse(event.data); - setAgenticState(parsed.state); // Combine state and collected messages in the response setLastAgenticResponse({ @@ -267,7 +235,6 @@ export function Chat({ }, [ messages, projectId, - agenticState, workflow, systemMessage, mcpServerUrls, diff --git a/apps/rowboat/app/projects/[projectId]/playground/components/messages.tsx b/apps/rowboat/app/projects/[projectId]/playground/components/messages.tsx index f6a1fa98..37540ae5 100644 --- a/apps/rowboat/app/projects/[projectId]/playground/components/messages.tsx +++ b/apps/rowboat/app/projects/[projectId]/playground/components/messages.tsx @@ -5,10 +5,10 @@ import z from "zod"; import { Workflow } from "@/app/lib/types/workflow_types"; import { WorkflowTool } from "@/app/lib/types/workflow_types"; import MarkdownContent from "@/app/lib/components/markdown-content"; -import { apiV1 } from "rowboat-shared"; import { MessageSquareIcon, EllipsisIcon, CircleCheckIcon, ChevronRightIcon, ChevronDownIcon, ChevronUpIcon, XIcon, PlusIcon, CodeIcon, CheckCircleIcon, FileTextIcon } from "lucide-react"; import { TestProfile } from "@/app/lib/types/testing_types"; import { ProfileContextBox } from "./profile-context-box"; +import { Message, ToolMessage, AssistantMessageWithToolCalls } from "@/app/lib/types/types"; function UserMessage({ content }: { content: string }) { return ( @@ -140,10 +140,10 @@ function ToolCalls({ systemMessage, delta }: { - toolCalls: z.infer['tool_calls']; - results: Record>; + toolCalls: z.infer['toolCalls']; + results: Record>; projectId: string; - messages: z.infer[]; + messages: z.infer[]; sender: string | null | undefined; workflow: z.infer; testProfile: z.infer | null; @@ -171,8 +171,8 @@ function ToolCall({ workflow, delta }: { - toolCall: z.infer['tool_calls'][number]; - result: z.infer | undefined; + toolCall: z.infer['toolCalls'][number]; + result: z.infer | undefined; sender: string | null | undefined; workflow: z.infer; delta: number; @@ -206,7 +206,7 @@ function TransferToAgentToolCall({ sender, delta }: { - result: z.infer | undefined; + result: z.infer | undefined; sender: string | null | undefined; delta: number; }) { @@ -238,8 +238,8 @@ function ClientToolCall({ workflow, delta }: { - toolCall: z.infer['tool_calls'][number]; - result: z.infer | undefined; + toolCall: z.infer['toolCalls'][number]; + result: z.infer | undefined; sender: string | null | undefined; workflow: z.infer; delta: number; @@ -350,8 +350,8 @@ export function Messages({ showDebugMessages = true, }: { projectId: string; - messages: z.infer[]; - toolCallResults: Record>; + messages: z.infer[]; + toolCallResults: Record>; loadingAssistantResponse: boolean; workflow: z.infer; testProfile: z.infer | null; @@ -368,28 +368,28 @@ export function Messages({ messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); }, [messages, loadingAssistantResponse]); - const renderMessage = (message: z.infer, index: number) => { - const isConsecutive = index > 0 && messages[index - 1].role === message.role; - + const renderMessage = (message: z.infer, index: number) => { if (message.role === 'assistant') { - let latency = new Date(message.createdAt).getTime() - lastUserMessageTimestamp; - if (!userMessageSeen) { - latency = 0; - } + // TODO: add latency support + // let latency = new Date(message.createdAt).getTime() - lastUserMessageTimestamp; + // if (!userMessageSeen) { + // latency = 0; + // } + let latency = 0; // First check for tool calls - if ('tool_calls' in message && message.tool_calls) { + if ('toolCalls' in message) { // Skip tool calls if debug mode is off if (!showDebugMessages) { return null; } return ( @@ -418,14 +418,15 @@ export function Messages({ return ( ); } - if (message.role === 'user' && typeof message.content === 'string') { - lastUserMessageTimestamp = new Date(message.createdAt).getTime(); + if (message.role === 'user') { + // TODO: add latency support + // lastUserMessageTimestamp = new Date(message.createdAt).getTime(); userMessageSeen = true; return ; } @@ -433,12 +434,12 @@ export function Messages({ return null; }; - const isAgentTransition = (message: z.infer) => { - return message.role === 'assistant' && 'tool_calls' in message && Array.isArray(message.tool_calls) && message.tool_calls.some(tc => tc.function.name.startsWith('transfer_to_')); + const isAgentTransition = (message: z.infer) => { + return message.role === 'assistant' && 'toolCalls' in message && Array.isArray(message.toolCalls) && message.toolCalls.some(tc => tc.function.name.startsWith('transfer_to_')); }; - const isAssistantMessage = (message: z.infer) => { - return message.role === 'assistant' && (!('tool_calls' in message) || !Array.isArray(message.tool_calls) || !message.tool_calls.some(tc => tc.function.name.startsWith('transfer_to_'))); + const isAssistantMessage = (message: z.infer) => { + return message.role === 'assistant' && (!('toolCalls' in message) || !Array.isArray(message.toolCalls) || !message.toolCalls.some(tc => tc.function.name.startsWith('transfer_to_'))); }; if (showSystemMessage) { diff --git a/apps/rowboat/app/projects/[projectId]/tools/components/ToolsConfig.tsx b/apps/rowboat/app/projects/[projectId]/tools/components/ToolsConfig.tsx index 2fd34a47..ade90081 100644 --- a/apps/rowboat/app/projects/[projectId]/tools/components/ToolsConfig.tsx +++ b/apps/rowboat/app/projects/[projectId]/tools/components/ToolsConfig.tsx @@ -4,7 +4,6 @@ import { useState } from 'react'; import { Tabs, Tab } from '@/components/ui/tabs'; import { HostedServers } from './HostedServers'; import { CustomServers } from './CustomServers'; -import { WebhookConfig } from './WebhookConfig'; import type { Key } from 'react'; export function ToolsConfig() { @@ -40,11 +39,6 @@ export function ToolsConfig() { - -
- -
-
); diff --git a/apps/rowboat/app/projects/[projectId]/workflow/entity_list.tsx b/apps/rowboat/app/projects/[projectId]/workflow/entity_list.tsx index ff06d5d4..0dda552b 100644 --- a/apps/rowboat/app/projects/[projectId]/workflow/entity_list.tsx +++ b/apps/rowboat/app/projects/[projectId]/workflow/entity_list.tsx @@ -1,5 +1,4 @@ import { z } from "zod"; -import { AgenticAPITool } from "../../../lib/types/agents_api_types"; import { WorkflowPrompt, WorkflowAgent, WorkflowTool } from "../../../lib/types/workflow_types"; import { Dropdown, DropdownItem, DropdownTrigger, DropdownMenu } from "@heroui/react"; import { useRef, useEffect, useState } from "react"; diff --git a/apps/rowboat/app/projects/[projectId]/workflow/workflow_editor.tsx b/apps/rowboat/app/projects/[projectId]/workflow/workflow_editor.tsx index 2a76b0a6..992cade2 100644 --- a/apps/rowboat/app/projects/[projectId]/workflow/workflow_editor.tsx +++ b/apps/rowboat/app/projects/[projectId]/workflow/workflow_editor.tsx @@ -1,6 +1,6 @@ "use client"; import React, { useReducer, Reducer, useState, useCallback, useEffect, useRef, createContext, useContext } from "react"; -import { MCPServer, WithStringId } from "../../../lib/types/types"; +import { MCPServer, Message, WithStringId } from "../../../lib/types/types"; import { Workflow, WorkflowTool, WorkflowPrompt, WorkflowAgent } from "../../../lib/types/workflow_types"; import { DataSource } from "../../../lib/types/datasource_types"; import { produce, applyPatches, enablePatches, produceWithPatches, Patch } from 'immer'; @@ -20,7 +20,6 @@ import { ResizablePanelGroup, } from "@/components/ui/resizable" import { Copilot } from "../copilot/app"; -import { apiV1 } from "rowboat-shared"; import { publishWorkflow, renameWorkflow, saveWorkflow } from "../../../actions/workflow_actions"; import { PublishedBadge } from "./published_badge"; import { BackIcon, HamburgerIcon, WorkflowIcon } from "../../../lib/components/icons"; @@ -597,8 +596,8 @@ export function WorkflowEditor({ lastUpdatedAt: workflow.lastUpdatedAt, } }); - const [chatMessages, setChatMessages] = useState[]>([]); - const updateChatMessages = useCallback((messages: z.infer[]) => { + const [chatMessages, setChatMessages] = useState[]>([]); + const updateChatMessages = useCallback((messages: z.infer[]) => { setChatMessages(messages); }, []); const saveQueue = useRef[]>([]); diff --git a/apps/rowboat/package-lock.json b/apps/rowboat/package-lock.json index bd11e466..7d60f3b9 100644 --- a/apps/rowboat/package-lock.json +++ b/apps/rowboat/package-lock.json @@ -25,6 +25,8 @@ "@langchain/textsplitters": "^0.1.0", "@mendable/firecrawl-js": "^1.0.3", "@modelcontextprotocol/sdk": "^1.12.1", + "@openai/agents": "^0.0.9", + "@openai/agents-extensions": "^0.0.9", "@primer/react": "^37.27.0", "@qdrant/js-client-rest": "^1.13.0", "ai": "^4.3.13", @@ -4666,6 +4668,155 @@ "integrity": "sha512-iFrvar5SOMtKFOSjYvs4z9UlLqDdJbMx0mgISLcPedv+g0ac5sgeETLGtipHCVIae6HJPclNEH5aCyD1RZaEHw==", "license": "BSD-3-Clause" }, + "node_modules/@openai/agents": { + "version": "0.0.9", + "resolved": "https://registry.npmjs.org/@openai/agents/-/agents-0.0.9.tgz", + "integrity": "sha512-JAZLqovH4MLGflwm7BZKjqW7ejhfGAoS7eyXkgfXh4oh/DtWbMr5hmK/Ha0jeqb1+xKY5ULbmikKDTPmHflW7g==", + "license": "MIT", + "dependencies": { + "@openai/agents-core": "0.0.9", + "@openai/agents-openai": "0.0.9", + "@openai/agents-realtime": "0.0.9", + "debug": "^4.4.0", + "openai": "^5.0.1" + } + }, + "node_modules/@openai/agents-core": { + "version": "0.0.9", + "resolved": "https://registry.npmjs.org/@openai/agents-core/-/agents-core-0.0.9.tgz", + "integrity": "sha512-n7vftCMIBNNbhHSs6SAr0mU99YDD8CH2wRoGZ016nqgl1X9SZsfdQyZvpMMypWrGQ+bqke+jUtXCVnOhKXISFQ==", + "license": "MIT", + "dependencies": { + "@openai/zod": "npm:zod@^3.25.40", + "debug": "^4.4.0", + "openai": "^5.0.1" + }, + "optionalDependencies": { + "@modelcontextprotocol/sdk": "^1.12.0" + }, + "peerDependencies": { + "zod": "^3.25.40" + }, + "peerDependenciesMeta": { + "zod": { + "optional": true + } + } + }, + "node_modules/@openai/agents-core/node_modules/openai": { + "version": "5.7.0", + "resolved": "https://registry.npmjs.org/openai/-/openai-5.7.0.tgz", + "integrity": "sha512-zXWawZl6J/P5Wz57/nKzVT3kJQZvogfuyuNVCdEp4/XU2UNrjL7SsuNpWAyLZbo6HVymwmnfno9toVzBhelygA==", + "license": "Apache-2.0", + "bin": { + "openai": "bin/cli" + }, + "peerDependencies": { + "ws": "^8.18.0", + "zod": "^3.23.8" + }, + "peerDependenciesMeta": { + "ws": { + "optional": true + }, + "zod": { + "optional": true + } + } + }, + "node_modules/@openai/agents-extensions": { + "version": "0.0.9", + "resolved": "https://registry.npmjs.org/@openai/agents-extensions/-/agents-extensions-0.0.9.tgz", + "integrity": "sha512-r8gSAd9gZOSlY/c79dWToi5e1BnBrCCIY/gZ+j84rWl+hRumhjzMQcXenzbw6y3ng45UkmIubIUK3VBO2SSIPQ==", + "license": "MIT", + "dependencies": { + "@ai-sdk/provider": "^1.1.3", + "@openai/zod": "npm:zod@^3.25.40", + "@types/ws": "^8.18.1", + "debug": "^4.4.0" + }, + "peerDependencies": { + "@openai/agents": "0.0.9", + "ws": "^8.18.1" + } + }, + "node_modules/@openai/agents-openai": { + "version": "0.0.9", + "resolved": "https://registry.npmjs.org/@openai/agents-openai/-/agents-openai-0.0.9.tgz", + "integrity": "sha512-JfTuFaswJUmzVVEEseH+uQHLeHv3ED+X8E0pNE868FwKe1+vd9elzD9uCqRolMAtkBfk8AemHzkZlYl2nuG1sg==", + "license": "MIT", + "dependencies": { + "@openai/agents-core": "0.0.9", + "@openai/zod": "npm:zod@^3.25.40", + "debug": "^4.4.0", + "openai": "^5.0.1" + } + }, + "node_modules/@openai/agents-openai/node_modules/openai": { + "version": "5.7.0", + "resolved": "https://registry.npmjs.org/openai/-/openai-5.7.0.tgz", + "integrity": "sha512-zXWawZl6J/P5Wz57/nKzVT3kJQZvogfuyuNVCdEp4/XU2UNrjL7SsuNpWAyLZbo6HVymwmnfno9toVzBhelygA==", + "license": "Apache-2.0", + "bin": { + "openai": "bin/cli" + }, + "peerDependencies": { + "ws": "^8.18.0", + "zod": "^3.23.8" + }, + "peerDependenciesMeta": { + "ws": { + "optional": true + }, + "zod": { + "optional": true + } + } + }, + "node_modules/@openai/agents-realtime": { + "version": "0.0.9", + "resolved": "https://registry.npmjs.org/@openai/agents-realtime/-/agents-realtime-0.0.9.tgz", + "integrity": "sha512-WpAoYG3zOq1U7ljyERxChMXOgnzaRaHqbU4gPMQUmEBD48MHpA0uro6VZTk83Vs6J8JfuS+fGxF11WiR2UlTCg==", + "license": "MIT", + "dependencies": { + "@openai/agents-core": "0.0.9", + "@openai/zod": "npm:zod@^3.25.40", + "@types/ws": "^8.18.1", + "debug": "^4.4.0", + "ws": "^8.18.1" + } + }, + "node_modules/@openai/agents/node_modules/openai": { + "version": "5.7.0", + "resolved": "https://registry.npmjs.org/openai/-/openai-5.7.0.tgz", + "integrity": "sha512-zXWawZl6J/P5Wz57/nKzVT3kJQZvogfuyuNVCdEp4/XU2UNrjL7SsuNpWAyLZbo6HVymwmnfno9toVzBhelygA==", + "license": "Apache-2.0", + "bin": { + "openai": "bin/cli" + }, + "peerDependencies": { + "ws": "^8.18.0", + "zod": "^3.23.8" + }, + "peerDependenciesMeta": { + "ws": { + "optional": true + }, + "zod": { + "optional": true + } + } + }, + "node_modules/@openai/zod": { + "name": "zod", + "version": "3.25.67", + "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.67.tgz", + "integrity": "sha512-idA2YXwpCdqUSKRCACDE6ItZD9TZzy3OZMtpfLoh6oPR47lipysRrJfjzMqFxQ3uJuUPyUeWe1r9vLH33xO/Qw==", + "license": "MIT", + "funding": { + "url": "https://github.com/sponsors/colinhacks" + } + }, "node_modules/@opentelemetry/api": { "version": "1.9.0", "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.0.tgz", @@ -7560,6 +7711,15 @@ "@types/webidl-conversions": "*" } }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@typescript-eslint/eslint-plugin": { "version": "8.35.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-8.35.0.tgz", @@ -16455,10 +16615,10 @@ "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==" }, "node_modules/ws": { - "version": "8.18.0", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.0.tgz", - "integrity": "sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==", - "peer": true, + "version": "8.18.2", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.2.tgz", + "integrity": "sha512-DMricUmwGZUVr++AEAe2uiVM7UoO9MAVZMDu05UQOaUII0lp+zOzLLU4Xqh/JvTqklB1T4uELaaPBKyjE1r4fQ==", + "license": "MIT", "engines": { "node": ">=10.0.0" }, @@ -16503,9 +16663,10 @@ } }, "node_modules/zod": { - "version": "3.24.2", - "resolved": "https://registry.npmjs.org/zod/-/zod-3.24.2.tgz", - "integrity": "sha512-lY7CDW43ECgW9u1TcT3IoXHflywfVqDYze4waEz812jR/bZ8FHDsl7pFQoSZTz5N+2NqRXs8GBwnAwo3ZNxqhQ==", + "version": "3.25.67", + "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.67.tgz", + "integrity": "sha512-idA2YXwpCdqUSKRCACDE6ItZD9TZzy3OZMtpfLoh6oPR47lipysRrJfjzMqFxQ3uJuUPyUeWe1r9vLH33xO/Qw==", + "license": "MIT", "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/apps/rowboat/package.json b/apps/rowboat/package.json index f146b096..292a8f72 100644 --- a/apps/rowboat/package.json +++ b/apps/rowboat/package.json @@ -32,6 +32,8 @@ "@langchain/textsplitters": "^0.1.0", "@mendable/firecrawl-js": "^1.0.3", "@modelcontextprotocol/sdk": "^1.12.1", + "@openai/agents": "^0.0.9", + "@openai/agents-extensions": "^0.0.9", "@primer/react": "^37.27.0", "@qdrant/js-client-rest": "^1.13.0", "ai": "^4.3.13",