Enable internal and user-facing agents to build pipelines

This commit is contained in:
akhisud3195 2025-05-06 14:49:52 +05:30
parent 1246ea47b9
commit e59a8b75cf
24 changed files with 2100 additions and 1376 deletions

View file

@ -10,11 +10,11 @@ import asyncio
from src.graph.core import run_turn_streamed
from src.graph.tools import RAG_TOOL, CLOSE_CHAT_TOOL
from src.utils.common import common_logger, read_json_from_file
from src.utils.common import read_json_from_file
logger = common_logger
app = Quart(__name__)
config = read_json_from_file("./configs/default_config.json")
master_config = read_json_from_file("./configs/default_config.json")
print("Master config:", master_config)
# filter out agent transfer messages using a function
def is_agent_transfer_message(msg):
@ -57,12 +57,15 @@ def require_api_key(f):
@app.route("/chat", methods=["POST"])
@require_api_key
async def chat():
logger.info('='*100)
logger.info(f"{'*'*100}Running server mode{'*'*100}")
print('='*100)
print(f"{'*'*100}Running server mode{'*'*100}")
try:
request_data = await request.get_json()
print("Request:", json.dumps(request_data))
# Add enable_tracing from master_config to request_data
request_data["enable_tracing"] = master_config.get("enable_tracing", False)
# filter out agent transfer messages
input_messages = [msg for msg in request_data["messages"] if not is_agent_transfer_message(msg)]
@ -82,7 +85,6 @@ async def chat():
data = request_data
messages = []
final_state = {}
# tokens_used = 0
async for event_type, event_data in run_turn_streamed(
messages=input_messages,
@ -90,7 +92,8 @@ async def chat():
agent_configs=data.get("agents", []),
tool_configs=data.get("tools", []),
prompt_configs=data.get("prompts", []),
start_turn_with_start_agent=config.get("start_turn_with_start_agent", False),
start_turn_with_start_agent=master_config.get("start_turn_with_start_agent", False),
max_calls_per_child_agent=master_config.get("max_calls_per_child_agent", 1),
state=data.get("state", {}),
additional_tool_configs=[RAG_TOOL, CLOSE_CHAT_TOOL],
complete_request=data
@ -99,23 +102,22 @@ async def chat():
messages.append(event_data)
elif event_type == 'done':
final_state = event_data['state']
# tokens_used = event_data["tokens_used"]
out = {
"messages": messages,
"state": final_state,
}
logger.info("Output:")
print("Output:")
for k, v in out.items():
logger.info(f"{k}: {v}")
logger.info('*'*100)
print(f"{k}: {v}")
print('*'*100)
return jsonify(out)
except Exception as e:
print(traceback.format_exc())
logger.error(f"Error: {str(e)}")
print(f"Error: {str(e)}")
return jsonify({"error": str(e)}), 500
def format_sse(data: dict, event: str = None) -> str:
@ -133,6 +135,9 @@ async def chat_stream():
print("Request:", request_data.decode('utf-8'))
request_data = json.loads(request_data)
# Add enable_tracing from master_config to request_data
request_data["enable_tracing"] = master_config.get("enable_tracing", False)
# filter out agent transfer messages
input_messages = [msg for msg in request_data["messages"] if not is_agent_transfer_message(msg)]
@ -150,6 +155,7 @@ async def chat_stream():
msg["role"] = "user"
async def generate():
print("Running generate() in server")
try:
async for event_type, event_data in run_turn_streamed(
messages=input_messages,
@ -157,23 +163,21 @@ async def chat_stream():
agent_configs=request_data.get("agents", []),
tool_configs=request_data.get("tools", []),
prompt_configs=request_data.get("prompts", []),
start_turn_with_start_agent=config.get("start_turn_with_start_agent", False),
start_turn_with_start_agent=master_config.get("start_turn_with_start_agent", False),
max_calls_per_child_agent=master_config.get("max_calls_per_child_agent", 1),
state=request_data.get("state", {}),
additional_tool_configs=[RAG_TOOL, CLOSE_CHAT_TOOL],
complete_request=request_data
):
if event_type == 'message':
print("Yielding message:")
yield format_sse(event_data, "message")
elif event_type == 'done':
print("Yielding done:")
yield format_sse(event_data, "done")
elif event_type == 'error':
print("Yielding error:")
yield format_sse(event_data, "stream_error")
yield format_sse(event_data, " error")
except Exception as e:
logger.error(f"Streaming error: {str(e)}")
print(f"Streaming error: {str(e)}")
yield format_sse({"error": str(e)}, "error")
return Response(generate(), mimetype='text/event-stream')

View file

@ -7,18 +7,16 @@ import logging
from .helpers.access import (
get_agent_by_name,
get_external_tools,
get_prompt_by_type
get_prompt_by_type,
get_agent_config_by_name
)
from .helpers.library_tools import handle_web_search_event
from .helpers.control import get_last_agent_name
from .swarm_wrapper import run_streamed as swarm_run_streamed, get_agents
from src.utils.common import common_logger as logger
from .execute_turn import run_streamed as swarm_run_streamed, get_agents
from .helpers.instructions import add_child_transfer_related_instructions
from .types import PromptType, outputVisibility, ResponseType
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
from .types import PromptType, VisibilityType, ControlType
# Create a dedicated logger for swarm wrapper
logger.setLevel(logging.INFO)
print("Logger level set to INFO")
def order_messages(messages):
"""
@ -50,271 +48,43 @@ def set_sys_message(messages):
if messages[0].get("role") == "system" and messages[0].get("content") == "":
messages[0]["content"] = "You are a helpful assistant."
print("Updated system message: ", messages[0])
logger.info("Updated system message: ", messages[0])
print("Messages: ", messages)
# logger.info("Messages: ", messages)
return messages
def handle_web_search_event(event, current_agent):
"""
Helper function to handle all web search related events.
Returns a list of messages to yield.
"""
messages = []
# Handle raw response web search
if event.type == "raw_response_event":
if hasattr(event, 'data') and hasattr(event.data, 'raw_item'):
raw_item = event.data.raw_item
if (hasattr(raw_item, 'type') and raw_item.type == 'web_search_call') or (
isinstance(raw_item, dict) and raw_item.get('type') == 'web_search_call'
):
call_id = None
if hasattr(raw_item, 'id'):
call_id = raw_item.id
elif isinstance(raw_item, dict) and 'id' in raw_item:
call_id = raw_item['id']
else:
call_id = str(uuid.uuid4())
status = 'unknown'
if hasattr(raw_item, 'status'):
status = raw_item.status
elif isinstance(raw_item, dict) and 'status' in raw_item:
status = raw_item['status']
messages.append({
'content': None,
'role': 'assistant',
'sender': current_agent.name if current_agent else None,
'tool_calls': [{
'function': {
'name': 'web_search',
'arguments': json.dumps({
'search_id': call_id,
'status': status
})
},
'id': call_id,
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
})
# Handle run item web search events
elif event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
if hasattr(event.item.raw_item, 'type') and event.item.raw_item.type == 'web_search_call':
call_id = event.item.raw_item.id if hasattr(event.item.raw_item, 'id') else str(uuid.uuid4())
messages.append({
'content': None,
'role': 'assistant',
'sender': current_agent.name if current_agent else None,
'tool_calls': [{
'function': {
'name': 'web_search',
'arguments': json.dumps({
'search_id': call_id
})
},
'id': call_id,
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
})
messages.append({
'content': "Web search done",
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': 'web_search',
'response_type': 'internal'
})
elif event.item.type == "tool_call_output_item":
if isinstance(event.item.raw_item, dict) and event.item.raw_item.get('type') == 'web_search_results':
call_id = event.item.raw_item.get('search_id', event.item.raw_item.get('id', str(uuid.uuid4())))
messages.append({
'content': str(event.item.output),
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': 'web_search',
'response_type': 'internal'
})
elif event.item.type == "web_search_call_item" or (
hasattr(event.item, 'raw_item') and
hasattr(event.item.raw_item, 'type') and
event.item.raw_item.type == 'web_search_call'
):
call_id = None
if hasattr(event.item.raw_item, 'id'):
call_id = event.item.raw_item.id
messages.append({
'content': None,
'role': 'assistant',
'sender': current_agent.name if current_agent else None,
'tool_calls': [{
'function': {
'name': 'web_search',
'arguments': json.dumps({
'search_id': call_id
})
},
'id': call_id or str(uuid.uuid4()),
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
})
elif event.item.type == "web_search_results_item" or (
hasattr(event.item, 'raw_item') and (
(hasattr(event.item.raw_item, 'type') and event.item.raw_item.type == 'web_search_results') or
(isinstance(event.item.raw_item, dict) and event.item.raw_item.get('type') == 'web_search_results')
)
):
raw_item = event.item.raw_item
call_id = None
if hasattr(raw_item, 'search_id'):
call_id = raw_item.search_id
elif isinstance(raw_item, dict) and 'search_id' in raw_item:
call_id = raw_item['search_id']
elif hasattr(raw_item, 'id'):
call_id = raw_item.id
elif isinstance(raw_item, dict) and 'id' in raw_item:
call_id = raw_item['id']
else:
call_id = str(uuid.uuid4())
results = {}
if hasattr(event.item, 'output'):
results = event.item.output
elif hasattr(raw_item, 'results'):
results = raw_item.results
elif isinstance(raw_item, dict) and 'results' in raw_item:
results = raw_item['results']
results_str = ""
try:
results_str = json.dumps(results) if results else ""
except Exception as e:
print(f"Error serializing results: {str(e)}")
results_str = str(results)
messages.append({
'content': results_str,
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': 'web_search',
'response_type': 'internal'
})
return messages
""" Example workflow config
{
"agents": [
{
"name": "Credit Card Hub",
"type": "conversation",
"description": "Hub agent to route credit card related queries to the appropriate specialized agent.",
"instructions": "## 🧑‍💼 Role:\nYou are the hub for all credit card related queries. Your job is to understand the user's intent and route their query to the correct specialized agent.\n\n---\n## ⚙️ Steps to Follow:\n1. Greet the user and ask how you can help with their credit card needs.\n2. If the user asks about card recommendations, call [@agent:Card Recommendation](#mention).\n3. If the user asks about card benefits or rewards, call [@agent:Card Benefits and Rewards](#mention).\n4. If the user asks about the application process, call [@agent:Card Application Process](#mention).\n5. If the user asks for general credit card advice, call [@agent:General Credit Card Advice](#mention).\n6. If the query is out of scope, politely inform the user.\n\n---\n## 🎯 Scope:\n✅ In Scope:\n- Routing credit card related queries to the correct agent.\n\n❌ Out of Scope:\n- Answering credit card questions directly.\n- Handling non-credit card queries.\n\n---\n## 📋 Guidelines:\n✔️ Dos:\n- Be professional and friendly.\n- Route queries efficiently.\n\n🚫 Don'ts:\n- Do not answer questions directly.\n- Do not provide user-facing text such as 'I will connect you now...' when calling another agent.",
"model": "claude-3-7-sonnet-latest",
"toggleAble": true,
"ragReturnType": "chunks",
"ragK": 3,
"controlType": "retain",
"examples": "- **User** : Can you recommend a credit card for travel?\n - **Agent actions**: Call [@agent:Card Recommendation](#mention)\n\n- **User** : What are the benefits of the Platinum card?\n - **Agent actions**: Call [@agent:Card Benefits and Rewards](#mention)\n\n- **User** : How do I apply for a credit card?\n - **Agent actions**: Call [@agent:Card Application Process](#mention)\n\n- **User** : Should I get a credit card or a debit card?\n - **Agent actions**: Call [@agent:General Credit Card Advice](#mention)\n\n- **User** : Hi!\n - **Agent response**: Hello! How can I help you with your credit card needs today?"
},
{
"name": "Card Recommendation",
"type": "conversation",
"description": "Provides personalized credit card recommendations based on user needs.",
"disabled": false,
"instructions": "## 🧑‍💼 Role:\nYou help users find the best credit card for their needs.\n\n---\n## ⚙️ Steps to Follow:\n1. Ask the user about their preferences (e.g., travel, cashback, low interest, rewards).\n2. Use [@tool:web_search](#mention) to find suitable credit card options.\n3. Present 2-3 card recommendations with a brief explanation for each.\n4. If the user asks about benefits or application process, call the relevant agent.\n5. If the query is out of scope, call [@agent:Credit Card Hub](#mention).\n\n---\n## 🎯 Scope:\n✅ In Scope:\n- Recommending credit cards based on user needs.\n\n❌ Out of Scope:\n- Detailed card benefits (refer to Card Benefits and Rewards agent).\n- Application process (refer to Card Application Process agent).\n\n---\n## 📋 Guidelines:\n✔️ Dos:\n- Be professional and friendly.\n- Tailor recommendations to user preferences.\n\n🚫 Don'ts:\n- Recommend cards without understanding user needs.",
"model": "claude-3-7-sonnet-latest",
"locked": false,
"toggleAble": true,
"ragReturnType": "chunks",
"ragK": 3,
"controlType": "retain",
"examples": "- **User** : I want a card with good travel rewards.\n - **Agent response**: Great! Are you looking for international travel benefits or domestic? Any airline preferences?\n\n- **User** : I prefer cashback cards.\n - **Agent response**: Understood. Do you spend more on groceries, fuel, or online shopping?\n\n- **User** : I want a card with no annual fee.\n - **Agent response**: Thanks for sharing. I'll look up the best no-annual-fee cards for you.\n - **Agent actions**: Call [@tool:web_search](#mention)\n\n- **User** : What are the benefits of the Platinum card?\n - **Agent actions**: Call [@agent:Card Benefits and Rewards](#mention)\n\n- **User** : How do I apply for a card?\n - **Agent actions**: Call [@agent:Card Application Process](#mention)"
},
{
"name": "Card Benefits and Rewards",
"type": "conversation",
"description": "Provides detailed information about credit card benefits and rewards.",
"disabled": false,
"instructions": "## 🧑‍💼 Role:\nYou answer questions about credit card benefits and rewards.\n\n---\n## ⚙️ Steps to Follow:\n1. Ask the user which card or type of benefit they are interested in.\n2. Use [@tool:web_search](#mention) to find up-to-date information.\n3. Present the benefits and rewards in a clear, concise manner.\n4. If the user asks about recommendations or application process, call the relevant agent.\n5. If the query is out of scope, call [@agent:Credit Card Hub](#mention).\n\n---\n## 🎯 Scope:\n✅ In Scope:\n- Explaining card benefits and rewards.\n\n❌ Out of Scope:\n- Recommending cards (refer to Card Recommendation agent).\n- Application process (refer to Card Application Process agent).\n\n---\n## 📋 Guidelines:\n✔️ Dos:\n- Be accurate and clear.\n- Use up-to-date information.\n\n🚫 Don'ts:\n- Speculate about benefits without verification.",
"model": "claude-3-7-sonnet-latest",
"locked": false,
"toggleAble": true,
"ragReturnType": "chunks",
"ragK": 3,
"controlType": "retain",
"examples": "- **User** : What are the benefits of the Gold card?\n - **Agent response**: Let me check the latest benefits for the Gold card.\n - **Agent actions**: Call [@tool:web_search](#mention)\n\n- **User** : Does this card offer airport lounge access?\n - **Agent response**: I'll find out if this card includes airport lounge access.\n - **Agent actions**: Call [@tool:web_search](#mention)\n\n- **User** : Which card has the best rewards for shopping?\n - **Agent actions**: Call [@agent:Card Recommendation](#mention)\n\n- **User** : How do I apply for the Platinum card?\n - **Agent actions**: Call [@agent:Card Application Process](#mention)\n\n- **User** : Can you recommend a card for fuel rewards?\n - **Agent actions**: Call [@agent:Card Recommendation](#mention)"
},
{
"name": "Card Application Process",
"type": "conversation",
"description": "Explains the steps and requirements for applying for a credit card.",
"disabled": false,
"instructions": "## 🧑‍💼 Role:\nYou guide users through the credit card application process.\n\n---\n## ⚙️ Steps to Follow:\n1. Ask the user which card they want to apply for.\n2. Use [@tool:web_search](#mention) to find the latest application steps and requirements.\n3. Explain the process clearly, including eligibility, documents, and timelines.\n4. If the user asks about recommendations or benefits, call the relevant agent.\n5. If the query is out of scope, call [@agent:Credit Card Hub](#mention).\n\n---\n## 🎯 Scope:\n✅ In Scope:\n- Explaining how to apply for a credit card.\n\n❌ Out of Scope:\n- Recommending cards (refer to Card Recommendation agent).\n- Explaining card benefits (refer to Card Benefits and Rewards agent).\n\n---\n## 📋 Guidelines:\n✔️ Dos:\n- Be clear and step-by-step.\n- Mention required documents and eligibility.\n\n🚫 Don'ts:\n- Give outdated or unverified information.",
"model": "claude-3-7-sonnet-latest",
"locked": false,
"toggleAble": true,
"ragReturnType": "chunks",
"ragK": 3,
"controlType": "retain",
"examples": "- **User** : How do I apply for a credit card?\n - **Agent response**: Which card are you interested in applying for?\n\n- **User** : I want to apply for the Gold card.\n - **Agent response**: Let me check the application process for the Gold card.\n - **Agent actions**: Call [@tool:web_search](#mention)\n\n- **User** : What documents do I need to apply?\n - **Agent response**: I'll find the list of required documents for you.\n - **Agent actions**: Call [@tool:web_search](#mention)\n\n- **User** : Can you recommend a card for students?\n - **Agent actions**: Call [@agent:Card Recommendation](#mention)\n\n- **User** : What are the benefits of the Platinum card?\n - **Agent actions**: Call [@agent:Card Benefits and Rewards](#mention)"
},
{
"name": "General Credit Card Advice",
"type": "conversation",
"description": "Provides general advice and best practices for using credit cards responsibly.",
"disabled": false,
"instructions": "## 🧑‍💼 Role:\nYou offer general advice and best practices for credit card usage.\n\n---\n## ⚙️ Steps to Follow:\n1. Ask the user about their specific concern or question.\n2. Use [@tool:web_search](#mention) if needed for up-to-date advice.\n3. Provide clear, practical advice on credit card usage, safety, and financial health.\n4. If the user asks about recommendations, benefits, or application process, call the relevant agent.\n5. If the query is out of scope, call [@agent:Credit Card Hub](#mention).\n\n---\n## 🎯 Scope:\n✅ In Scope:\n- General credit card advice and best practices.\n\n❌ Out of Scope:\n- Recommending specific cards (refer to Card Recommendation agent).\n- Explaining card benefits (refer to Card Benefits and Rewards agent).\n- Application process (refer to Card Application Process agent).\n\n---\n## 📋 Guidelines:\n✔️ Dos:\n- Be professional and friendly.\n- Give practical, actionable advice.\n\n🚫 Don'ts:\n- Give financial advice outside of credit cards.",
"model": "claude-3-7-sonnet-latest",
"locked": false,
"toggleAble": true,
"ragReturnType": "chunks",
"ragK": 3,
"controlType": "retain",
"examples": "- **User** : Should I get a credit card or a debit card?\n - **Agent response**: I can explain the pros and cons of both options. Are you looking for rewards, building credit, or something else?\n\n- **User** : How can I avoid credit card debt?\n - **Agent response**: Great question! Some best practices include paying your balance in full each month, tracking your spending, and setting up payment reminders.\n\n- **User** : Is it safe to use my credit card online?\n - **Agent response**: Yes, but always use secure websites and avoid sharing your card details on public Wi-Fi. Would you like more tips on online safety?\n\n- **User** : Can you recommend a card for students?\n - **Agent actions**: Call [@agent:Card Recommendation](#mention)\n\n- **User** : What are the benefits of the Platinum card?\n - **Agent actions**: Call [@agent:Card Benefits and Rewards](#mention)"
}
],
"prompts": [],
"tools": [
{
"name": "web_search",
"description": "Fetch information from the web based on chat context",
"parameters": {
"type": "object",
"properties": {}
},
"isLibrary": true
}
],
"startAgent": "Credit Card Hub",
"createdAt": "2025-05-02T12:02:06.172Z",
"lastUpdatedAt": "2025-05-02T12:02:06.172Z",
"name": "Version 1"
}
"""
def add_child_transfer_related_instructions_to_agents(agents):
for agent in agents:
add_child_transfer_related_instructions(agent)
return agents
def add_openai_recommended_instructions_to_agents(agents):
for agent in agents:
agent.instructions = RECOMMENDED_PROMPT_PREFIX + '\n\n' + agent.instructions
return agents
def check_internal_visibility(current_agent, agent_configs):
"""Check if an agent is internal based on its outputVisibility"""
agent_config = get_agent_config_by_name(current_agent.name, agent_configs)
visibility = agent_config.get('outputVisibility', outputVisibility.EXTERNAL.value)
return visibility == outputVisibility.INTERNAL.value
def add_sender_details_to_messages(messages):
for msg in messages:
msg['sender'] = msg.get('sender', None)
if msg.get('sender'):
msg['content'] = f"Sender agent: {msg.get('sender')}\nContent: {msg.get('content')}"
return messages
def append_messages(messages, accumulated_messages):
# Create a set of existing message contents for O(1) lookup
existing_contents = {msg.get('content') for msg in messages}
# Append messages that aren't already present, preserving order
for msg in accumulated_messages:
if msg.get('content') not in existing_contents:
messages.append(msg)
existing_contents.add(msg.get('content'))
return messages
async def run_turn_streamed(
messages,
@ -323,19 +93,42 @@ async def run_turn_streamed(
tool_configs,
prompt_configs,
start_turn_with_start_agent,
max_calls_per_child_agent,
state={},
additional_tool_configs=[],
complete_request={}
complete_request={},
enable_tracing=None
):
"""
Run a turn of the conversation with streaming responses.
A turn consists of all messages between user inputs and must follow these rules:
1. Each turn must have exactly one external message from an agent with external visibility
2. A turn can have multiple internal messages from internal agents
3. Each agent can output at most one regular message per parent
4. Control flows from parent to child, and child must return to parent after responding
5. Turn ends when an external agent outputs a message
"""
print("\n=== Starting new turn ===")
print(f"Starting agent: {start_agent_name}")
# Use enable_tracing from complete_request if available, otherwise default to False
enable_tracing = complete_request.get("enable_tracing", False) if enable_tracing is None else enable_tracing
messages = set_sys_message(messages)
messages = add_sender_details_to_messages(messages)
is_greeting_turn = not any(msg.get("role") != "system" for msg in messages)
final_state = None # Initialize outside try block
final_state = None
accumulated_messages = []
agent_message_counts = {} # Track messages per agent
child_call_counts = {} # Track parent->child calls
current_agent = None
parent_stack = []
try:
greeting_prompt = get_prompt_by_type(prompt_configs, PromptType.GREETING)
# Handle greeting turn
if is_greeting_turn:
if not greeting_prompt:
greeting_prompt = "How can I help you today?"
print("Greeting prompt not found. Using default: ", greeting_prompt)
greeting_prompt = get_prompt_by_type(prompt_configs, PromptType.GREETING) or "How can I help you today?"
message = {
'content': greeting_prompt,
'role': 'assistant',
@ -343,21 +136,29 @@ async def run_turn_streamed(
'tool_calls': None,
'tool_call_id': None,
'tool_name': None,
'response_type': 'external'
'response_type': ResponseType.EXTERNAL.value
}
print("Yielding greeting message: ", message)
accumulated_messages.append(message)
print('-'*100)
print(f"Yielding message: {message}")
print('-'*100)
yield ('message', message)
final_state = {
"last_agent_name": start_agent_name if start_agent_name else None,
"tokens": {"total": 0, "prompt": 0, "completion": 0}
"last_agent_name": start_agent_name,
"tokens": {"total": 0, "prompt": 0, "completion": 0},
"turn_messages": accumulated_messages
}
print("Yielding done message")
print('-'*100)
print(f"Yielding done: {final_state}")
print('-'*100)
yield ('done', {'state': final_state})
return
# Initialize agents and get external tools
new_agents = get_agents(agent_configs=agent_configs, tool_configs=tool_configs, complete_request=complete_request)
new_agents = add_child_transfer_related_instructions_to_agents(new_agents)
new_agents = add_openai_recommended_instructions_to_agents(new_agents)
last_agent_name = get_last_agent_name(
state=state,
agent_configs=agent_configs,
@ -366,200 +167,289 @@ async def run_turn_streamed(
latest_assistant_msg=None,
start_turn_with_start_agent=start_turn_with_start_agent
)
last_new_agent = get_agent_by_name(last_agent_name, new_agents)
current_agent = get_agent_by_name(last_agent_name, new_agents)
external_tools = get_external_tools(tool_configs)
current_agent = last_new_agent
tokens_used = {"total": 0, "prompt": 0, "completion": 0}
iter = 0
while True:
iter += 1
is_internal_agent = check_internal_visibility(current_agent, agent_configs)
print('-'*100)
print(f"Iteration {iter} of turn loop")
print(f"Current agent: {current_agent.name} (internal: {is_internal_agent})")
print(f"Parent stack: {[agent.name for agent in parent_stack]}")
print('-'*100)
stream_result = await swarm_run_streamed(
agent=last_new_agent,
messages=messages,
external_tools=external_tools,
tokens_used=tokens_used
)
messages = append_messages(messages, accumulated_messages)
# Run the current agent
stream_result = await swarm_run_streamed(
agent=current_agent,
messages=messages,
external_tools=external_tools,
tokens_used=tokens_used,
enable_tracing=enable_tracing
)
# Process streaming events
async for event in stream_result.stream_events():
print('='*50)
print("Received event: ", event)
print('-'*50)
async for event in stream_result.stream_events():
try:
print('-'*100)
print(f"Event: {event}")
print('-'*100)
# Handle web search events
if event.type == "raw_response_event":
web_search_messages = handle_web_search_event(event, current_agent)
for message in web_search_messages:
message['response_type'] = ResponseType.INTERNAL.value
print('-'*100)
print(f"Yielding message: {message}")
print('-'*100)
yield ('message', message)
if message.get('role') != 'tool':
message['content'] = f"Sender agent: {current_agent.name}\nContent: {message['content']}"
accumulated_messages.append(message)
continue
# Handle raw response events and accumulate tokens
if event.type == "raw_response_event":
if hasattr(event.data, 'type') and event.data.type == "response.completed" and event.data.response.usage:
if hasattr(event.data.response, 'usage'):
tokens_used["total"] += event.data.response.usage.total_tokens
tokens_used["prompt"] += event.data.response.usage.input_tokens
tokens_used["completion"] += event.data.response.usage.output_tokens
print('-'*50)
print(f"Found usage information. Updated cumulative tokens: {tokens_used}")
print('-'*50)
# Handle agent transfer
elif event.type == "agent_updated_stream_event":
# print(f"\nAgent transfer attempt: {current_agent.name} -> {event.new_agent.name}")
# Skip self-transfers
if current_agent.name == event.new_agent.name:
print(f"\nSkipping agent transfer attempt: {current_agent.name} -> {event.new_agent.name} (self-transfer)")
continue
# Check for web search events
web_search_messages = handle_web_search_event(event, current_agent)
for message in web_search_messages:
print("Yielding web search message: ", message)
yield ('message', message)
# Check if we've already called this child agent too many times
parent_child_key = f"{current_agent.name}:{event.new_agent.name}"
current_count = child_call_counts.get(parent_child_key, 0)
if current_count >= max_calls_per_child_agent:
print(f"Skipping transfer from {current_agent.name} to {event.new_agent.name} (max calls reached from parent to child)")
continue
continue
# Check if the child agent has already responded in this turn
if event.new_agent.name in agent_message_counts:
print(f"Skipping transfer from {current_agent.name} to {event.new_agent.name} (already responded this turn)")
continue
# Update current agent when it changes
elif event.type == "agent_updated_stream_event":
if current_agent.name == event.new_agent.name:
continue
tool_call_id = str(uuid.uuid4())
# yield the transfer invocation
message = {
'content': None,
'role': 'assistant',
'sender': current_agent.name,
'tool_calls': [{
'function': {
'name': 'transfer_to_agent',
'arguments': json.dumps({
'assistant': event.new_agent.name
})
},
'id': tool_call_id,
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
}
print("Yielding message: ", message)
yield ('message', message)
# yield the transfer result
message = {
'content': json.dumps({
'assistant': event.new_agent.name
}),
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': tool_call_id,
'tool_name': 'transfer_to_agent',
}
print("Yielding message: ", message)
yield ('message', message)
current_agent = event.new_agent
continue
# Handle run items (tools, messages, etc)
elif event.type == "run_item_stream_event":
current_agent = event.item.agent
# Check for web search events first
web_search_messages = handle_web_search_event(event, current_agent)
if web_search_messages:
for message in web_search_messages:
print("Yielding web search message: ", message)
# Transfer to new agent
tool_call_id = str(uuid.uuid4())
message = {
'content': None,
'role': 'assistant',
'sender': current_agent.name,
'tool_calls': [{
'function': {
'name': 'transfer_to_agent',
'arguments': json.dumps({
'assistant': event.new_agent.name
})
},
'id': tool_call_id,
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': ResponseType.INTERNAL.value
}
print('-'*100)
print(f"Yielding message: {message}")
print('-'*100)
yield ('message', message)
continue
if event.item.type == "tool_call_item":
# Handle normal tool calls
message = {
'content': None,
'role': 'assistant',
'sender': current_agent.name if current_agent else None,
'tool_calls': [{
'function': {
'name': event.item.raw_item.name,
'arguments': event.item.raw_item.arguments
},
'id': event.item.raw_item.call_id,
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
}
print("Yielding message: ", message)
yield ('message', message)
# Record transfer result
message = {
'content': json.dumps({
'assistant': event.new_agent.name
}),
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': tool_call_id,
'tool_name': 'transfer_to_agent'
}
print('-'*100)
print(f"Yielding message: {message}")
print('-'*100)
yield ('message', message)
elif event.item.type == "tool_call_output_item":
# Handle normal tool outputs
call_id = None
tool_name = None
# Update tracking and switch to child
if check_internal_visibility(event.new_agent, agent_configs):
child_call_counts[parent_child_key] = current_count + 1
parent_stack.append(current_agent)
current_agent = event.new_agent
if isinstance(event.item.raw_item, dict):
call_id = event.item.raw_item.get('call_id')
tool_name = event.item.raw_item.get('name')
elif hasattr(event.item.raw_item, 'call_id'):
call_id = event.item.raw_item.call_id
if hasattr(event.item.raw_item, 'name'):
tool_name = event.item.raw_item.name
# Handle regular messages and tool calls
elif event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
# Check if it's a web search call
if hasattr(event.item.raw_item, 'type') and event.item.raw_item.type == 'web_search_call':
web_search_messages = handle_web_search_event(event, current_agent)
for message in web_search_messages:
message['response_type'] = ResponseType.INTERNAL.value
print('-'*100)
print(f"Yielding message: {message}")
print('-'*100)
yield ('message', message)
if message.get('role') != 'tool':
message['content'] = f"Sender agent: {current_agent.name}\nContent: {message['content']}"
accumulated_messages.append(message)
continue
# Handle regular tool calls
message = {
'content': None,
'role': 'assistant',
'sender': current_agent.name,
'tool_calls': [{
'function': {
'name': event.item.raw_item.name,
'arguments': event.item.raw_item.arguments
},
'id': event.item.raw_item.call_id,
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': ResponseType.INTERNAL.value
}
print('-'*100)
print(f"Yielding message: {message}")
print('-'*100)
yield ('message', message)
message['content'] = f"Sender agent: {current_agent.name}\nContent: {message['content']}"
accumulated_messages.append(message)
message = {
'content': str(event.item.output),
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': tool_name,
'response_type': 'internal'
}
print("Yielding message: ", message)
yield ('message', message)
elif event.item.type == "tool_call_output_item":
# Get the tool name and call id from raw_item
tool_call_id = None
tool_name = None
# Try to get call_id from various possible locations
if hasattr(event.item.raw_item, 'call_id'):
tool_call_id = event.item.raw_item.call_id
elif isinstance(event.item.raw_item, dict) and 'call_id' in event.item.raw_item:
tool_call_id = event.item.raw_item['call_id']
# Try to get tool name from various possible locations
if hasattr(event.item.raw_item, 'name'):
tool_name = event.item.raw_item.name
elif isinstance(event.item.raw_item, dict):
if 'name' in event.item.raw_item:
tool_name = event.item.raw_item['name']
elif 'type' in event.item.raw_item and event.item.raw_item['type'] == 'function_call_output':
# For function call outputs, try to infer from context
tool_name = 'recommendation' # Default for function calls
# Fallback to event item if available
if not tool_name and hasattr(event.item, 'tool_name'):
tool_name = event.item.tool_name
if not tool_call_id and hasattr(event.item, 'tool_call_id'):
tool_call_id = event.item.tool_call_id
message = {
'content': str(event.item.output),
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': tool_call_id,
'tool_name': tool_name,
'response_type': ResponseType.INTERNAL.value
}
print('-'*100)
print(f"Yielding tool call output message: {message}")
print('-'*100)
yield ('message', message)
elif event.item.type == "message_output_item":
content = ""
url_citations = []
elif event.item.type == "message_output_item":
# Extract content and citations
content = ""
url_citations = []
if hasattr(event.item.raw_item, 'content'):
for content_item in event.item.raw_item.content:
if hasattr(content_item, 'text'):
content += content_item.text
if hasattr(content_item, 'annotations'):
for annotation in content_item.annotations:
if hasattr(annotation, 'type') and annotation.type == 'url_citation':
citation = {
'url': annotation.url if hasattr(annotation, 'url') else '',
'title': annotation.title if hasattr(annotation, 'title') else '',
'start_index': annotation.start_index if hasattr(annotation, 'start_index') else 0,
'end_index': annotation.end_index if hasattr(annotation, 'end_index') else 0
}
url_citations.append(citation)
# Extract text content and any URL citations
if hasattr(event.item.raw_item, 'content'):
for content_item in event.item.raw_item.content:
# Handle text content
if hasattr(content_item, 'text'):
content += content_item.text
# Check if this agent has already responded
if current_agent.name in agent_message_counts:
print(f"\nSkipping agent {current_agent.name} because it has already responded")
if parent_stack:
print(f"-- Returning to parent agent {parent_stack[-1].name}")
current_agent = parent_stack.pop()
continue
else:
print(f"-- No parent agent to return to, ending turn")
break
# Extract URL citations if present
if hasattr(content_item, 'annotations'):
for annotation in content_item.annotations:
if hasattr(annotation, 'type') and annotation.type == 'url_citation':
citation = {
'url': annotation.url if hasattr(annotation, 'url') else '',
'title': annotation.title if hasattr(annotation, 'title') else '',
'start_index': annotation.start_index if hasattr(annotation, 'start_index') else 0,
'end_index': annotation.end_index if hasattr(annotation, 'end_index') else 0
}
url_citations.append(citation)
# Determine message type and create message
is_internal = check_internal_visibility(current_agent, agent_configs)
response_type = ResponseType.INTERNAL.value if is_internal else ResponseType.EXTERNAL.value
message = {
'content': content,
'role': 'assistant',
'sender': current_agent.name,
'tool_calls': None,
'tool_call_id': None,
'tool_name': None,
'response_type': response_type
}
# Create message with URL citations if they exist
message = {
'content': content,
'role': 'assistant',
'sender': current_agent.name,
'tool_calls': None,
'tool_call_id': None,
'tool_name': None,
'response_type': 'external'
}
if url_citations:
message['citations'] = url_citations
# Add citations if any were found
if url_citations:
message['citations'] = url_citations
# Track that this agent has responded
if not message.get('tool_calls'): # If there are no tool calls, it's a content response
agent_message_counts[current_agent.name] = 1
print('-'*100)
print(f"Yielding message: {message}")
print('-'*100)
yield ('message', message)
message['content'] = f"Sender agent: {current_agent.name}\nContent: {message['content']}"
accumulated_messages.append(message)
# Return to parent or end turn
if is_internal and parent_stack:
current_agent = parent_stack.pop()
continue
elif not is_internal:
break
print("Yielding message: ", message)
yield ('message', message)
except Exception as e:
print("\n=== Error in stream event processing ===")
print(f"Error: {str(e)}")
print("Event details:")
print(f"Event type: {event.type if hasattr(event, 'type') else 'unknown'}")
if hasattr(event, '__dict__'):
print(f"Event attributes: {event.__dict__}")
print(f"Full event object: {event}")
print(f"Traceback: {traceback.format_exc()}")
print("=" * 50)
raise
print(f"\n{'='*50}\n")
# Break main loop if we've output an external message
if not is_internal_agent and current_agent.name in agent_message_counts:
break
# After all events are processed, set final state
# Set final state
final_state = {
"last_agent_name": current_agent.name if current_agent else None,
"tokens": tokens_used
"tokens": tokens_used,
"turn_messages": accumulated_messages
}
print('-'*100)
print(f"Yielding done: {final_state}")
print('-'*100)
yield ('done', {'state': final_state})
except Exception as e:
print(traceback.format_exc())
print(f"Error in stream processing: {str(e)}")
print("Yielding error event:", {'error': str(e), 'state': final_state})
yield ('error', {'error': str(e), 'state': final_state}) # Include final_state in error response
yield ('error', {'error': str(e), 'state': final_state})

View file

@ -3,7 +3,7 @@ import json
import aiohttp
import jwt
import hashlib
from agents import OpenAIChatCompletionsModel
from agents import OpenAIChatCompletionsModel, trace, add_trace_processor
# Import helper functions needed for get_agents
from .helpers.access import (
@ -15,10 +15,10 @@ from .helpers.instructions import (
)
from agents import Agent as NewAgent, Runner, FunctionTool, RunContextWrapper, ModelSettings, WebSearchTool
from .tracing import AgentTurnTraceProcessor
# Add import for OpenAI functionality
from src.utils.common import common_logger as logger, generate_openai_output
from src.utils.common import generate_openai_output
from typing import Any
from dataclasses import asdict
import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client
@ -54,7 +54,7 @@ async def mock_tool(tool_name: str, args: str, description: str, mock_instructio
response_content = generate_openai_output(messages, output_type='text', model=PROVIDER_DEFAULT_MODEL)
return response_content
except Exception as e:
logger.error(f"Error in mock_tool: {str(e)}")
print(f"Error in mock_tool: {str(e)}")
return f"Error: {str(e)}"
async def call_webhook(tool_name: str, args: str, webhook_url: str, signing_secret: str) -> str:
@ -91,7 +91,7 @@ async def call_webhook(tool_name: str, args: str, webhook_url: str, signing_secr
print(f"Webhook error: {error_msg}")
return f"Error: {error_msg}"
except Exception as e:
logger.error(f"Exception in call_webhook: {str(e)}")
print(f"Exception in call_webhook: {str(e)}")
return f"Error: Failed to call webhook - {str(e)}"
async def call_mcp(tool_name: str, args: str, mcp_server_url: str) -> str:
@ -106,7 +106,7 @@ async def call_mcp(tool_name: str, args: str, mcp_server_url: str) -> str:
return json_output
except Exception as e:
logger.error(f"Error in call_mcp: {str(e)}")
print(f"Error in call_mcp: {str(e)}")
return f"Error: {str(e)}"
async def catch_all(ctx: RunContextWrapper[Any], args: str, tool_name: str, tool_config: dict, complete_request: dict) -> str:
@ -143,7 +143,7 @@ async def catch_all(ctx: RunContextWrapper[Any], args: str, tool_name: str, tool
response_content = await call_webhook(tool_name, args, webhook_url, signing_secret)
return response_content
except Exception as e:
logger.error(f"Error in catch_all: {str(e)}")
print(f"Error in catch_all: {str(e)}")
return f"Error: {str(e)}"
@ -191,7 +191,6 @@ def get_agents(agent_configs, tool_configs, complete_request):
new_agent_name_to_index = {}
# Create Agent objects from config
for agent_config in agent_configs:
logger.debug(f"Processing config for agent: {agent_config['name']}")
print("="*100)
print(f"Processing config for agent: {agent_config['name']}")
@ -204,14 +203,12 @@ def get_agents(agent_configs, tool_configs, complete_request):
# Prepare tool lists for this agent
external_tools = []
logger.debug(f"Agent {agent_config['name']} has {len(agent_config['tools'])} configured tools")
print(f"Agent {agent_config['name']} has {len(agent_config['tools'])} configured tools")
new_tools = []
rag_tool = get_rag_tool(agent_config, complete_request)
if rag_tool:
new_tools.append(rag_tool)
logger.debug(f"Added rag tool to agent {agent_config['name']}")
print(f"Added rag tool to agent {agent_config['name']}")
for tool_name in agent_config["tools"]:
@ -235,14 +232,11 @@ def get_agents(agent_configs, tool_configs, complete_request):
catch_all(ctx, args, _tool_name, _tool_config, _complete_request)
)
new_tools.append(tool)
logger.debug(f"Added tool {tool_name} to agent {agent_config['name']}")
print(f"Added tool {tool_name} to agent {agent_config['name']}")
else:
logger.warning(f"Tool {tool_name} not found in tool_configs")
print(f"WARNING: Tool {tool_name} not found in tool_configs")
# Create the agent object
logger.debug(f"Creating Agent object for {agent_config['name']}")
print(f"Creating Agent object for {agent_config['name']}")
# add the name and description to the agent instructions
@ -263,10 +257,8 @@ def get_agents(agent_configs, tool_configs, complete_request):
new_agent_to_children[agent_config["name"]] = agent_config.get("connectedAgents", [])
new_agent_name_to_index[agent_config["name"]] = len(new_agents)
new_agents.append(new_agent)
logger.debug(f"Successfully created agent: {agent_config['name']}")
print(f"Successfully created agent: {agent_config['name']}")
except Exception as e:
logger.error(f"Failed to create agent {agent_config['name']}: {str(e)}")
print(f"ERROR: Failed to create agent {agent_config['name']}: {str(e)}")
raise
@ -281,17 +273,20 @@ def get_agents(agent_configs, tool_configs, complete_request):
print("="*100)
return new_agents
# Initialize a flag to track if the trace processor is added
trace_processor_added = False
async def run_streamed(
agent,
messages,
external_tools=None,
tokens_used=None
tokens_used=None,
enable_tracing=False # Changed default to False
):
"""
Wrapper function for initializing and running the Swarm client in streaming mode.
"""
logger.info(f"Initializing Swarm streaming client for agent: {agent.name}")
print(f"Initializing Swarm streaming client for agent: {agent.name}")
print(f"Initializing streaming client for agent: {agent.name}")
# Initialize default parameters
if external_tools is None:
@ -314,14 +309,38 @@ async def run_streamed(
"content": str(msg)
})
logger.info("Beginning Swarm streaming run")
print("Beginning Swarm streaming run")
print("Beginning streaming run")
try:
# Use the Runner.run_streamed method
# Add our custom trace processor only if tracing is enabled
global trace_processor_added
if enable_tracing and not trace_processor_added:
trace_processor = AgentTurnTraceProcessor()
add_trace_processor(trace_processor)
trace_processor_added = True
# Create a trace context only if tracing is enabled
trace_ctx = None
if enable_tracing:
trace_ctx = trace(f"Agent turn: {agent.name}")
trace_ctx.__enter__()
# Get the stream result
stream_result = Runner.run_streamed(agent, formatted_messages)
# Patch the stream_events method to ensure trace context is maintained if tracing is enabled
if enable_tracing:
original_stream_events = stream_result.stream_events
async def wrapped_stream_events():
try:
async for event in original_stream_events():
yield event
finally:
if trace_ctx:
trace_ctx.__exit__(None, None, None)
stream_result.stream_events = wrapped_stream_events
return stream_result
except Exception as e:
logger.error(f"Error during streaming run: {str(e)}")
print(f"Error during streaming run: {str(e)}")
raise

View file

@ -3,7 +3,7 @@ from src.utils.common import generate_llm_output
import os
import copy
from .swarm_wrapper import Agent, Response, create_response
from .execute_turn import Agent, Response, create_response
from src.utils.common import common_logger, generate_openai_output, update_tokens_used
logger = common_logger

View file

@ -1,7 +1,5 @@
from .access import get_agent_config_by_name, get_agent_data_by_name
from src.graph.types import ControlType
from src.utils.common import common_logger
logger = common_logger
def get_last_agent_name(state, agent_configs, start_agent_name, msg_type, latest_assistant_msg, start_turn_with_start_agent):
default_last_agent_name = state.get("last_agent_name", '')
@ -9,7 +7,7 @@ def get_last_agent_name(state, agent_configs, start_agent_name, msg_type, latest
specific_agent_data = get_agent_data_by_name(default_last_agent_name, state.get("agent_data", []))
# Overrides for special cases
logger.info("Setting agent control based on last agent and control type")
print("Setting agent control based on last agent and control type")
if msg_type == "tool":
last_agent_name = default_last_agent_name
assert last_agent_name == latest_assistant_msg.get("sender", ''), "Last agent name does not match sender of latest assistant message during tool call handling"
@ -22,7 +20,7 @@ def get_last_agent_name(state, agent_configs, start_agent_name, msg_type, latest
if control_type == ControlType.PARENT_AGENT.value:
last_agent_name = specific_agent_data.get("most_recent_parent_name", None) if specific_agent_data else None
if not last_agent_name:
logger.error("Most recent parent is empty, defaulting to same agent instead")
print("Most recent parent is empty, defaulting to same agent instead")
last_agent_name = default_last_agent_name
elif control_type == ControlType.START_AGENT.value:
last_agent_name = start_agent_name
@ -30,7 +28,7 @@ def get_last_agent_name(state, agent_configs, start_agent_name, msg_type, latest
last_agent_name = default_last_agent_name
if default_last_agent_name != last_agent_name:
logger.info(f"Last agent name changed from {default_last_agent_name} to {last_agent_name} due to control settings")
print(f"Last agent name changed from {default_last_agent_name} to {last_agent_name} due to control settings")
return last_agent_name

View file

@ -1,4 +1,4 @@
from src.graph.instructions import TRANSFER_CHILDREN_INSTRUCTIONS, TRANSFER_PARENT_AWARE_INSTRUCTIONS, RAG_INSTRUCTIONS, ERROR_ESCALATION_AGENT_INSTRUCTIONS, TRANSFER_GIVE_UP_CONTROL_INSTRUCTIONS, SYSTEM_MESSAGE
from src.graph.instructions import TRANSFER_CHILDREN_INSTRUCTIONS, TRANSFER_PARENT_AWARE_INSTRUCTIONS, RAG_INSTRUCTIONS, ERROR_ESCALATION_AGENT_INSTRUCTIONS, TRANSFER_GIVE_UP_CONTROL_INSTRUCTIONS, SYSTEM_MESSAGE, CHILD_TRANSFER_RELATED_INSTRUCTIONS
def add_transfer_instructions_to_parent_agents(agent, children, transfer_functions):
other_agent_name_descriptions_tools = f'\n{'-'*100}\n'.join([f"Name: {agent.name}\nDescription: {agent.description if agent.description else ''}\nTool for transfer: {transfer_functions[agent.name].__name__}" for agent in children.values()])
@ -36,4 +36,9 @@ def get_universal_system_message(messages):
def add_universal_system_message_to_agent(agent, universal_sys_msg):
agent.instructions = agent.instructions + f'\n\n{'-'*100}\n\n' + universal_sys_msg
return agent
return agent
def add_child_transfer_related_instructions(agent):
prompt = CHILD_TRANSFER_RELATED_INSTRUCTIONS
agent.instructions = agent.instructions + f'\n\n{'-'*100}\n\n' + prompt
return agent

View file

@ -1,5 +1,6 @@
import json
import uuid
import traceback
def handle_web_search_event(event, current_agent):
"""
@ -8,53 +9,166 @@ def handle_web_search_event(event, current_agent):
"""
messages = []
# Handle raw response web search
if event.type == "raw_response_event":
if hasattr(event, 'data') and hasattr(event.data, 'raw_item'):
raw_item = event.data.raw_item
if (hasattr(raw_item, 'type') and raw_item.type == 'web_search_call') or (
isinstance(raw_item, dict) and raw_item.get('type') == 'web_search_call'
try:
# Handle raw response web search
if event.type == "raw_response_event":
if hasattr(event, 'data') and hasattr(event.data, 'raw_item'):
raw_item = event.data.raw_item
if (hasattr(raw_item, 'type') and raw_item.type == 'web_search_call') or (
isinstance(raw_item, dict) and raw_item.get('type') == 'web_search_call'
):
call_id = None
if hasattr(raw_item, 'id'):
call_id = raw_item.id
elif isinstance(raw_item, dict) and 'id' in raw_item:
call_id = raw_item['id']
else:
call_id = str(uuid.uuid4())
status = 'unknown'
if hasattr(raw_item, 'status'):
status = raw_item.status
elif isinstance(raw_item, dict) and 'status' in raw_item:
status = raw_item['status']
tool_call_msg = {
'content': None,
'role': 'assistant',
'sender': current_agent.name if current_agent else None,
'tool_calls': [{
'function': {
'name': 'web_search',
'arguments': json.dumps({
'search_id': call_id,
'status': status
})
},
'id': call_id,
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
}
print(f"Condition for tool call matched in raw_response_event. Appending tool call message: {tool_call_msg}")
messages.append(tool_call_msg)
tool_call_output_dummy_msg = {
'content': 'Web search completed.',
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': 'web_search',
'response_type': 'internal'
}
messages.append(tool_call_output_dummy_msg)
# Handle run item web search events
elif event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
try:
# Check if it's a web search call
if hasattr(event.item.raw_item, 'type') and event.item.raw_item.type == 'web_search_call':
call_id = event.item.raw_item.id if hasattr(event.item.raw_item, 'id') else str(uuid.uuid4())
tool_call_msg = {
'content': None,
'role': 'assistant',
'sender': current_agent.name if current_agent else None,
'tool_calls': [{
'function': {
'name': 'web_search',
'arguments': json.dumps({
'search_id': call_id
})
},
'id': call_id,
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
}
print(f"Condition for tool call matched in run_item_stream_event. Appending tool call message: {tool_call_msg}")
messages.append(tool_call_msg)
tool_call_output_dummy_msg = {
'content': 'Web search completed.',
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': 'web_search',
'response_type': 'internal'
}
messages.append(tool_call_output_dummy_msg)
else:
# Handle regular tool calls
tool_call_msg = {
'content': None,
'role': 'assistant',
'sender': current_agent.name if current_agent else None,
'tool_calls': [{
'function': {
'name': event.item.raw_item.name,
'arguments': event.item.raw_item.arguments
},
'id': event.item.raw_item.call_id,
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
}
print(f"Condition for tool call matched in run_item_stream_event. Appending tool call message: {tool_call_msg}")
messages.append(tool_call_msg)
tool_call_output_dummy_msg = {
'content': 'Web search completed.',
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': 'web_search',
'response_type': 'internal'
}
messages.append(tool_call_output_dummy_msg)
except Exception as e:
print("\n=== Error in tool_call_item handling ===")
print(f"Error: {str(e)}")
print(f"Event type: {event.type}")
print(f"Event item type: {event.item.type}")
print("Event details:")
print(f"Raw item: {event.item.raw_item}")
if hasattr(event.item.raw_item, '__dict__'):
print(f"Raw item attributes: {event.item.raw_item.__dict__}")
print(f"Traceback: {traceback.format_exc()}")
print("=" * 50)
raise
elif event.item.type == "tool_call_output_item":
if isinstance(event.item.raw_item, dict) and event.item.raw_item.get('type') == 'web_search_results':
call_id = event.item.raw_item.get('search_id', event.item.raw_item.get('id', str(uuid.uuid4())))
tool_call_output_msg = {
'content': str(event.item.output),
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': 'web_search',
'response_type': 'internal'
}
print(f"Condition for tool call output matched in run_item_stream_event. Appending tool call output message: {tool_call_output_msg}")
messages.append(tool_call_output_msg)
elif event.item.type == "web_search_call_item" or (
hasattr(event.item, 'raw_item') and
hasattr(event.item.raw_item, 'type') and
event.item.raw_item.type == 'web_search_call'
):
call_id = None
if hasattr(raw_item, 'id'):
call_id = raw_item.id
elif isinstance(raw_item, dict) and 'id' in raw_item:
call_id = raw_item['id']
else:
call_id = str(uuid.uuid4())
status = 'unknown'
if hasattr(raw_item, 'status'):
status = raw_item.status
elif isinstance(raw_item, dict) and 'status' in raw_item:
status = raw_item['status']
messages.append({
'content': None,
'role': 'assistant',
'sender': current_agent.name if current_agent else None,
'tool_calls': [{
'function': {
'name': 'web_search',
'arguments': json.dumps({
'search_id': call_id,
'status': status
})
},
'id': call_id,
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
})
# Handle run item web search events
elif event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
if hasattr(event.item.raw_item, 'type') and event.item.raw_item.type == 'web_search_call':
call_id = event.item.raw_item.id if hasattr(event.item.raw_item, 'id') else str(uuid.uuid4())
messages.append({
if hasattr(event.item.raw_item, 'id'):
call_id = event.item.raw_item.id
tool_call_msg = {
'content': None,
'role': 'assistant',
'sender': current_agent.name if current_agent else None,
@ -65,107 +179,90 @@ def handle_web_search_event(event, current_agent):
'search_id': call_id
})
},
'id': call_id,
'id': call_id or str(uuid.uuid4()),
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
})
messages.append({
'content': "Web search done",
}
print(f"Condition for tool call matched in run_item_stream_event. Appending tool call message: {tool_call_msg}")
messages.append(tool_call_msg)
tool_call_output_dummy_msg = {
'content': 'Web search completed.',
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': 'web_search',
'response_type': 'internal'
})
}
messages.append(tool_call_output_dummy_msg)
elif event.item.type == "tool_call_output_item":
if isinstance(event.item.raw_item, dict) and event.item.raw_item.get('type') == 'web_search_results':
call_id = event.item.raw_item.get('search_id', event.item.raw_item.get('id', str(uuid.uuid4())))
messages.append({
'content': str(event.item.output),
elif event.item.type == "web_search_results_item" or (
hasattr(event.item, 'raw_item') and (
(hasattr(event.item.raw_item, 'type') and event.item.raw_item.type == 'web_search_results') or
(isinstance(event.item.raw_item, dict) and event.item.raw_item.get('type') == 'web_search_results')
)
):
raw_item = event.item.raw_item
call_id = None
if hasattr(raw_item, 'search_id'):
call_id = raw_item.search_id
elif isinstance(raw_item, dict) and 'search_id' in raw_item:
call_id = raw_item['search_id']
elif hasattr(raw_item, 'id'):
call_id = raw_item.id
elif isinstance(raw_item, dict) and 'id' in raw_item:
call_id = raw_item['id']
else:
call_id = str(uuid.uuid4())
results = {}
if hasattr(event.item, 'output'):
results = event.item.output
elif hasattr(raw_item, 'results'):
results = raw_item.results
elif isinstance(raw_item, dict) and 'results' in raw_item:
results = raw_item['results']
results_str = ""
try:
results_str = json.dumps(results) if results else ""
except Exception as e:
print(f"Error serializing results: {str(e)}")
results_str = str(results)
tool_call_output_msg = {
'content': results_str,
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': 'web_search',
'response_type': 'internal'
})
}
print(f"Condition for tool call output matched in run_item_stream_event. Appending tool call output message: {tool_call_output_msg}")
messages.append(tool_call_output_msg)
elif event.item.type == "web_search_call_item" or (
hasattr(event.item, 'raw_item') and
hasattr(event.item.raw_item, 'type') and
event.item.raw_item.type == 'web_search_call'
):
call_id = None
if hasattr(event.item.raw_item, 'id'):
call_id = event.item.raw_item.id
except Exception as e:
print("\n=== Error in handle_web_search_event ===")
print(f"Error: {str(e)}")
print(f"Event type: {event.type}")
if hasattr(event, 'item'):
print(f"Event item type: {event.item.type}")
print("Event item details:")
print(f"Raw item: {event.item.raw_item}")
if hasattr(event.item.raw_item, '__dict__'):
print(f"Raw item attributes: {event.item.raw_item.__dict__}")
print(f"Traceback: {traceback.format_exc()}")
print("=" * 50)
raise
messages.append({
'content': None,
'role': 'assistant',
'sender': current_agent.name if current_agent else None,
'tool_calls': [{
'function': {
'name': 'web_search',
'arguments': json.dumps({
'search_id': call_id
})
},
'id': call_id or str(uuid.uuid4()),
'type': 'function'
}],
'tool_call_id': None,
'tool_name': None,
'response_type': 'internal'
})
elif event.item.type == "web_search_results_item" or (
hasattr(event.item, 'raw_item') and (
(hasattr(event.item.raw_item, 'type') and event.item.raw_item.type == 'web_search_results') or
(isinstance(event.item.raw_item, dict) and event.item.raw_item.get('type') == 'web_search_results')
)
):
raw_item = event.item.raw_item
call_id = None
if hasattr(raw_item, 'search_id'):
call_id = raw_item.search_id
elif isinstance(raw_item, dict) and 'search_id' in raw_item:
call_id = raw_item['search_id']
elif hasattr(raw_item, 'id'):
call_id = raw_item.id
elif isinstance(raw_item, dict) and 'id' in raw_item:
call_id = raw_item['id']
else:
call_id = str(uuid.uuid4())
results = {}
if hasattr(event.item, 'output'):
results = event.item.output
elif hasattr(raw_item, 'results'):
results = raw_item.results
elif isinstance(raw_item, dict) and 'results' in raw_item:
results = raw_item['results']
results_str = ""
try:
results_str = json.dumps(results) if results else ""
except Exception as e:
print(f"Error serializing results: {str(e)}")
results_str = str(results)
messages.append({
'content': results_str,
'role': 'tool',
'sender': None,
'tool_calls': None,
'tool_call_id': call_id,
'tool_name': 'web_search',
'response_type': 'internal'
})
if messages:
print("-"*100)
print(f"Web search related messages: {messages}")
print("-"*100)
return messages

View file

@ -67,4 +67,39 @@ The rest of the parts of the chatbot were unable to handle the chat. Hence, the
SYSTEM_MESSAGE = f"""
# Additional System-Wide Context or Instructions:
{{system_message}}
"""
########################
# Instructions for non-repeat child transfer
########################
CHILD_TRANSFER_RELATED_INSTRUCTIONS = f"""
# Critical Rules for Agent Transfers and Handoffs
- SEQUENTIAL TRANSFERS AND RESPONSES:
1. BEFORE transferring to any agent:
- Plan your complete sequence of needed transfers
- Document which responses you need to collect
2. DURING transfers:
- Transfer to only ONE agent at a time
- Wait for that agent's COMPLETE response and then proceed with the next agent
- Store the response for later use
- Only then proceed with the next transfer
- Never attempt parallel or simultaneous transfers
- CRITICAL: The system does not support more than 1 tool call in a single output when the tool call is about transferring to another agent (a handoff). You must only put out 1 transfer related tool call in one output.
3. AFTER receiving a response:
- Do not transfer to another agent until you've processed the current response
- If you need to transfer to another agent, wait for your current processing to complete
- Never transfer back to an agent that has already responded
- COMPLETION REQUIREMENTS:
- Never provide final response until ALL required agents have been consulted
- Never attempt to get multiple responses in parallel
- If a transfer is rejected due to multiple handoffs:
1. Complete current response processing
2. Then retry the transfer as next in sequence
3. Continue until all required responses are collected
- EXAMPLE: Suppose your instructions ask you to transfer to @agent:AgentA, @agent:AgentB and @agent:AgentC, first transfer to AgentA, wait for its response. Then transfer to AgentB, wait for its response. Then transfer to AgentC, wait for its response. Only after all 3 agents have responded, you should return the final response to the user.
"""

View file

@ -0,0 +1,212 @@
from agents import TracingProcessor
import logging
from datetime import datetime, timedelta
import json
logger = logging.getLogger(__name__)
class AgentTurnTraceProcessor(TracingProcessor):
"""Custom trace processor to print detailed information about agent turns."""
def __init__(self):
self.span_depth = {} # Track depth of each span
self.handoff_chain = [] # Track sequence of agent handoffs
self.message_flow = [] # Track message flow between agents
def _get_indent_level(self, span):
"""Calculate indent level based on parent_id chain."""
depth = 0
current_id = span.parent_id
while current_id:
depth += 1
current_id = self.span_depth.get(current_id)
return depth
def _format_time(self, timestamp_str):
"""Convert ISO timestamp string to formatted time string in IST timezone."""
try:
dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
# Add 5 hours and 30 minutes for IST timezone
dt = dt + timedelta(hours=5, minutes=30)
return dt.strftime("%H:%M:%S.%f")[:-3]
except (ValueError, AttributeError):
return "00:00:00.000"
def _calculate_duration(self, start_str, end_str):
"""Calculate duration between two ISO timestamp strings in seconds."""
try:
start = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
end = datetime.fromisoformat(end_str.replace('Z', '+00:00'))
return (end - start).total_seconds()
except (ValueError, AttributeError):
return 0.0
def _get_span_id(self, span):
"""Safely get span identifier."""
for attr in ['span_id', 'id', 'trace_id']:
if hasattr(span, attr):
return getattr(span, attr)
return None
def _print_handoff_chain(self, indent=""):
"""Print the current handoff chain."""
if self.handoff_chain:
print(f"{indent}Current Handoff Chain:")
print(f"{indent} {' -> '.join(self.handoff_chain)}")
def _print_message_flow(self, indent=""):
"""Print the message flow history."""
if self.message_flow:
print(f"{indent}Message Flow History:")
for msg in self.message_flow:
print(f"{indent} {msg}")
def on_trace_start(self, trace):
"""Called when a trace starts."""
separator = "="*100
print("\n" + separator)
print("🚀 TRACE START")
print(f"Name: {trace.name}")
print(f"ID: {trace.trace_id}")
if trace.metadata:
print("\nMetadata:")
for key, value in trace.metadata.items():
print(f" {key}: {value}")
print(separator + "\n")
# Reset tracking for new trace
self.handoff_chain = []
self.message_flow = []
def on_trace_end(self, trace):
"""Called when a trace ends."""
separator = "="*100
print("\n" + separator)
print("✅ TRACE END")
print(f"Name: {trace.name}")
print(f"ID: {trace.trace_id}")
# Print final chain state
print("\nFinal State:")
self._print_handoff_chain(" ")
self._print_message_flow(" ")
print(separator + "\n")
# Clear tracking
self.span_depth.clear()
self.handoff_chain = []
self.message_flow = []
def on_span_start(self, span):
"""Called when a span starts."""
try:
indent = " " * self._get_indent_level(span)
start_time = self._format_time(span.started_at)
span_id = self._get_span_id(span)
# Track span depth
if span.parent_id and span_id:
self.span_depth[span_id] = span.parent_id
# Print span header with clear section separator
print(f"\n{indent}{'>'*40}")
print(f"{indent}▶ [{start_time}] {span.span_data.type.upper()} SPAN START")
print(f"{indent} ID: {span_id}")
print(f"{indent} Parent ID: {span.parent_id}")
data = span.span_data.export()
# Print span-specific information
if span.span_data.type == "agent":
agent_name = data.get('name', 'Unknown')
print(f"{indent} Agent: {agent_name}")
print(f"{indent} Handoffs: {', '.join(data.get('handoffs', []))}")
# Track agent in handoff chain
if agent_name not in self.handoff_chain:
self.handoff_chain.append(agent_name)
self._print_handoff_chain(indent + " ")
elif span.span_data.type == "generation":
print(f"{indent} Model: {data.get('model', 'Unknown')}")
messages = data.get('messages', [])
if messages:
print(f"{indent} Messages: {len(messages)} message(s)")
elif span.span_data.type == "function":
print(f"{indent} Function: {data.get('name', 'Unknown')}")
args = data.get('arguments')
if args:
print(f"{indent} Arguments: {args}")
elif span.span_data.type == "handoff":
from_agent = data.get('from_agent', 'Unknown')
to_agent = data.get('to_agent', 'Unknown')
print(f"{indent} From: {from_agent}")
print(f"{indent} To: {to_agent}")
# Track handoff in message flow
flow_msg = f"{from_agent} -> {to_agent}"
self.message_flow.append(flow_msg)
print(f"{indent} Message Flow:")
for msg in self.message_flow[-3:]: # Show last 3 flows
print(f"{indent} {msg}")
print(f"{indent}{'>'*40}")
except Exception as e:
print(f"\n❌ Error in on_span_start: {str(e)}")
import traceback
print(traceback.format_exc())
def on_span_end(self, span):
"""Called when a span ends."""
try:
indent = " " * self._get_indent_level(span)
end_time = self._format_time(span.ended_at)
duration = self._calculate_duration(span.started_at, span.ended_at)
# Print span end information with clear section separator
print(f"\n{indent}{'<'*40}")
print(f"{indent}◀ [{end_time}] {span.span_data.type.upper()} SPAN END")
print(f"{indent} Duration: {duration:.3f}s")
data = span.span_data.export()
# Print span-specific output
if span.span_data.type == "generation":
output = data.get('output')
if output:
print(f"{indent} Output: {str(output)[:200]}...")
elif span.span_data.type == "function":
output = data.get('output')
if output:
print(f"{indent} Output: {str(output)[:200]}...")
elif span.span_data.type == "handoff":
self._print_handoff_chain(indent + " ")
self._print_message_flow(indent + " ")
print(f"{indent}{'<'*40}")
# Clean up span depth tracking
span_id = self._get_span_id(span)
if span_id and span_id in self.span_depth:
del self.span_depth[span_id]
except Exception as e:
print(f"\n❌ Error in on_span_end: {str(e)}")
import traceback
print(traceback.format_exc())
def shutdown(self):
"""Called when the processor is shutting down."""
self.span_depth.clear()
self.handoff_chain = []
self.message_flow = []
def force_flush(self):
"""Called to force flush any buffered traces/spans."""
pass

View file

@ -4,13 +4,18 @@ class AgentRole(Enum):
POST_PROCESSING = "post_process"
GUARDRAILS = "guardrails"
class VisibilityType(Enum):
EXTERNAL = "external"
class outputVisibility(Enum):
EXTERNAL = "user_facing"
INTERNAL = "internal"
class ResponseType(Enum):
INTERNAL = "internal"
EXTERNAL = "external"
class ControlType(Enum):
RETAIN = "retain"
PARENT_AGENT = "relinquish_to_parent"
START_AGENT = "start_agent"
class PromptType(Enum):
STYLE = "style_prompt"