Remove old swarm code and unused imports

This commit is contained in:
akhisud3195 2025-05-02 17:10:37 +05:30
parent fef056073f
commit 6d47adafb3
2 changed files with 4 additions and 194 deletions

View file

@ -8,12 +8,10 @@ from hypercorn.config import Config
from hypercorn.asyncio import serve
import asyncio
from src.graph.core import run_turn, run_turn_streamed
from src.graph.core import run_turn_streamed
from src.graph.tools import RAG_TOOL, CLOSE_CHAT_TOOL
from src.utils.common import common_logger, read_json_from_file
from pprint import pprint
logger = common_logger
app = Quart(__name__)
config = read_json_from_file("./configs/default_config.json")

View file

@ -9,11 +9,9 @@ from .helpers.access import (
get_external_tools,
get_prompt_by_type
)
from .helpers.state import (
construct_state_from_response
)
from .helpers.control import get_latest_assistant_msg, get_latest_non_assistant_messages, get_last_agent_name
from .swarm_wrapper import run as swarm_run, run_streamed as swarm_run_streamed, create_response, get_agents
from .helpers.control import get_last_agent_name
from .swarm_wrapper import run_streamed as swarm_run_streamed, get_agents
from src.utils.common import common_logger as logger
from .types import PromptType
@ -58,192 +56,6 @@ def set_sys_message(messages):
return messages
def clean_up_history(agent_data):
"""
Ensures each agent's history is sorted using order_messages.
"""
for data in agent_data:
data["history"] = order_messages(data["history"])
return agent_data
def create_final_response(response, turn_messages, tokens_used, all_agents):
"""
Constructs the final response data (messages, tokens_used, updated state) that a caller would need.
"""
# Ensure response has a messages attribute
if not hasattr(response, 'messages'):
response.messages = []
# Assign the appropriate messages to the response
response.messages = turn_messages
# Ensure tokens_used is a valid dictionary
if not isinstance(tokens_used, dict):
tokens_used = {"total": 0, "prompt": 0, "completion": 0} # Default values if not a dictionary
# Ensure response has a tokens_used attribute that's a dictionary
if not hasattr(response, 'tokens_used') or not isinstance(response.tokens_used, dict):
response.tokens_used = {}
response.tokens_used = tokens_used
# Ensure response has an agent attribute for state construction
if not hasattr(response, 'agent'):
if all_agents and len(all_agents) > 0:
response.agent = all_agents[0] # Set default agent if missing
new_state = construct_state_from_response(response, all_agents)
return response.messages, response.tokens_used, new_state
async def run_turn(
messages, start_agent_name, agent_configs, tool_configs, start_turn_with_start_agent, state={}, additional_tool_configs=[], complete_request={}
):
"""
Coordinates a single 'turn' of conversation or processing among agents.
Includes validation, agent setup, optional greeting logic, error handling, and post-processing steps.
"""
logger.info("Running stateless turn")
print("Running stateless turn")
# Sort messages by the specified ordering
#messages = order_messages(messages)
# Merge any additional tool configs
tool_configs = tool_configs + additional_tool_configs
# Determine if this is a greeting turn
greeting_turn = not any(msg.get("role") != "system" for msg in messages)
turn_messages = []
# Initialize tokens_used as a dictionary
tokens_used = {"total": 0, "prompt": 0, "completion": 0}
agent_data = state.get("agent_data", [])
# If not a greeting turn, localize the last user or system messages
if not greeting_turn:
latest_assistant_msg = get_latest_assistant_msg(messages)
latest_non_assistant_msgs = get_latest_non_assistant_messages(messages)
msg_type = latest_non_assistant_msgs[-1]["role"]
# Determine the last agent from state/config
last_agent_name = get_last_agent_name(
state=state,
agent_configs=agent_configs,
start_agent_name=start_agent_name,
msg_type=msg_type,
latest_assistant_msg=latest_assistant_msg,
start_turn_with_start_agent=start_turn_with_start_agent
)
else:
# For a greeting turn, we assume the last agent is the start_agent_name
last_agent_name = start_agent_name
state["agent_data"] = agent_data
# Initialize all agents
logger.info("Initializing agents")
print("Initializing agents")
new_agents = get_agents(
agent_configs=agent_configs,
tool_configs=tool_configs,
complete_request=complete_request
)
# Prepare escalation agent
last_new_agent = get_agent_by_name(last_agent_name, new_agents)
# Gather external tools for Swarm
external_tools = get_external_tools(tool_configs)
logger.info(f"Found {len(external_tools)} external tools")
print(f"Found {len(external_tools)} external tools")
# If no validation error yet, proceed with the main run
logger.info("Running swarm run")
print("Running swarm run")
response = await swarm_run(
agent=last_new_agent,
messages=messages,
external_tools=external_tools,
tokens_used=tokens_used
)
logger.info("Swarm run completed")
print("Swarm run completed")
# Initialize response.messages if it doesn't exist
if not hasattr(response, 'messages'):
response.messages = []
# Convert the ResponseOutputMessage to a standard message format
if hasattr(response, 'new_items') and response.new_items and hasattr(response.new_items[-1], 'raw_item'):
raw_item = response.new_items[-1].raw_item
# Extract text content from ResponseOutputText objects
content = ""
if hasattr(raw_item, 'content') and raw_item.content:
for content_item in raw_item.content:
if hasattr(content_item, 'text'):
content += content_item.text
# Create a standard message dictionary
standard_message = {
"role": raw_item.role if hasattr(raw_item, 'role') else "assistant",
"content": content,
"sender": last_new_agent.name,
"created_at": None,
"response_type": "internal"
}
# Add the converted message to response messages
response.messages.append(standard_message)
logger.info("Converted message added to response messages")
print("Converted message added to response messages")
# Use a dictionary for tokens_used
tokens_used = {"total": 0, "prompt": 0, "completion": 0} # Default values as placeholders
# Ensure turn_messages can be extended with response.messages
if hasattr(response, 'messages') and isinstance(response.messages, list):
turn_messages.extend(response.messages)
logger.info(f"Completed run of agent: {last_new_agent.name}")
print(f"Completed run of agent: {last_new_agent.name}")
# Otherwise, duplicate the last response as external
logger.info("No post-processing agent found. Duplicating last response and setting to external.")
print("No post-processing agent found. Duplicating last response and setting to external.")
if turn_messages:
duplicate_msg = deepcopy(turn_messages[-1])
duplicate_msg["response_type"] = "external"
duplicate_msg["sender"] += " >> External"
# Ensure tokens_used remains a proper dictionary
if not isinstance(tokens_used, dict):
tokens_used = {"total": 0, "prompt": 0, "completion": 0} # Default values if not a dictionary
response = create_response(
messages=[duplicate_msg],
tokens_used=tokens_used,
agent=last_new_agent,
error_msg=''
)
# Ensure response has messages attribute
if hasattr(response, 'messages') and isinstance(response.messages, list):
turn_messages.extend(response.messages)
# Finalize the response
logger.info("Finalizing response")
print("Finalizing response")
return create_final_response(
response=response,
turn_messages=turn_messages,
tokens_used=tokens_used,
all_agents=new_agents
)
async def run_turn_streamed(
messages,
start_agent_name,