diff --git a/tests/integration/test_agent_streaming_integration.py b/tests/integration/test_agent_streaming_integration.py index 0971d30c..d6004c21 100644 --- a/tests/integration/test_agent_streaming_integration.py +++ b/tests/integration/test_agent_streaming_integration.py @@ -47,8 +47,9 @@ Args: { "}" ] - for chunk in chunks: - await chunk_callback(chunk) + for i, chunk in enumerate(chunks): + is_final = (i == len(chunks) - 1) + await chunk_callback(chunk, is_final) return full_text else: @@ -312,8 +313,10 @@ Final Answer: AI is the simulation of human intelligence in machines.""" call_count += 1 if streaming and chunk_callback: - for chunk in response.split(): - await chunk_callback(chunk + " ") + chunks = response.split() + for i, chunk in enumerate(chunks): + is_final = (i == len(chunks) - 1) + await chunk_callback(chunk + " ", is_final) return response return response diff --git a/trustgraph-flow/trustgraph/agent/react/agent_manager.py b/trustgraph-flow/trustgraph/agent/react/agent_manager.py index 90bc445c..87cee33d 100644 --- a/trustgraph-flow/trustgraph/agent/react/agent_manager.py +++ b/trustgraph-flow/trustgraph/agent/react/agent_manager.py @@ -241,8 +241,8 @@ class AgentManager: logger.info("DEBUG: StreamingReActParser created") # Create async chunk callback that feeds parser and sends collected chunks - async def on_chunk(text): - logger.info(f"DEBUG: on_chunk called with {len(text)} chars") + async def on_chunk(text, end_of_stream): + logger.info(f"DEBUG: on_chunk called with {len(text)} chars, end_of_stream={end_of_stream}") # Track what we had before prev_thought_count = len(thought_chunks)