mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-04-29 02:24:02 +02:00
renderer: track processing runs and stream buffers across chats
The renderer previously filtered run events by the active runId, which meant run-processing-start for a newly created or non-active run could be dropped. That caused the thinking indicator to miss on subsequent chats and lost live streaming context when switching between runs. This change keeps only minimal background state: - a Set of runIds currently processing (driven by run-processing-start/end) - a per-run streaming buffer for assistant/reasoning deltas All run events are now observed; non-active runs only update the minimal structures. When a run is selected, its conversation is loaded as before and any in-progress buffer is used to seed the streaming UI. Buffers are cleared on final assistant message, run end, stop, or error. The chat history list now shows a processing indicator per run. Tests: - cd apps/x && npm run deps - cd apps/x/apps/renderer && npm run build
This commit is contained in:
parent
906ffccb7b
commit
ef8584c05f
2 changed files with 111 additions and 5 deletions
|
|
@ -578,6 +578,9 @@ function App() {
|
|||
const runIdRef = useRef<string | null>(null)
|
||||
const loadRunRequestIdRef = useRef(0)
|
||||
const [isProcessing, setIsProcessing] = useState(false)
|
||||
const [processingRunIds, setProcessingRunIds] = useState<Set<string>>(new Set())
|
||||
const processingRunIdsRef = useRef<Set<string>>(new Set())
|
||||
const streamingBuffersRef = useRef<Map<string, { assistant: string; reasoning: string }>>(new Map())
|
||||
const [isStopping, setIsStopping] = useState(false)
|
||||
const [stopClickedAt, setStopClickedAt] = useState<number | null>(null)
|
||||
const [agentId] = useState<string>('copilot')
|
||||
|
|
@ -639,6 +642,31 @@ function App() {
|
|||
editorContentRef.current = markdown
|
||||
setEditorContent(markdown)
|
||||
}, [])
|
||||
// Keep processingRunIdsRef in sync for use in async callbacks
|
||||
useEffect(() => {
|
||||
processingRunIdsRef.current = processingRunIds
|
||||
}, [processingRunIds])
|
||||
|
||||
// Sync active run streaming UI with background tracking
|
||||
useEffect(() => {
|
||||
if (!runId) {
|
||||
setIsProcessing(false)
|
||||
setCurrentAssistantMessage('')
|
||||
setCurrentReasoning('')
|
||||
return
|
||||
}
|
||||
const isRunProcessing = processingRunIdsRef.current.has(runId)
|
||||
setIsProcessing(isRunProcessing)
|
||||
if (isRunProcessing) {
|
||||
const buffer = streamingBuffersRef.current.get(runId)
|
||||
setCurrentAssistantMessage(buffer?.assistant ?? '')
|
||||
setCurrentReasoning(buffer?.reasoning ?? '')
|
||||
} else {
|
||||
setCurrentAssistantMessage('')
|
||||
setCurrentReasoning('')
|
||||
streamingBuffersRef.current.delete(runId)
|
||||
}
|
||||
}, [runId])
|
||||
|
||||
// Load directory tree
|
||||
const loadDirectory = useCallback(async () => {
|
||||
|
|
@ -1062,8 +1090,6 @@ function App() {
|
|||
// Set the conversation and runId
|
||||
setConversation(items)
|
||||
setRunId(id)
|
||||
setCurrentAssistantMessage('')
|
||||
setCurrentReasoning('')
|
||||
setMessage('')
|
||||
setPendingPermissionRequests(pendingPerms)
|
||||
setPendingAskHumanRequests(pendingAsks)
|
||||
|
|
@ -1083,25 +1109,57 @@ function App() {
|
|||
return cleanup
|
||||
}, [])
|
||||
|
||||
const getStreamingBuffer = (id: string) => {
|
||||
const existing = streamingBuffersRef.current.get(id)
|
||||
if (existing) return existing
|
||||
const next = { assistant: '', reasoning: '' }
|
||||
streamingBuffersRef.current.set(id, next)
|
||||
return next
|
||||
}
|
||||
|
||||
const appendStreamingBuffer = (id: string, field: 'assistant' | 'reasoning', delta: string) => {
|
||||
if (!delta) return
|
||||
const buffer = getStreamingBuffer(id)
|
||||
buffer[field] += delta
|
||||
}
|
||||
|
||||
const clearStreamingBuffer = (id: string) => {
|
||||
streamingBuffersRef.current.delete(id)
|
||||
}
|
||||
|
||||
const handleRunEvent = (event: RunEventType) => {
|
||||
// Use ref to get current runId to avoid stale closure issues
|
||||
if (event.runId !== runIdRef.current) return
|
||||
const activeRunId = runIdRef.current
|
||||
const isActiveRun = event.runId === activeRunId
|
||||
|
||||
console.log('Run event:', event.type, event)
|
||||
|
||||
switch (event.type) {
|
||||
case 'run-processing-start':
|
||||
setProcessingRunIds(prev => {
|
||||
const next = new Set(prev)
|
||||
next.add(event.runId)
|
||||
return next
|
||||
})
|
||||
if (!isActiveRun) return
|
||||
setIsProcessing(true)
|
||||
setModelUsage(null)
|
||||
break
|
||||
|
||||
case 'run-processing-end':
|
||||
setProcessingRunIds(prev => {
|
||||
const next = new Set(prev)
|
||||
next.delete(event.runId)
|
||||
return next
|
||||
})
|
||||
clearStreamingBuffer(event.runId)
|
||||
if (!isActiveRun) return
|
||||
setIsProcessing(false)
|
||||
setIsStopping(false)
|
||||
setStopClickedAt(null)
|
||||
break
|
||||
|
||||
case 'start':
|
||||
if (!isActiveRun) return
|
||||
setCurrentAssistantMessage('')
|
||||
setCurrentReasoning('')
|
||||
setModelUsage(null)
|
||||
|
|
@ -1110,7 +1168,16 @@ function App() {
|
|||
case 'llm-stream-event':
|
||||
{
|
||||
const llmEvent = event.event
|
||||
if (!isActiveRun) {
|
||||
if (llmEvent.type === 'reasoning-delta' && llmEvent.delta) {
|
||||
appendStreamingBuffer(event.runId, 'reasoning', llmEvent.delta)
|
||||
} else if (llmEvent.type === 'text-delta' && llmEvent.delta) {
|
||||
appendStreamingBuffer(event.runId, 'assistant', llmEvent.delta)
|
||||
}
|
||||
return
|
||||
}
|
||||
if (llmEvent.type === 'reasoning-delta' && llmEvent.delta) {
|
||||
appendStreamingBuffer(event.runId, 'reasoning', llmEvent.delta)
|
||||
setCurrentReasoning(prev => prev + llmEvent.delta)
|
||||
} else if (llmEvent.type === 'reasoning-end') {
|
||||
setCurrentReasoning(reasoning => {
|
||||
|
|
@ -1124,6 +1191,7 @@ function App() {
|
|||
return ''
|
||||
})
|
||||
} else if (llmEvent.type === 'text-delta' && llmEvent.delta) {
|
||||
appendStreamingBuffer(event.runId, 'assistant', llmEvent.delta)
|
||||
setCurrentAssistantMessage(prev => prev + llmEvent.delta)
|
||||
} else if (llmEvent.type === 'tool-call') {
|
||||
setConversation(prev => [...prev, {
|
||||
|
|
@ -1145,6 +1213,12 @@ function App() {
|
|||
case 'message':
|
||||
{
|
||||
const msg = event.message
|
||||
if (!isActiveRun) {
|
||||
if (msg.role === 'assistant') {
|
||||
clearStreamingBuffer(event.runId)
|
||||
}
|
||||
return
|
||||
}
|
||||
if (msg.role === 'assistant') {
|
||||
setCurrentAssistantMessage(currentMsg => {
|
||||
if (currentMsg) {
|
||||
|
|
@ -1163,12 +1237,14 @@ function App() {
|
|||
}
|
||||
return ''
|
||||
})
|
||||
clearStreamingBuffer(event.runId)
|
||||
}
|
||||
}
|
||||
break
|
||||
|
||||
case 'tool-invocation':
|
||||
{
|
||||
if (!isActiveRun) return
|
||||
const parsedInput = normalizeToolInput(event.input)
|
||||
setConversation(prev => {
|
||||
let matched = false
|
||||
|
|
@ -1198,6 +1274,7 @@ function App() {
|
|||
|
||||
case 'tool-result':
|
||||
{
|
||||
if (!isActiveRun) return
|
||||
setConversation(prev => {
|
||||
let matched = false
|
||||
const next = prev.map(item => {
|
||||
|
|
@ -1230,6 +1307,7 @@ function App() {
|
|||
}
|
||||
|
||||
case 'tool-permission-request': {
|
||||
if (!isActiveRun) return
|
||||
const key = event.toolCall.toolCallId
|
||||
setPendingPermissionRequests(prev => {
|
||||
const next = new Map(prev)
|
||||
|
|
@ -1245,6 +1323,7 @@ function App() {
|
|||
}
|
||||
|
||||
case 'tool-permission-response': {
|
||||
if (!isActiveRun) return
|
||||
setPendingPermissionRequests(prev => {
|
||||
const next = new Map(prev)
|
||||
next.delete(event.toolCallId)
|
||||
|
|
@ -1259,6 +1338,7 @@ function App() {
|
|||
}
|
||||
|
||||
case 'ask-human-request': {
|
||||
if (!isActiveRun) return
|
||||
const key = event.toolCallId
|
||||
setPendingAskHumanRequests(prev => {
|
||||
const next = new Map(prev)
|
||||
|
|
@ -1269,6 +1349,7 @@ function App() {
|
|||
}
|
||||
|
||||
case 'ask-human-response': {
|
||||
if (!isActiveRun) return
|
||||
setPendingAskHumanRequests(prev => {
|
||||
const next = new Map(prev)
|
||||
next.delete(event.toolCallId)
|
||||
|
|
@ -1278,6 +1359,13 @@ function App() {
|
|||
}
|
||||
|
||||
case 'run-stopped':
|
||||
setProcessingRunIds(prev => {
|
||||
const next = new Set(prev)
|
||||
next.delete(event.runId)
|
||||
return next
|
||||
})
|
||||
clearStreamingBuffer(event.runId)
|
||||
if (!isActiveRun) return
|
||||
setIsProcessing(false)
|
||||
setIsStopping(false)
|
||||
setStopClickedAt(null)
|
||||
|
|
@ -1300,6 +1388,13 @@ function App() {
|
|||
break
|
||||
|
||||
case 'error':
|
||||
setProcessingRunIds(prev => {
|
||||
const next = new Set(prev)
|
||||
next.delete(event.runId)
|
||||
return next
|
||||
})
|
||||
clearStreamingBuffer(event.runId)
|
||||
if (!isActiveRun) return
|
||||
setIsProcessing(false)
|
||||
setIsStopping(false)
|
||||
setStopClickedAt(null)
|
||||
|
|
@ -2104,6 +2199,7 @@ function App() {
|
|||
onVoiceNoteCreated={handleVoiceNoteCreated}
|
||||
runs={runs}
|
||||
currentRunId={runId}
|
||||
processingRunIds={processingRunIds}
|
||||
tasksActions={{
|
||||
onNewChat: () => {
|
||||
void navigateToView({ type: 'chat', runId: null })
|
||||
|
|
|
|||
|
|
@ -142,6 +142,7 @@ type SidebarContentPanelProps = {
|
|||
onVoiceNoteCreated?: (path: string) => void
|
||||
runs?: RunListItem[]
|
||||
currentRunId?: string | null
|
||||
processingRunIds?: Set<string>
|
||||
tasksActions?: TasksActions
|
||||
backgroundTasks?: BackgroundTaskItem[]
|
||||
selectedBackgroundTask?: string | null
|
||||
|
|
@ -345,6 +346,7 @@ export function SidebarContentPanel({
|
|||
onVoiceNoteCreated,
|
||||
runs = [],
|
||||
currentRunId,
|
||||
processingRunIds,
|
||||
tasksActions,
|
||||
backgroundTasks = [],
|
||||
selectedBackgroundTask,
|
||||
|
|
@ -392,6 +394,7 @@ export function SidebarContentPanel({
|
|||
<TasksSection
|
||||
runs={runs}
|
||||
currentRunId={currentRunId}
|
||||
processingRunIds={processingRunIds}
|
||||
actions={tasksActions}
|
||||
backgroundTasks={backgroundTasks}
|
||||
selectedBackgroundTask={selectedBackgroundTask}
|
||||
|
|
@ -975,12 +978,14 @@ function getStatusColor(status?: string, enabled?: boolean): string {
|
|||
function TasksSection({
|
||||
runs,
|
||||
currentRunId,
|
||||
processingRunIds,
|
||||
actions,
|
||||
backgroundTasks = [],
|
||||
selectedBackgroundTask,
|
||||
}: {
|
||||
runs: RunListItem[]
|
||||
currentRunId?: string | null
|
||||
processingRunIds?: Set<string>
|
||||
actions?: TasksActions
|
||||
backgroundTasks?: BackgroundTaskItem[]
|
||||
selectedBackgroundTask?: string | null
|
||||
|
|
@ -1040,7 +1045,12 @@ function TasksSection({
|
|||
isActive={currentRunId === run.id}
|
||||
onClick={() => actions?.onSelectRun(run.id)}
|
||||
>
|
||||
<span className="truncate text-sm">{run.title || '(Untitled chat)'}</span>
|
||||
<div className="flex items-center gap-2">
|
||||
{processingRunIds?.has(run.id) ? (
|
||||
<span className="size-2 shrink-0 rounded-full bg-emerald-500 animate-pulse" />
|
||||
) : null}
|
||||
<span className="truncate text-sm">{run.title || '(Untitled chat)'}</span>
|
||||
</div>
|
||||
</SidebarMenuButton>
|
||||
</SidebarMenuItem>
|
||||
))}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue