diff --git a/metagpt/prompts/di/swe_agent.py b/metagpt/prompts/di/swe_agent.py index a1c491210..78a8ce846 100644 --- a/metagpt/prompts/di/swe_agent.py +++ b/metagpt/prompts/di/swe_agent.py @@ -5,7 +5,7 @@ https://github.com/princeton-nlp/SWE-agent/tree/main/config/configs """ SWE_AGENT_SYSTEM_TEMPLATE = """ -SETTING: You are an autonomous programmer, and you're working directly in the environment line with a special interface. +SETTING: You are an autonomous programmer, and you're working directly in the environment line with a special interface.Let's work step by step. The special interface consists of a file editor that shows you 100 lines of a file at a time. @@ -33,7 +33,7 @@ You should only include a *SINGLE* command in the command section and then wait If you'd like to issue two commands at once, PLEASE DO NOT DO THAT! Please instead first submit just the first command, and then after receiving a response you'll be able to issue the second command. Remember, YOU CAN ONLY ENTER ONE COMMAND AT A TIME. You should always wait for feedback after every command. -You can use any bash commands you want (e.g., find, grep, cat, ls, cd) or any custom special tools (including `edit`) by calling Bash.run. Edit all the files you need. +You can use any bash commands you want (e.g., find, grep, cat, ls, cd) or any custom special tools (including `edit`) by calling Bash.run. Edit all the files you need, but make sure to PRIORITIZE the `Available Commands` provided to accomplish your tasks,such as `git_push`. You should carefully observe the behavior and results of the previous action, and avoid triggering repeated errors. However, the Bash.run does NOT support interactive session commands (e.g. python, vim), so please do not invoke them. @@ -51,53 +51,68 @@ MINIMAL_EXAMPLE = """ User Requirement and Issue: Fix the bug in the repo. Because the environment is not available, you DO NOT need to run and modify any existing test case files or add new test case files to ensure that the bug is fixed. ### Read and understand issue(Require): -{{ +[{{ "command_name": "Browser.goto", "args": {{ "url": "https://github.com/geekan/MetaGPT/issues/1275" }} -}} +}}] -> ### Locate issue(Require): Locate the issue in the code by searching for the relevant file, function, or class and open the file to view the code. -{{ +[{{ "command_name": "Bash.run", "args": {{ "cmd": "cd /workspace/django__django_3.0" }} -}} +}}] -> Bash.run(cmd='search_dir_and_preview ASCIIUsernameValidator') -{{ +[{{ "command_name": "Bash.run", "args": {{ "cmd": "open /workspace/django__django_3.0/django/contrib/auth/validators.py" }} -}} +}}] -> ### Fix the Bug(Require): Fix the bug in the code by editing the relevant function, class or code snippet. -{{ +[{{ "command_name": "Bash.run", "args": {{ "cmd": "edit 10:20 < -### Submit the Changes(Require): Submit the changes to the repository. -{{ - "command_name": "Bash.run", +### Push the Changes: Pushes changes from a local Git repository to its remote counterpart. +[{{ + "command_name": "git_push", "args": {{ - "cmd": "submit" + "local_path": "/workspace/django__django_3.0/django/contrib/auth", + "app_name": "github", + "comments": "Fix: produced TypeError: openai.types.completion_usage.CompletionUsage() argument after ** must be a mapping, not NoneType", + "new_branch": "test-fix" }} -}} -Bash.run(cmd='submit') +}}] + +### Create pull request +[{{ + "command_name": "git_create_pull", + "args": {{ + "base": "master", + "head": "test-fix", + "base_repo_name": "Justin-ZL/langchain", + "app_name": "github", + "title": "Fix Issue #1275: produced TypeError: openai.types.completion_usage.CompletionUsage() argument after ** must be a mapping, not NoneTyp", + "body": "This pull request addresses issue #1275 by producing TypeError: openai.types.completion_usage.CompletionUsage() argument after ** must be a mapping, not NoneTyp." + }} +}}] -> -{{ +[{{ "command_name": "end", -}} +}}] """ @@ -162,7 +177,7 @@ IMPORTANT_TIPS = """ - If a search command fails, modify the search criteria and check for typos or incorrect paths, then try again. - Based on feedback of observation or bash command in trajectory to guide adjustments in your search strategy. -13. If the task results in succeed, fail, or NO PROGRESS, output `submit`. +13. If the task results in succeed, use command `git_push` to push the results in local,then use command `git_create_pull` to create PR. 14. If provided an issue link, you MUST go to the issue page using Browser tool to understand the issue before starting your fix. diff --git a/metagpt/roles/di/swe_agent.py b/metagpt/roles/di/swe_agent.py index 7150c0a67..e3cca3330 100644 --- a/metagpt/roles/di/swe_agent.py +++ b/metagpt/roles/di/swe_agent.py @@ -32,7 +32,7 @@ class SWEAgent(RoleZero): run_eval: bool = False async def _think(self) -> bool: - self._format_instruction() + await self._format_instruction() res = await super()._think() if self.run_eval: await self._parse_commands_for_eval() @@ -47,19 +47,17 @@ class SWEAgent(RoleZero): } ) - def _format_instruction(self): + async def _format_instruction(self): """ Formats the instruction message for the SWE agent. Runs the "state" command in the terminal, parses its output as JSON, and uses it to format the `_instruction` template. """ - state_output = self.terminal.run("state") + state_output = await self.terminal.run("state") bash_state = json.loads(state_output) self.instruction = self._instruction.format(**bash_state).strip() - return self.instruction - async def _parse_commands_for_eval(self): """ Handles actions based on parsed commands. @@ -79,7 +77,7 @@ class SWEAgent(RoleZero): if "end" != cmd.get("command_name", ""): return try: - diff_output = self.terminal.run("git diff --cached") + diff_output = await self.terminal.run("git diff --cached") clear_diff = extract_patch(diff_output) logger.info(f"Diff output: \n{clear_diff}") if clear_diff: diff --git a/metagpt/tools/libs/terminal.py b/metagpt/tools/libs/terminal.py index bcf039a5e..8d44c13e6 100644 --- a/metagpt/tools/libs/terminal.py +++ b/metagpt/tools/libs/terminal.py @@ -1,8 +1,10 @@ -import subprocess -import threading -from queue import Queue +import asyncio +from asyncio import Queue +from asyncio.subprocess import PIPE +from typing import Optional from metagpt.const import DEFAULT_WORKSPACE_ROOT, SWE_SETUP_PATH +from metagpt.logs import logger from metagpt.tools.tool_registry import register_tool from metagpt.utils.report import END_MARKER_VALUE, TerminalReporter @@ -19,62 +21,54 @@ class Terminal: def __init__(self): self.shell_command = ["bash"] # FIXME: should consider windows support later self.command_terminator = "\n" - - # Start a persistent shell process - self.process = subprocess.Popen( - self.shell_command, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - executable="/bin/bash", - ) - self.stdout_queue = Queue() + self.stdout_queue = Queue(maxsize=1000) self.observer = TerminalReporter() + self.process: Optional[asyncio.subprocess.Process] = None - self._check_state() + async def _start_process(self): + # Start a persistent shell process + self.process = await asyncio.create_subprocess_exec( + *self.shell_command, stdin=PIPE, stdout=PIPE, stderr=PIPE, executable="/bin/bash" + ) + await self._check_state() - def _check_state(self): - """Check the state of the terminal, e.g. the current directory of the terminal process. Useful for agent to understand.""" - print("The terminal is at:", self.run_command("pwd")) + async def _check_state(self): + """ + Check the state of the terminal, e.g. the current directory of the terminal process. Useful for agent to understand. + """ + output = await self.run_command("pwd") + logger.info("The terminal is at:", output) - def run_command(self, cmd: str, daemon=False) -> str: + async def run_command(self, cmd: str, daemon=False) -> str: """ Executes a specified command in the terminal and streams the output back in real time. This command maintains state across executions, such as the current directory, - allowing for sequential commands to be contextually aware. The output from the - command execution is placed into `stdout_queue`, which can be consumed as needed. + allowing for sequential commands to be contextually aware. Args: cmd (str): The command to execute in the terminal. - daemon (bool): If True, executes the command in a background thread, allowing - the main program to continue execution. The command's output is - collected asynchronously in daemon mode and placed into `stdout_queue`. - + daemon (bool): If True, executes the command in an asynchronous task, allowing + the main program to continue execution. Returns: str: The command's output or an empty string if `daemon` is True. Remember that - when `daemon` is True, the output is collected into `stdout_queue` and must - be consumed from there. - - Note: - If `stdout_queue` is not periodically consumed, it could potentially grow indefinitely, - consuming memory. Ensure that there's a mechanism in place to consume this queue, - especially during long-running or output-heavy command executions. + when `daemon` is True, use the `get_stdout_output` method to get the output. """ + if self.process is None: + await self._start_process() # Send the command self.process.stdin.write((cmd + self.command_terminator).encode()) self.process.stdin.write( - (f'echo "{END_MARKER_VALUE}"{self.command_terminator}').encode() # write EOF + f'echo "{END_MARKER_VALUE}"{self.command_terminator}'.encode() # write EOF ) # Unique marker to signal command end - self.process.stdin.flush() + await self.process.stdin.drain() if daemon: - threading.Thread(target=self._read_and_process_output, args=(cmd,), daemon=True).start() + asyncio.create_task(self._read_and_process_output(cmd)) return "" else: - return self._read_and_process_output(cmd) + return await self._read_and_process_output(cmd) - def execute_in_conda_env(self, cmd: str, env, daemon=False) -> str: + async def execute_in_conda_env(self, cmd: str, env, daemon=False) -> str: """ Executes a given command within a specified Conda environment automatically without the need for manual activation. Users just need to provide the name of the Conda @@ -84,7 +78,7 @@ class Terminal: cmd (str): The command to execute within the Conda environment. env (str, optional): The name of the Conda environment to activate before executing the command. If not specified, the command will run in the current active environment. - daemon (bool): If True, the command is run in a background thread, similar to `run_command`, + daemon (bool): If True, the command is run in an asynchronous task, similar to `run_command`, affecting error logging and handling in the same manner. Returns: @@ -96,19 +90,32 @@ class Terminal: to ensure the specified environment is active for the command's execution. """ cmd = f"conda run -n {env} {cmd}" - return self.run_command(cmd, daemon=daemon) + return await self.run_command(cmd, daemon=daemon) - def _read_and_process_output(self, cmd): - with self.observer as observer: + async def get_stdout_output(self) -> str: + """ + Retrieves all collected output from background running commands and returns it as a string. + + Returns: + str: The collected output from background running commands, returned as a string. + """ + output_lines = [] + while not self.stdout_queue.empty(): + line = await self.stdout_queue.get() + output_lines.append(line) + return "\n".join(output_lines) + + async def _read_and_process_output(self, cmd, daemon=False) -> str: + async with self.observer as observer: cmd_output = [] - observer.report(cmd + self.command_terminator, "cmd") - # report the comman + await observer.async_report(cmd + self.command_terminator, "cmd") + # report the command # Read the output until the unique marker is found. # We read bytes directly from stdout instead of text because when reading text, # '\r' is changed to '\n', resulting in excessive output. tmp = b"" while True: - output = tmp + self.process.stdout.read(1) + output = tmp + await self.process.stdout.read(1) *lines, tmp = output.splitlines(True) for line in lines: line = line.decode() @@ -123,13 +130,13 @@ class Terminal: # log stdout in real-time observer.report(line, "output") cmd_output.append(line) - self.stdout_queue.put(line) + if daemon: + await self.stdout_queue.put(line) - def close(self): + async def close(self): """Close the persistent shell process.""" self.process.stdin.close() - self.process.terminate() - self.process.wait() + await self.process.wait() @register_tool(include_functions=["run"]) @@ -142,10 +149,13 @@ class Bash(Terminal): def __init__(self): """init""" super().__init__() - self.run_command(f"cd {DEFAULT_WORKSPACE_ROOT}") - self.run_command(f"source {SWE_SETUP_PATH}") + self.start_flag = False - def run(self, cmd) -> str: + async def start(self): + await self.run_command(f"cd {DEFAULT_WORKSPACE_ROOT}") + await self.run_command(f"source {SWE_SETUP_PATH}") + + async def run(self, cmd) -> str: """ Executes a bash command. @@ -184,9 +194,6 @@ class Bash(Terminal): Arguments: filename (str): The name of the file to create. - - submit - Submits your current code. it can only be executed once, the last action before the `end`. - - search_dir_and_preview [] Searches for search_term in all files in dir and gives their code preview with line numbers. If dir is not provided, searches in the current directory. @@ -222,4 +229,8 @@ class Bash(Terminal): Note: Make sure to use these functions as per their defined arguments and behaviors. """ - return self.run_command(cmd) + if not self.start_flag: + await self.start() + self.start_flag = True + + return await self.run_command(cmd) diff --git a/tests/metagpt/environment/mgx_env/run_mgx_env.py b/tests/metagpt/environment/mgx_env/run_mgx_env.py index 2cd299f17..93c134bb4 100644 --- a/tests/metagpt/environment/mgx_env/run_mgx_env.py +++ b/tests/metagpt/environment/mgx_env/run_mgx_env.py @@ -1,6 +1,8 @@ import asyncio import os +import re import threading +import time from metagpt.environment.mgx.mgx_env import MGXEnv from metagpt.roles import Architect, Engineer, ProductManager, ProjectManager @@ -11,7 +13,7 @@ from metagpt.roles.di.team_leader import TeamLeader from metagpt.schema import Message -async def main(requirement="", enable_human_input=False, use_fixed_sop=False): +async def main(requirement="", enable_human_input=False, use_fixed_sop=False, allow_idle_time=30): if use_fixed_sop: engineer = Engineer(n_borg=5, use_code_review=False) else: @@ -33,30 +35,50 @@ async def main(requirement="", enable_human_input=False, use_fixed_sop=False): if enable_human_input: # simulate human sending messages in chatbox - send_human_input(env) + stop_event = threading.Event() + human_input_thread = send_human_input(env, stop_event) if requirement: env.publish_message(Message(content=requirement)) - # env.publish_message(Message(content=requirement, send_to={"David"}), user_defined_recipient="David") + # user_defined_recipient = "Alex" + # env.publish_message(Message(content=requirement, send_to={user_defined_recipient}), user_defined_recipient=user_defined_recipient) - while not env.is_idle: - await env.run() + allow_idle_time = allow_idle_time if enable_human_input else 1 + start_time = time.time() + while time.time() - start_time < allow_idle_time: + if not env.is_idle: + await env.run() + start_time = time.time() # reset start time + + if enable_human_input: + print("No more human input, terminating, press ENTER for a full termination.") + stop_event.set() + human_input_thread.join() -def send_human_input(env): +def send_human_input(env, stop_event): """ Simulate sending message in chatbox Note in local environment, the message is consumed only after current round of env.run is finished """ def send_messages(): - while True: + while not stop_event.is_set(): message = input("Enter a message any time: ") - env.publish_message(Message(content=message)) + user_defined_recipient = re.search(r"@(\w+)", message) + if user_defined_recipient: + recipient_name = user_defined_recipient.group(1) + print(f"{recipient_name} will receive the message") + env.publish_message( + Message(content=message, send_to={recipient_name}), user_defined_recipient=recipient_name + ) + else: + env.publish_message(Message(content=message)) # Start a thread for sending messages send_thread = threading.Thread(target=send_messages, args=()) send_thread.start() + return send_thread GAME_REQ = "create a 2048 game" @@ -102,6 +124,14 @@ clone https://github.com/garylin2099/simple_calculator, checkout a new branch na Commit your changes and push, finally, create a PR to the master branch of https://github.com/mannaandpoem/simple_calculator. """ +TL_CHAT1 = """Summarize the paper for me""" # expecting clarification +TL_CHAT2 = """Solve the issue at this link""" # expecting clarification +TL_CHAT3 = """Who is the first man landing on Moon""" # expecting answering directly +TL_CHAT4 = """Find all zeros in the indicated finite field of the given polynomial with coefficients in that field. x^5 + 3x^3 + x^2 + 2x in Z_5""" # expecting answering directly +TL_CHAT5 = """Find the degree for the given field extension Q(sqrt(2), sqrt(3), sqrt(18)) over Q.""" # expecting answering directly +TL_CHAT6 = """Statement 1 | A ring homomorphism is one to one if and only if the kernel is {{0}},. Statement 2 | Q is an ideal in R""" # expecting answering directly +TL_CHAT7 = """Jean has 30 lollipops. Jean eats 2 of the lollipops. With the remaining lollipops, Jean wants to package 2 lollipops in one bag. How many bags can Jean fill?""" # expecting answering directly + if __name__ == "__main__": # NOTE: Add access_token to test github issue fixing diff --git a/tests/metagpt/roles/di/run_swe_agent_for_benchmark.py b/tests/metagpt/roles/di/run_swe_agent_for_benchmark.py index 54b3623a4..e2aa3d17f 100644 --- a/tests/metagpt/roles/di/run_swe_agent_for_benchmark.py +++ b/tests/metagpt/roles/di/run_swe_agent_for_benchmark.py @@ -59,11 +59,11 @@ async def run(instance, swe_result_dir): # 前处理 terminal = Terminal() - terminal.run_command(f"cd {repo_path} && git reset --hard && git clean -n -d && git clean -f -d") - terminal.run_command("BRANCH=$(git remote show origin | awk '/HEAD branch/ {print $NF}')") - logger.info(terminal.run_command("echo $BRANCH")) - logger.info(terminal.run_command('git checkout "$BRANCH"')) - logger.info(terminal.run_command("git branch")) + await terminal.run_command(f"cd {repo_path} && git reset --hard && git clean -n -d && git clean -f -d") + await terminal.run_command("BRANCH=$(git remote show origin | awk '/HEAD branch/ {print $NF}')") + logger.info(await terminal.run_command("echo $BRANCH")) + logger.info(await terminal.run_command('git checkout "$BRANCH"')) + logger.info(await terminal.run_command("git branch")) user_requirement_and_issue = INSTANCE_TEMPLATE.format( issue=instance["problem_statement"], diff --git a/tests/metagpt/tools/libs/test_terminal.py b/tests/metagpt/tools/libs/test_terminal.py index 98ed63dd8..9c64009ae 100644 --- a/tests/metagpt/tools/libs/test_terminal.py +++ b/tests/metagpt/tools/libs/test_terminal.py @@ -4,16 +4,17 @@ from metagpt.const import DATA_PATH, METAGPT_ROOT from metagpt.tools.libs.terminal import Terminal -def test_terminal(): +@pytest.mark.asyncio +async def test_terminal(): terminal = Terminal() - terminal.run_command(f"cd {METAGPT_ROOT}") - output = terminal.run_command("pwd") + await terminal.run_command(f"cd {METAGPT_ROOT}") + output = await terminal.run_command("pwd") assert output.strip() == str(METAGPT_ROOT) # pwd now should be METAGPT_ROOT, cd data should land in DATA_PATH - terminal.run_command("cd data") - output = terminal.run_command("pwd") + await terminal.run_command("cd data") + output = await terminal.run_command("pwd") assert output.strip() == str(DATA_PATH)