diff --git a/metagpt/environment/base_env.py b/metagpt/environment/base_env.py index 5d6d3a286..0776ae9ff 100644 --- a/metagpt/environment/base_env.py +++ b/metagpt/environment/base_env.py @@ -159,7 +159,7 @@ class Environment(ExtEnv): """增加一个在当前环境的角色 Add a role in the current environment """ - self.roles[role.profile] = role + self.roles[role.name] = role role.set_env(self) role.context = self.context @@ -168,7 +168,7 @@ class Environment(ExtEnv): Add a batch of characters in the current environment """ for role in roles: - self.roles[role.profile] = role + self.roles[role.name] = role for role in roles: # setup system message with roles role.context = self.context diff --git a/metagpt/environment/mgx/mgx_env.py b/metagpt/environment/mgx/mgx_env.py index 3bd41fd1e..33160cfd8 100644 --- a/metagpt/environment/mgx/mgx_env.py +++ b/metagpt/environment/mgx/mgx_env.py @@ -17,20 +17,33 @@ from metagpt.utils.common import any_to_str, any_to_str_set class MGXEnv(Environment): """MGX Environment""" - # Before enabling TL to fully take over the routing, all software company roles need to be able to handle TL messages, which requires restructuring. - allow_bypass_team_leader: bool = True + # If True, fixed software sop bypassing TL is allowed, otherwise, TL will fully take over the routing + allow_bypass_team_leader: bool = False + + direct_chat_roles: set[str] = set() # record direct chat: @role_name def _publish_message(self, message: Message, peekable: bool = True) -> bool: return super().publish_message(message, peekable) def publish_message(self, message: Message, user_defined_recipient: str = "", publicer: str = "") -> bool: """let the team leader take over message publishing""" - tl = self.get_role("Team Leader") + tl = self.get_role("Tim") # TeamLeader's name is Tim if user_defined_recipient: + # human user's direct chat message to a certain role + for role_name in message.send_to: + if self.get_role(role_name).is_idle: + # User starts a new direct chat with a certain role, expecting a direct chat response from the role; Other roles including TL should not be involved. + # If the role is not idle, it means the user helps the role with its current work, in this case, we handle the role's response message as usual. + self.direct_chat_roles.add(role_name) + self._publish_message(message) - # bypass team leader, team leader only needs to know but not to react - tl.rc.memory.add(self.move_message_info_to_content(message)) + # # bypass team leader, team leader only needs to know but not to react (commented out because TL doesn't understand the message well in actual experiments) + # tl.rc.memory.add(self.move_message_info_to_content(message)) + + elif message.sent_from in self.direct_chat_roles: + # direct chat response from a certain role to human user, team leader and other roles in the env should not be involved, no need to publish + self.direct_chat_roles.remove(message.sent_from) elif ( self.allow_bypass_team_leader @@ -106,3 +119,6 @@ class MGXEnv(Environment): sent_from = converted_msg.metadata[AGENT] if AGENT in converted_msg.metadata else converted_msg.sent_from converted_msg.content = f"from {sent_from} to {converted_msg.send_to}: {converted_msg.content}" return converted_msg + + def __repr__(self): + return "MGXEnv()" diff --git a/metagpt/prompts/di/role_zero.py b/metagpt/prompts/di/role_zero.py index f098d2c4b..4d52476aa 100644 --- a/metagpt/prompts/di/role_zero.py +++ b/metagpt/prompts/di/role_zero.py @@ -20,7 +20,7 @@ class Task(BaseModel): # Available Commands {available_commands} -Special Command: Use {{"command_name": "pass"}} to do nothing and {{"command_name": "end"}} to indicate completion of all requirements and the end of actions. +Special Command: Use {{"command_name": "end"}} to do nothing or indicate completion of all requirements and the end of actions. # Current Plan {plan_status} diff --git a/metagpt/prompts/di/team_leader.py b/metagpt/prompts/di/team_leader.py index e5ab03ac1..5abb1abcc 100644 --- a/metagpt/prompts/di/team_leader.py +++ b/metagpt/prompts/di/team_leader.py @@ -31,5 +31,6 @@ FINISH_CURRENT_TASK_CMD = """ "command_name": "Plan.finish_current_task", "args": {{}} } +] ``` """ diff --git a/metagpt/roles/di/data_analyst.py b/metagpt/roles/di/data_analyst.py index e489c9d98..71e6bacd4 100644 --- a/metagpt/roles/di/data_analyst.py +++ b/metagpt/roles/di/data_analyst.py @@ -41,6 +41,7 @@ class DataAnalyst(DataInterpreter): # Command.PASS, ] commands: list[dict] = [] # issued commands to be executed + user_requirement: str = "" @model_validator(mode="after") def set_plan_and_tool(self) -> "DataInterpreter": @@ -68,9 +69,6 @@ class DataAnalyst(DataInterpreter): self.user_requirement = self.get_memories()[-1].content self.planner.plan.goal = self.user_requirement example = KeywordExpRetriever().retrieve(self.user_requirement) - else: - self.working_memory.add_batch(self.rc.news) - # TODO: implement experience retrieval in multi-round setting plan_status = self.planner.plan.model_dump(include=["goal", "tasks"]) # for task in plan_status["tasks"]: @@ -82,10 +80,11 @@ class DataAnalyst(DataInterpreter): available_commands=prepare_command_prompt(self.available_commands), ) context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")]) - async with ThoughtReporter(): + # print(*context, sep="\n" + "*" * 5 + "\n") + async with ThoughtReporter(enable_llm_stream=True): rsp = await self.llm.aask(context) self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp)) - self.rc.memory.add(Message(content=rsp, role="assistant")) + self.rc.working_memory.add(Message(content=rsp, role="assistant")) await run_commands(self, self.commands, self.rc.working_memory) @@ -118,7 +117,11 @@ class DataAnalyst(DataInterpreter): rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act while actions_taken < self.rc.max_react_loop: # NOTE: Diff 2: Keep observing within _react, news will go into memory, allowing adapting to new info + # add news from self._observe, the one called in self.run, consider removing when switching from working_memory to memory + self.working_memory.add_batch(self.rc.news) await self._observe() + # add news from this self._observe, we need twice because _observe rewrites rc.news + self.working_memory.add_batch(self.rc.news) # think has_todo = await self._think() diff --git a/metagpt/roles/di/data_interpreter.py b/metagpt/roles/di/data_interpreter.py index bdfc0e294..f90a928df 100644 --- a/metagpt/roles/di/data_interpreter.py +++ b/metagpt/roles/di/data_interpreter.py @@ -74,7 +74,7 @@ class DataInterpreter(Role): return True prompt = REACT_THINK_PROMPT.format(user_requirement=self.user_requirement, context=context) - async with ThoughtReporter(): + async with ThoughtReporter(enable_llm_stream=True): rsp = await self.llm.aask(prompt) rsp_dict = json.loads(CodeParser.parse_code(text=rsp)) self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant")) diff --git a/metagpt/roles/di/role_zero.py b/metagpt/roles/di/role_zero.py index 44e658ed4..b5342409f 100644 --- a/metagpt/roles/di/role_zero.py +++ b/metagpt/roles/di/role_zero.py @@ -53,7 +53,6 @@ class RoleZero(Role): experience_retriever: ExpRetriever = DummyExpRetriever() # Others - user_requirement: str = "" command_rsp: str = "" # the raw string containing the commands commands: list[dict] = [] # commands to be executed memory_k: int = 20 # number of memories (messages) to use as historical context @@ -106,8 +105,7 @@ class RoleZero(Role): return False if not self.planner.plan.goal: - self.user_requirement = self.get_memories()[-1].content - self.planner.plan.goal = self.user_requirement + self.planner.plan.goal = self.get_memories()[-1].content ### 1. Experience ### example = self._retrieve_experience() @@ -129,7 +127,7 @@ class RoleZero(Role): ) context = self.llm.format_msg(self.rc.memory.get(self.memory_k) + [UserMessage(content=prompt)]) # print(*context, sep="\n" + "*" * 5 + "\n") - async with ThoughtReporter(): + async with ThoughtReporter(enable_llm_stream=True): self.command_rsp = await self.llm.aask(context, system_msgs=self.system_msg) self.rc.memory.add(AIMessage(content=self.command_rsp)) diff --git a/metagpt/strategy/experience_retriever.py b/metagpt/strategy/experience_retriever.py index 217a533d5..a43b94ce0 100644 --- a/metagpt/strategy/experience_retriever.py +++ b/metagpt/strategy/experience_retriever.py @@ -250,164 +250,165 @@ class TRDExpRetriever(ExpRetriever): """ +TL_EXAMPLE = """ +## example 1 +User Requirement: Create a cli snake game using Python. +Explanation: The requirement is about software development. Assign each tasks to a different team member based on their expertise. When publishing message to Product Manager, we copy original user requirement directly to ensure no information loss. +```json +[ + { + "command_name": "Plan.append_task", + "args": { + "task_id": "1", + "dependent_task_ids": [], + "instruction": "Create a product requirement document (PRD) outlining the features, user interface, and user experience of the CLI python snake game.", + "assignee": "Alice" + } + }, + { + "command_name": "Plan.append_task", + "args": { + "task_id": "2", + "dependent_task_ids": ["1"], + "instruction": "Design the software architecture for the CLI snake game, including the choice of programming language, libraries, and data flow.", + "assignee": "Bob" + } + }, + { + "command_name": "Plan.append_task", + "args": { + "task_id": "3", + "dependent_task_ids": ["2"], + "instruction": "Break down the architecture into manageable tasks, identify task dependencies, and prepare a detailed task list for implementation.", + "assignee": "Eve" + } + }, + { + "command_name": "Plan.append_task", + "args": { + "task_id": "4", + "dependent_task_ids": ["3"], + "instruction": "Implement the core game logic for the CLI snake game, including snake movement, food generation, and score tracking.", + "assignee": "Alex" + } + }, + { + "command_name": "Plan.append_task", + "args": { + "task_id": "5", + "dependent_task_ids": ["4"], + "instruction": "Write comprehensive tests for the game logic and user interface to ensure functionality and reliability.", + "assignee": "Edward" + } + }, + { + "command_name": "TeamLeader.publish_message", + "args": { + "content": "Create a cli snake game using Python", + "send_to": "Alice" + } + }, + { + "command_name": "RoleZero.reply_to_human", + "args": { + "content": "I have assigned the tasks to the team members. Alice will create the PRD, Bob will design the software architecture, Eve will break down the architecture into tasks, Alex will implement the core game logic, and Edward will write comprehensive tests. The team will work on the project accordingly", + } + }, + { + "command_name": "end" + } +] +``` + +## example 2 +User Requirement: Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy. +Explanation: DON'T decompose requirement if it is a DATA-RELATED task, assign a single task directly to Data Analyst David. He will manage the decomposition and implementation. +```json +[ + { + "command_name": "Plan.append_task", + "args": { + "task_id": "1", + "dependent_task_ids": [], + "instruction": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.", + "assignee": "David" + } + }, + { + "command_name": "TeamLeader.publish_message", + "args": { + "content": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.", + "send_to": "David" + } + }, + { + "command_name": "RoleZero.reply_to_human", + "args": { + "content": "I have assigned the task to David. He will break down the task further by himself and starts solving it.", + } + }, + { + "command_name": "end" + } +] +``` + +## example 3 +Conversation History: +[ + ..., + {'role': 'assistant', 'content': 'from Alice(Product Manager) to {'Bob'}: {'docs': {'20240424153821.json': {'root_path': 'docs/prd', 'filename': '20240424153821.json', 'content': '{"Language":"en_us","Programming Language":"Python","Original Requirements":"create a cli snake game","Project Name":"snake_game","Product Goals":["Develop an intuitive and addictive snake game",...], ...}}}}}, +] +Explanation: You received a message from Alice, the Product Manager, that she has completed the PRD, use Plan.finish_current_task to mark her task as finished and moves the plan to the next task. Based on plan status, next task is for Bob (Architect), publish a message asking him to start. The message content should contain important path info. +```json +[ + { + "command_name": "Plan.finish_current_task", + "args": {} + }, + { + "command_name": "TeamLeader.publish_message", + "args": { + "content": "Please design the software architecture for the snake game based on the PRD created by Alice. The PRD is at 'docs/prd/20240424153821.json'. Include the choice of programming language, libraries, and data flow, etc.", + "send_to": "Bob" + } + }, + { + "command_name": "RoleZero.reply_to_human", + "args": { + "content": "Alice has completed the PRD. I have marked her task as finished and sent the PRD to Bob. Bob will work on the software architecture.", + } + }, + { + "command_name": "end" + } +] +``` + +## example 4 +User Question: how does the project go? +Explanation: The user is asking for a general update on the project status. Give a straight answer about the current task the team is working on and provide a summary of the completed tasks. +```json +[ + { + "command_name": "RoleZero.reply_to_human", + "args": { + "content": "The team is currently working on ... We have completed ...", + } + }, + { + "command_name": "end" + } +] +``` +""" + + class SimpleExpRetriever(ExpRetriever): """A simple experience retriever that returns manually crafted examples.""" - EXAMPLE: str = """ - ## example 1 - User Requirement: Create a cli snake game using Python. - Explanation: The requirement is about software development. Assign each tasks to a different team member based on their expertise. When publishing message to Product Manager, we copy original user requirement directly to ensure no information loss. - ```json - [ - { - "command_name": "Plan.append_task", - "args": { - "task_id": "1", - "dependent_task_ids": [], - "instruction": "Create a product requirement document (PRD) outlining the features, user interface, and user experience of the CLI python snake game.", - "assignee": "Alice" - } - }, - { - "command_name": "Plan.append_task", - "args": { - "task_id": "2", - "dependent_task_ids": ["1"], - "instruction": "Design the software architecture for the CLI snake game, including the choice of programming language, libraries, and data flow.", - "assignee": "Bob" - } - }, - { - "command_name": "Plan.append_task", - "args": { - "task_id": "3", - "dependent_task_ids": ["2"], - "instruction": "Break down the architecture into manageable tasks, identify task dependencies, and prepare a detailed task list for implementation.", - "assignee": "Eve" - } - }, - { - "command_name": "Plan.append_task", - "args": { - "task_id": "4", - "dependent_task_ids": ["3"], - "instruction": "Implement the core game logic for the CLI snake game, including snake movement, food generation, and score tracking.", - "assignee": "Alex" - } - }, - { - "command_name": "Plan.append_task", - "args": { - "task_id": "5", - "dependent_task_ids": ["4"], - "instruction": "Write comprehensive tests for the game logic and user interface to ensure functionality and reliability.", - "assignee": "Edward" - } - }, - { - "command_name": "TeamLeader.publish_message", - "args": { - "content": "Create a cli snake game using Python", - "send_to": "Alice" - } - }, - { - "command_name": "RoleZero.reply_to_human", - "args": { - "content": "I have assigned the tasks to the team members. Alice will create the PRD, Bob will design the software architecture, Eve will break down the architecture into tasks, Alex will implement the core game logic, and Edward will write comprehensive tests. The team will work on the project accordingly", - } - }, - { - "command_name": "end" - } - ] - ``` - - ## example 2 - User Requirement: Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy. - Explanation: DON'T decompose requirement if it is a DATA-RELATED task, assign a single task directly to Data Analyst David. He will manage the decomposition and implementation. - ```json - [ - { - "command_name": "Plan.append_task", - "args": { - "task_id": "1", - "dependent_task_ids": [], - "instruction": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.", - "assignee": "David" - } - }, - { - "command_name": "TeamLeader.publish_message", - "args": { - "content": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.", - "send_to": "David" - } - }, - { - "command_name": "RoleZero.reply_to_human", - "args": { - "content": "I have assigned the task to David. He will break down the task further by himself and starts solving it.", - } - }, - { - "command_name": "end" - } - ] - ``` - - ## example 3 - Conversation History: - [ - ..., - {'role': 'assistant', 'content': 'from Alice(Product Manager) to {'Bob'}: {'docs': {'20240424153821.json': {'root_path': 'docs/prd', 'filename': '20240424153821.json', 'content': '{"Language":"en_us","Programming Language":"Python","Original Requirements":"create a cli snake game","Project Name":"snake_game","Product Goals":["Develop an intuitive and addictive snake game",...], ...}}}}}, - ] - Explanation: You received a message from Alice, the Product Manager, that she has completed the PRD, use Plan.finish_current_task to mark her task as finished and moves the plan to the next task. Based on plan status, next task is for Bob (Architect), publish a message asking him to start. The message content should contain important path info. - ```json - [ - { - "command_name": "Plan.finish_current_task", - "args": {} - }, - { - "command_name": "TeamLeader.publish_message", - "args": { - "content": "Please design the software architecture for the snake game based on the PRD created by Alice. The PRD is at 'docs/prd/20240424153821.json'. Include the choice of programming language, libraries, and data flow, etc.", - "send_to": "Bob" - } - }, - { - "command_name": "RoleZero.reply_to_human", - "args": { - "content": "Alice has completed the PRD. I have marked her task as finished and sent the PRD to Bob. Bob will work on the software architecture.", - } - }, - { - "command_name": "end" - } - ] - ``` - - ## example 4 - User Question: how does the project go? - Explanation: The user is asking for a general update on the project status. Give a straight answer about the current task the team is working on and provide a summary of the completed tasks. - ```json - [ - { - "command_name": "RoleZero.reply_to_human", - "args": { - "content": "The team is currently working on ... We have completed ...", - } - }, - { - "command_name": "end" - } - ] - ``` - """ - def retrieve(self, context: str = "") -> str: - return self.EXAMPLE + return TL_EXAMPLE class KeywordExpRetriever(ExpRetriever): diff --git a/metagpt/tools/libs/editor.py b/metagpt/tools/libs/editor.py index eba1a1eac..c9f02c4f1 100644 --- a/metagpt/tools/libs/editor.py +++ b/metagpt/tools/libs/editor.py @@ -1,5 +1,4 @@ import os -import re import shutil import subprocess @@ -27,15 +26,16 @@ class Editor: def write(self, path: str, content: str): """Write the whole content to a file. When used, make sure content arg contains the full content of the file.""" - if len(re.findall(r"\\n", content)) >= 5: - # A very raw rule to correct the content: Many \\n suggests all new line characters are mistaken as \\n whereas the correct one should be \n + if "\n" not in content and "\\n" in content: + # A very raw rule to correct the content: If 'content' lacks actual newlines ('\n') but includes '\\n', consider + # replacing them with '\n' to potentially correct mistaken representations of newline characters. content = content.replace("\\n", "\n") directory = os.path.dirname(path) if directory and not os.path.exists(directory): os.makedirs(directory) with open(path, "w", encoding="utf-8") as f: f.write(content) - self.resource.report(path, "path") + # self.resource.report(path, "path") def read(self, path: str) -> FileBlock: """Read the whole content of a file.""" diff --git a/metagpt/utils/common.py b/metagpt/utils/common.py index dfd3a8919..e8f150556 100644 --- a/metagpt/utils/common.py +++ b/metagpt/utils/common.py @@ -275,7 +275,7 @@ class CodeParser: def parse_code(cls, text: str, lang: str = "", block: Optional[str] = None) -> str: if block: text = cls.parse_block(block, text) - pattern = rf"```{lang}.*?\s+(.*?)```" + pattern = rf"```{lang}.*?\s+(.*?)\n```" match = re.search(pattern, text, re.DOTALL) if match: code = match.group(1) diff --git a/metagpt/utils/report.py b/metagpt/utils/report.py index 2d72af111..ed40e5dde 100644 --- a/metagpt/utils/report.py +++ b/metagpt/utils/report.py @@ -51,7 +51,6 @@ class ResourceReporter(BaseModel): block: BlockType = Field(description="The type of block that is reporting the resource") uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource") - is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream") enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting") callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent") _llm_task: Optional[asyncio.Task] = PrivateAttr(None) @@ -153,17 +152,14 @@ class ResourceReporter(BaseModel): def __enter__(self): """Enter the synchronous streaming callback context.""" - self.is_chunk = True return self def __exit__(self, *args, **kwargs): """Exit the synchronous streaming callback context.""" self.report(None, END_MARKER_NAME) - self.is_chunk = False async def __aenter__(self): """Enter the asynchronous streaming callback context.""" - self.is_chunk = True if self.enable_llm_stream: queue = create_llm_stream_queue() self._llm_task = asyncio.create_task(self._llm_stream_report(queue)) @@ -171,15 +167,18 @@ class ResourceReporter(BaseModel): async def __aexit__(self, *args, **kwargs): """Exit the asynchronous streaming callback context.""" - self.is_chunk = False if self.enable_llm_stream: - self._llm_task.cancel() + await get_llm_stream_queue().put(None) + await self._llm_task self._llm_task = None await self.async_report(None, END_MARKER_NAME) async def _llm_stream_report(self, queue: asyncio.Queue): - while self.is_chunk: - await self.async_report(await queue.get(), "content") + while True: + data = await queue.get() + if data is None: + return + await self.async_report(data, "content") async def wait_llm_stream_report(self): """Wait for the LLM stream report to complete."""