diff --git a/apps/rowboat_agents/src/graph/core.py b/apps/rowboat_agents/src/graph/core.py index 431089b8..7b76ea1f 100644 --- a/apps/rowboat_agents/src/graph/core.py +++ b/apps/rowboat_agents/src/graph/core.py @@ -77,13 +77,13 @@ def add_sender_details_to_messages(messages): def append_messages(messages, accumulated_messages): # Create a set of existing message contents for O(1) lookup existing_contents = {msg.get('content') for msg in messages} - + # Append messages that aren't already present, preserving order for msg in accumulated_messages: if msg.get('content') not in existing_contents: messages.append(msg) existing_contents.add(msg.get('content')) - + return messages async def run_turn_streamed( @@ -101,7 +101,7 @@ async def run_turn_streamed( ): """ Run a turn of the conversation with streaming responses. - + A turn consists of all messages between user inputs and must follow these rules: 1. Each turn must have exactly one external message from an agent with external visibility 2. A turn can have multiple internal messages from internal agents @@ -155,7 +155,7 @@ async def run_turn_streamed( return # Initialize agents and get external tools - + new_agents = get_agents(agent_configs=agent_configs, tool_configs=tool_configs, complete_request=complete_request) new_agents = add_child_transfer_related_instructions_to_agents(new_agents) new_agents = add_openai_recommended_instructions_to_agents(new_agents) @@ -217,7 +217,7 @@ async def run_turn_streamed( # Handle agent transfer elif event.type == "agent_updated_stream_event": - + # Skip self-transfers if current_agent.name == event.new_agent.name: print(f"\nSkipping agent transfer attempt: {current_agent.name} -> {event.new_agent.name} (self-transfer)") @@ -230,11 +230,6 @@ async def run_turn_streamed( print(f"Skipping transfer from {current_agent.name} to {event.new_agent.name} (max calls reached from parent to child)") continue - # Check if the child agent has already responded in this turn - if event.new_agent.name in agent_message_counts: - print(f"Skipping transfer from {current_agent.name} to {event.new_agent.name} (already responded this turn)") - continue - # Transfer to new agent tool_call_id = str(uuid.uuid4()) message = { @@ -298,7 +293,7 @@ async def run_turn_streamed( message['content'] = f"Sender agent: {current_agent.name}\nContent: {message['content']}" accumulated_messages.append(message) continue - + # Handle regular tool calls message = { 'content': None, @@ -327,13 +322,13 @@ async def run_turn_streamed( # Get the tool name and call id from raw_item tool_call_id = None tool_name = None - + # Try to get call_id from various possible locations if hasattr(event.item.raw_item, 'call_id'): tool_call_id = event.item.raw_item.call_id elif isinstance(event.item.raw_item, dict) and 'call_id' in event.item.raw_item: tool_call_id = event.item.raw_item['call_id'] - + # Try to get tool name from various possible locations if hasattr(event.item.raw_item, 'name'): tool_name = event.item.raw_item.name @@ -343,13 +338,13 @@ async def run_turn_streamed( elif 'type' in event.item.raw_item and event.item.raw_item['type'] == 'function_call_output': # For function call outputs, try to infer from context tool_name = 'recommendation' # Default for function calls - + # Fallback to event item if available if not tool_name and hasattr(event.item, 'tool_name'): tool_name = event.item.tool_name if not tool_call_id and hasattr(event.item, 'tool_call_id'): tool_call_id = event.item.tool_call_id - + message = { 'content': str(event.item.output), 'role': 'tool', @@ -383,62 +378,10 @@ async def run_turn_streamed( } url_citations.append(citation) - # Check if this agent has already responded - if current_agent.name in agent_message_counts: - print(f"\nSkipping agent {current_agent.name} because it has already responded") - if parent_stack: - print(f"-- Returning to parent agent {parent_stack[-1].name}") - # Create tool call for control transition - tool_call_id = str(uuid.uuid4()) - transition_message = { - 'content': None, - 'role': 'assistant', - 'sender': current_agent.name, - 'tool_calls': [{ - 'function': { - 'name': 'transfer_to_agent', - 'arguments': json.dumps({ - 'assistant': parent_stack[-1].name - }) - }, - 'id': tool_call_id, - 'type': 'function' - }], - 'tool_call_id': None, - 'tool_name': None, - 'response_type': ResponseType.INTERNAL.value - } - print('-'*100) - print(f"Yielding control transition message: {transition_message}") - print('-'*100) - yield ('message', transition_message) - - # Create tool response for control transition - transition_response = { - 'content': json.dumps({ - 'assistant': parent_stack[-1].name - }), - 'role': 'tool', - 'sender': None, - 'tool_calls': None, - 'tool_call_id': tool_call_id, - 'tool_name': 'transfer_to_agent' - } - print('-'*100) - print(f"Yielding control transition response: {transition_response}") - print('-'*100) - yield ('message', transition_response) - - current_agent = parent_stack.pop() - continue - else: - print(f"-- No parent agent to return to, ending turn") - break - # Determine message type and create message is_internal = check_internal_visibility(current_agent, agent_configs) response_type = ResponseType.INTERNAL.value if is_internal else ResponseType.EXTERNAL.value - + message = { 'content': content, 'role': 'assistant',