From b1f6e642440242148c8fe13f5969e4e411401ef7 Mon Sep 17 00:00:00 2001 From: tusharmagar Date: Thu, 11 Dec 2025 13:43:47 +0530 Subject: [PATCH] connecting the copilot to the UI --- apps/cli/src/agents/runtime.ts | 4 +- apps/cli/src/entities/run-events.ts | 4 +- apps/rowboatx/app/api/chat/route.ts | 72 ++++ apps/rowboatx/app/api/stream/route.ts | 113 ++++++ apps/rowboatx/app/page.tsx | 493 +++++++++++++++++++++++--- apps/rowboatx/lib/cli-client.ts | 159 +++++++++ 6 files changed, 785 insertions(+), 60 deletions(-) create mode 100644 apps/rowboatx/app/api/chat/route.ts create mode 100644 apps/rowboatx/app/api/stream/route.ts create mode 100644 apps/rowboatx/lib/cli-client.ts diff --git a/apps/cli/src/agents/runtime.ts b/apps/cli/src/agents/runtime.ts index 530aa6e7..0a0dfe61 100644 --- a/apps/cli/src/agents/runtime.ts +++ b/apps/cli/src/agents/runtime.ts @@ -251,7 +251,7 @@ function normaliseAskHumanToolCall(message: z.infer) { } export async function loadAgent(id: string): Promise> { - if (id === "copilot") { + if (id === "copilot" || id === "rowboatx") { return CopilotAgent; } const repo = container.resolve('agentsRepo'); @@ -589,6 +589,7 @@ export async function* streamAgent({ yield *processEvent({ runId, type: "tool-invocation", + toolCallId, toolName: toolCall.toolName, input: JSON.stringify(toolCall.arguments), subflow: [], @@ -624,6 +625,7 @@ export async function* streamAgent({ yield* processEvent({ runId, type: "tool-result", + toolCallId: toolCall.toolCallId, toolName: toolCall.toolName, result: result, subflow: [], diff --git a/apps/cli/src/entities/run-events.ts b/apps/cli/src/entities/run-events.ts index edd6d7e9..cfbd0e45 100644 --- a/apps/cli/src/entities/run-events.ts +++ b/apps/cli/src/entities/run-events.ts @@ -32,12 +32,14 @@ export const MessageEvent = BaseRunEvent.extend({ export const ToolInvocationEvent = BaseRunEvent.extend({ type: z.literal("tool-invocation"), + toolCallId: z.string().optional(), toolName: z.string(), input: z.string(), }); export const ToolResultEvent = BaseRunEvent.extend({ type: z.literal("tool-result"), + toolCallId: z.string().optional(), toolName: z.string(), result: z.any(), }); @@ -82,4 +84,4 @@ export const RunEvent = z.union([ ToolPermissionRequestEvent, ToolPermissionResponseEvent, RunErrorEvent, -]); \ No newline at end of file +]); diff --git a/apps/rowboatx/app/api/chat/route.ts b/apps/rowboatx/app/api/chat/route.ts new file mode 100644 index 00000000..2b1e7704 --- /dev/null +++ b/apps/rowboatx/app/api/chat/route.ts @@ -0,0 +1,72 @@ +import { cliClient, RunEvent } from '@/lib/cli-client'; +import { NextRequest } from 'next/server'; + +export const runtime = 'nodejs'; +export const dynamic = 'force-dynamic'; + +/** + * POST /api/chat + * Creates a new conversation or sends a message to existing one + */ +export async function POST(request: NextRequest) { + try { + const body = await request.json(); + const { message, runId } = body; + + if (!message || typeof message !== 'string') { + return Response.json( + { error: 'Message is required' }, + { status: 400 } + ); + } + + let currentRunId = runId; + + // Create new run if no runId provided + if (!currentRunId) { + const run = await cliClient.createRun({ + agentId: 'copilot', + }); + currentRunId = run.id; + } + + // Always send the message (this triggers the agent runtime) + await cliClient.sendMessage(currentRunId, message); + + // Return the run ID + return Response.json({ runId: currentRunId }); + } catch (error) { + console.error('Chat API error:', error); + return Response.json( + { error: 'Failed to process message' }, + { status: 500 } + ); + } +} + +/** + * GET /api/chat?runId=xxx + * Get a specific run's details + */ +export async function GET(request: NextRequest) { + try { + const searchParams = request.nextUrl.searchParams; + const runId = searchParams.get('runId'); + + if (!runId) { + // List all runs + const result = await cliClient.listRuns(); + return Response.json(result); + } + + // Get specific run + const run = await cliClient.getRun(runId); + return Response.json(run); + } catch (error) { + console.error('Chat API error:', error); + return Response.json( + { error: 'Failed to fetch run' }, + { status: 500 } + ); + } +} diff --git a/apps/rowboatx/app/api/stream/route.ts b/apps/rowboatx/app/api/stream/route.ts new file mode 100644 index 00000000..58cae2b9 --- /dev/null +++ b/apps/rowboatx/app/api/stream/route.ts @@ -0,0 +1,113 @@ +import { NextRequest } from 'next/server'; + +export const runtime = 'nodejs'; +export const dynamic = 'force-dynamic'; + +const CLI_BASE_URL = process.env.CLI_BACKEND_URL || 'http://localhost:3000'; + +/** + * GET /api/stream + * Proxy SSE stream from CLI backend to frontend + */ +export async function GET(request: NextRequest) { + const encoder = new TextEncoder(); + + const customReadable = new ReadableStream({ + async start(controller) { + let reader: ReadableStreamDefaultReader | null = null; + let isClosed = false; + + // Handle client disconnect + request.signal.addEventListener('abort', () => { + isClosed = true; + reader?.cancel(); + try { + controller.close(); + } catch (e) { + // Already closed, ignore + } + }); + + try { + // Connect to CLI backend SSE stream + const response = await fetch(`${CLI_BASE_URL}/stream`, { + headers: { + 'Accept': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + signal: request.signal, // Forward abort signal + }); + + if (!response.ok) { + throw new Error(`Failed to connect to backend: ${response.statusText}`); + } + + reader = response.body?.getReader(); + if (!reader) { + throw new Error('No response body'); + } + + // Read and forward stream + while (!isClosed) { + const { done, value } = await reader.read(); + + if (done) { + break; + } + + // Only enqueue if controller is still open + if (!isClosed) { + try { + controller.enqueue(value); + } catch (e) { + // Controller closed, stop reading + break; + } + } + } + } catch (error: any) { + // Only log non-abort errors + if (error.name !== 'AbortError') { + console.error('Stream error:', error); + } + + // Try to send error message if controller is still open + if (!isClosed) { + try { + const errorMessage = `data: ${JSON.stringify({ type: 'error', error: String(error) })}\n\n`; + controller.enqueue(encoder.encode(errorMessage)); + } catch (e) { + // Controller already closed, ignore + } + } + } finally { + // Clean up + if (reader) { + try { + await reader.cancel(); + } catch (e) { + // Ignore cancel errors + } + } + + if (!isClosed) { + try { + controller.close(); + } catch (e) { + // Already closed, ignore + } + } + } + }, + }); + + return new Response(customReadable, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + }); +} + diff --git a/apps/rowboatx/app/page.tsx b/apps/rowboatx/app/page.tsx index 44a784c1..93d6974c 100644 --- a/apps/rowboatx/app/page.tsx +++ b/apps/rowboatx/app/page.tsx @@ -32,8 +32,41 @@ import { PromptInputHeader, type PromptInputMessage, } from "@/components/ai-elements/prompt-input"; -import { useState } from "react"; +import { Message, MessageContent, MessageResponse } from "@/components/ai-elements/message"; +import { Conversation, ConversationContent } from "@/components/ai-elements/conversation"; +import { Tool, ToolContent, ToolHeader, ToolInput, ToolOutput } from "@/components/ai-elements/tool"; +import { Reasoning, ReasoningTrigger, ReasoningContent } from "@/components/ai-elements/reasoning"; +import { useState, useEffect, useRef } from "react"; import { GlobeIcon, MicIcon } from "lucide-react"; +import { RunEvent } from "@/lib/cli-client"; + +interface ChatMessage { + id: string; + type: 'message'; + role: 'user' | 'assistant'; + content: string; + timestamp: number; +} + +interface ToolCall { + id: string; + type: 'tool'; + name: string; + input: any; + result?: any; + status: 'pending' | 'running' | 'completed' | 'error'; + timestamp: number; +} + +interface ReasoningBlock { + id: string; + type: 'reasoning'; + content: string; + isStreaming: boolean; + timestamp: number; +} + +type ConversationItem = ChatMessage | ToolCall | ReasoningBlock; export default function HomePage() { const [text, setText] = useState(""); @@ -42,8 +75,264 @@ export default function HomePage() { const [status, setStatus] = useState< "submitted" | "streaming" | "ready" | "error" >("ready"); + + // Chat state + const [runId, setRunId] = useState(null); + const [conversation, setConversation] = useState([]); + const [currentAssistantMessage, setCurrentAssistantMessage] = useState(""); + const [currentReasoning, setCurrentReasoning] = useState(""); + const eventSourceRef = useRef(null); + const committedMessageIds = useRef>(new Set()); + const isEmptyConversation = + conversation.length === 0 && !currentAssistantMessage && !currentReasoning; - const handleSubmit = (message: PromptInputMessage) => { + const renderPromptInput = () => ( + + + + {(attachment) => } + + + + setText(event.target.value)} + value={text} + placeholder="Ask me anything..." + className="min-h-[46px] max-h-[200px]" + /> + + + + + + + + + + setUseMicrophone(!useMicrophone)} + variant={useMicrophone ? "default" : "ghost"} + > + + Microphone + + setUseWebSearch(!useWebSearch)} + variant={useWebSearch ? "default" : "ghost"} + > + + Search + + + + + + ); + + // Connect to SSE stream + useEffect(() => { + // Prevent multiple connections + if (eventSourceRef.current) { + console.log('⚠️ EventSource already exists, not creating new one'); + return; + } + + console.log('🔌 Creating new EventSource connection'); + const eventSource = new EventSource('/api/stream'); + eventSourceRef.current = eventSource; + + const handleMessage = (e: MessageEvent) => { + try { + const event: RunEvent = JSON.parse(e.data); + handleEvent(event); + } catch (error) { + console.error('Failed to parse event:', error); + } + }; + + const handleError = (e: Event) => { + const target = e.target as EventSource; + + // Only log if it's not a normal close + if (target.readyState === EventSource.CLOSED) { + console.log('SSE connection closed, will reconnect on next message'); + } else if (target.readyState === EventSource.CONNECTING) { + console.log('SSE reconnecting...'); + } else { + console.error('SSE error:', e); + } + }; + + eventSource.addEventListener('message', handleMessage); + eventSource.addEventListener('error', handleError); + + return () => { + console.log('🔌 Closing EventSource connection'); + eventSource.removeEventListener('message', handleMessage); + eventSource.removeEventListener('error', handleError); + eventSource.close(); + eventSourceRef.current = null; + }; + }, []); // Empty deps - only run once + + // Handle different event types from the copilot + const handleEvent = (event: RunEvent) => { + console.log('Event received:', event.type, event); + + switch (event.type) { + case 'start': + setStatus('streaming'); + setCurrentAssistantMessage(''); + setCurrentReasoning(''); + break; + + case 'llm-stream-event': + console.log('LLM stream event type:', event.event?.type); + + if (event.event?.type === 'reasoning-delta') { + setCurrentReasoning(prev => prev + event.event.delta); + } else if (event.event?.type === 'reasoning-end') { + // Commit reasoning block if we have content + setCurrentReasoning(reasoning => { + if (reasoning) { + setConversation(prev => [...prev, { + id: `reasoning-${Date.now()}`, + type: 'reasoning', + content: reasoning, + isStreaming: false, + timestamp: Date.now(), + }]); + } + return ''; + }); + } else if (event.event?.type === 'text-delta') { + setCurrentAssistantMessage(prev => prev + event.event.delta); + setStatus('streaming'); + } else if (event.event?.type === 'text-end') { + console.log('TEXT END received - waiting for message event'); + } else if (event.event?.type === 'tool-call') { + // Add tool call to conversation immediately + setConversation(prev => [...prev, { + id: event.event.toolCallId, + type: 'tool', + name: event.event.toolName, + input: event.event.input, + status: 'running', + timestamp: Date.now(), + }]); + } else if (event.event?.type === 'finish-step') { + console.log('FINISH STEP received - waiting for message event'); + } + break; + + case 'message': + console.log('MESSAGE event received:', event); + if (event.message?.role === 'assistant') { + // If the final assistant message contains tool calls, sync them to conversation + if (Array.isArray(event.message.content)) { + const toolCalls = event.message.content.filter( + (part: any) => part?.type === 'tool-call' + ); + if (toolCalls.length) { + setConversation((prev) => { + const updated = [...prev]; + for (const part of toolCalls) { + const idx = updated.findIndex( + (item) => item.type === 'tool' && item.id === part.toolCallId + ); + if (idx >= 0) { + updated[idx] = { + ...updated[idx], + name: part.toolName, + input: part.arguments, + status: 'pending', + }; + } else { + updated.push({ + id: part.toolCallId, + type: 'tool', + name: part.toolName, + input: part.arguments, + status: 'pending', + timestamp: Date.now(), + }); + } + } + return updated; + }); + } + } + + const messageId = event.messageId || `assistant-${Date.now()}`; + + if (committedMessageIds.current.has(messageId)) { + console.log('⚠️ Message already committed, skipping:', messageId); + return; + } + + committedMessageIds.current.add(messageId); + + setCurrentAssistantMessage(currentMsg => { + console.log('✅ Committing message:', messageId, currentMsg); + if (currentMsg) { + setConversation(prev => { + const exists = prev.some(m => m.id === messageId); + if (exists) { + console.log('⚠️ Message ID already in array, skipping:', messageId); + return prev; + } + return [...prev, { + id: messageId, + type: 'message', + role: 'assistant', + content: currentMsg, + timestamp: Date.now(), + }]; + }); + } + return ''; + }); + setStatus('ready'); + console.log('Status set to ready'); + } + break; + + case 'tool-invocation': + setConversation(prev => prev.map(item => + item.type === 'tool' && (item.id === event.toolCallId || item.name === event.toolName) + ? { ...item, status: 'running' as const } + : item + )); + break; + + case 'tool-result': + setConversation(prev => prev.map(item => + item.type === 'tool' && (item.id === event.toolCallId || item.name === event.toolName) + ? { ...item, result: event.result, status: 'completed' as const } + : item + )); + break; + + case 'error': + // Only set error status for actual errors, not connection issues + if (event.error && !event.error.includes('terminated')) { + setStatus('error'); + console.error('Agent error:', event.error); + } else { + console.log('Connection error (will auto-reconnect):', event.error); + setStatus('ready'); + } + break; + + default: + console.log('Unhandled event type:', event.type); + } + }; + + const handleSubmit = async (message: PromptInputMessage) => { const hasText = Boolean(message.text); const hasAttachments = Boolean(message.files?.length); @@ -51,18 +340,55 @@ export default function HomePage() { return; } + const userMessage = message.text || ''; + + // Add user message immediately with unique ID + const userMessageId = `user-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; + setConversation(prev => [...prev, { + id: userMessageId, + type: 'message', + role: 'user', + content: userMessage, + timestamp: Date.now(), + }]); + setStatus("submitted"); - console.log("Message submitted:", message); - - // Reset after submission setText(""); - setTimeout(() => setStatus("ready"), 500); + + try { + // Send message to backend + const response = await fetch('/api/chat', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + message: userMessage, + runId: runId, + }), + }); + + if (!response.ok) { + throw new Error('Failed to send message'); + } + + const data = await response.json(); + + // Store runId for subsequent messages + if (data.runId && !runId) { + setRunId(data.runId); + } + + setStatus('streaming'); + } catch (error) { + console.error('Failed to send message:', error); + setStatus('error'); + setTimeout(() => setStatus('ready'), 2000); + } }; return ( - +
@@ -84,62 +410,113 @@ export default function HomePage() {
-
- {/* Blank canvas - main content area */} -
- {/* Empty space for messages */} -
+
+ {/* Messages area */} + + +
- {/* Input area - centered and narrower */} -
-
- - - - {(attachment) => } - - - - setText(event.target.value)} - value={text} - placeholder="Ask me anything..." - /> - - - - - - - - - - setUseMicrophone(!useMicrophone)} - variant={useMicrophone ? "default" : "ghost"} + {/* Render conversation items in order */} + {conversation.map((item) => { + if (item.type === 'message') { + return ( + - - Microphone - - setUseWebSearch(!useWebSearch)} - variant={useWebSearch ? "default" : "ghost"} - > - - Search - - - - - + + + {item.content} + + + + ); + } else if (item.type === 'tool') { + const stateMap: Record = { + 'pending': 'input-streaming', + 'running': 'input-available', + 'completed': 'output-available', + 'error': 'output-error', + }; + + return ( +
+ + + + + {item.result && ( + + )} + + +
+ ); + } else if (item.type === 'reasoning') { + return ( +
+ + + + {item.content} + + +
+ ); + } + return null; + })} + + {/* Streaming reasoning */} + {currentReasoning && ( +
+ + + + {currentReasoning} + + +
+ )} + + {/* Streaming message */} + {currentAssistantMessage && ( + + + + {currentAssistantMessage} + + + + + )} +
+ + + + {/* Input area */} + {isEmptyConversation ? ( +
+
+

+ RowboatX +

+ {renderPromptInput()} +
-
+ ) : ( +
+
+ {renderPromptInput()} +
+
+ )}
); } - diff --git a/apps/rowboatx/lib/cli-client.ts b/apps/rowboatx/lib/cli-client.ts new file mode 100644 index 00000000..eb325157 --- /dev/null +++ b/apps/rowboatx/lib/cli-client.ts @@ -0,0 +1,159 @@ +/** + * Type-safe client for the Rowboat CLI backend + */ + +const CLI_BASE_URL = process.env.CLI_BACKEND_URL || 'http://localhost:3000'; + +export interface Run { + id: string; + createdAt: string; + agentId: string; + log: RunEvent[]; +} + +export interface RunEvent { + type: string; + [key: string]: any; +} + +export interface CreateRunOptions { + agentId: string; +} + +export interface Agent { + name: string; + description: string; + instructions: string; + tools: Record; +} + +/** + * CLI Backend Client + */ +export class CliClient { + private baseUrl: string; + + constructor(baseUrl: string = CLI_BASE_URL) { + this.baseUrl = baseUrl; + } + + /** + * Create a new run (conversation) + */ + async createRun(options: CreateRunOptions): Promise { + const response = await fetch(`${this.baseUrl}/runs/new`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(options), + }); + + if (!response.ok) { + throw new Error(`Failed to create run: ${response.statusText}`); + } + + return response.json(); + } + + /** + * Send a message to an existing run + */ + async sendMessage(runId: string, message: string): Promise<{ messageId: string }> { + const response = await fetch(`${this.baseUrl}/runs/${runId}/messages/new`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ message }), + }); + + if (!response.ok) { + throw new Error(`Failed to send message: ${response.statusText}`); + } + + return response.json(); + } + + /** + * Get a run by ID + */ + async getRun(runId: string): Promise { + const response = await fetch(`${this.baseUrl}/runs/${runId}`); + + if (!response.ok) { + throw new Error(`Failed to get run: ${response.statusText}`); + } + + return response.json(); + } + + /** + * List all runs + */ + async listRuns(cursor?: string): Promise<{ runs: Run[]; nextCursor?: string }> { + const url = new URL(`${this.baseUrl}/runs`); + if (cursor) url.searchParams.set('cursor', cursor); + + const response = await fetch(url.toString()); + + if (!response.ok) { + throw new Error(`Failed to list runs: ${response.statusText}`); + } + + return response.json(); + } + + /** + * Get an agent by ID + */ + async getAgent(agentId: string): Promise { + const response = await fetch(`${this.baseUrl}/agents/${agentId}`); + + if (!response.ok) { + throw new Error(`Failed to get agent: ${response.statusText}`); + } + + return response.json(); + } + + /** + * List all agents + */ + async listAgents(): Promise { + const response = await fetch(`${this.baseUrl}/agents`); + + if (!response.ok) { + throw new Error(`Failed to list agents: ${response.statusText}`); + } + + return response.json(); + } + + /** + * Create an SSE connection to receive real-time events + */ + createEventStream(onEvent: (event: RunEvent) => void, onError?: (error: Error) => void): () => void { + const eventSource = new EventSource(`${this.baseUrl}/stream`); + + eventSource.addEventListener('message', (e) => { + try { + const event = JSON.parse(e.data) as RunEvent; + onEvent(event); + } catch (error) { + console.error('Failed to parse event:', error); + onError?.(error as Error); + } + }); + + eventSource.addEventListener('error', (e) => { + console.error('SSE error:', e); + onError?.(new Error('SSE connection error')); + }); + + // Return cleanup function + return () => { + eventSource.close(); + }; + } +} + +// Singleton instance +export const cliClient = new CliClient(); +