mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-06-11 15:15:18 +02:00
Merge branch 'geekan:main' into main
This commit is contained in:
commit
c70ab73f6c
15 changed files with 598 additions and 78 deletions
|
|
@ -5,15 +5,47 @@
|
|||
@Author : alexanderwu
|
||||
@File : debug_error.py
|
||||
"""
|
||||
import re
|
||||
|
||||
from metagpt.logs import logger
|
||||
from metagpt.actions.action import Action
|
||||
from metagpt.utils.common import CodeParser
|
||||
|
||||
|
||||
PROMPT_TEMPLATE = """
|
||||
NOTICE
|
||||
1. Role: You are a Development Engineer or QA engineer;
|
||||
2. Task: You received this message from another Development Engineer or QA engineer who ran or tested your code.
|
||||
Based on the message, first, figure out your own role, i.e. Engineer or QaEngineer,
|
||||
then rewrite the development code or the test code based on your role, the error, and the summary, such that all bugs are fixed and the code performs well.
|
||||
Attention: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD WRITE BEFORE the test case or script and triple quotes.
|
||||
The message is as follows:
|
||||
{context}
|
||||
---
|
||||
Now you should start rewriting the code:
|
||||
## file name of the code to rewrite: Write code with triple quoto. Do your best to implement THIS IN ONLY ONE FILE.
|
||||
"""
|
||||
class DebugError(Action):
|
||||
def __init__(self, name, context=None, llm=None):
|
||||
def __init__(self, name="DebugError", context=None, llm=None):
|
||||
super().__init__(name, context, llm)
|
||||
|
||||
async def run(self, code, error):
|
||||
prompt = f"Here is a piece of Python code:\n\n{code}\n\nThe following error occurred during execution:" \
|
||||
f"\n\n{error}\n\nPlease try to fix the error in this code."
|
||||
fixed_code = await self._aask(prompt)
|
||||
return fixed_code
|
||||
# async def run(self, code, error):
|
||||
# prompt = f"Here is a piece of Python code:\n\n{code}\n\nThe following error occurred during execution:" \
|
||||
# f"\n\n{error}\n\nPlease try to fix the error in this code."
|
||||
# fixed_code = await self._aask(prompt)
|
||||
# return fixed_code
|
||||
|
||||
async def run(self, context):
|
||||
if "PASS" in context:
|
||||
return "", "the original code works fine, no need to debug"
|
||||
|
||||
file_name = re.search("## File To Rewrite:\s*(.+\\.py)", context).group(1)
|
||||
|
||||
logger.info(f"Debug and rewrite {file_name}")
|
||||
|
||||
prompt = PROMPT_TEMPLATE.format(context=context)
|
||||
|
||||
rsp = await self._aask(prompt)
|
||||
|
||||
code = CodeParser.parse_code(block="", text=rsp)
|
||||
|
||||
return file_name, code
|
||||
|
|
|
|||
|
|
@ -6,20 +6,118 @@
|
|||
@File : run_code.py
|
||||
"""
|
||||
import traceback
|
||||
import os
|
||||
import subprocess
|
||||
from typing import List, Tuple
|
||||
|
||||
from metagpt.logs import logger
|
||||
from metagpt.actions.action import Action
|
||||
|
||||
PROMPT_TEMPLATE = """
|
||||
Role: You are a senior development and qa engineer, your role is summarize the code running result.
|
||||
If the running result does not include an error, you should explicitly approve the result.
|
||||
On the other hand, if the running result indicates some error, you should point out which part, the development code or the test code, produces the error,
|
||||
and give specific instructions on fixing the errors. Here is the code info:
|
||||
{context}
|
||||
Now you should begin your analysis
|
||||
---
|
||||
## instruction:
|
||||
Please summarize the cause of the errors and give correction instruction
|
||||
## File To Rewrite:
|
||||
Determine the ONE file to rewrite in order to fix the error, for example, xyz.py, or test_xyz.py
|
||||
## Status:
|
||||
Determine if all of the code works fine, if so write PASS, else FAIL,
|
||||
WRITE ONLY ONE WORD, PASS OR FAIL, IN THI SECTION
|
||||
## Send To:
|
||||
Please write Engineer if the errors are due to problematic development codes, and QaEngineer to problematic test codes, and NoOne if there are no errors,
|
||||
WRITE ONLY ONE WORD, Engineer OR QaEngineer OR NoOne, IN THIS SECTION.
|
||||
---
|
||||
You should fill in necessary instruction, status, send to, and finally return all content between the --- segment line.
|
||||
"""
|
||||
|
||||
CONTEXT = """
|
||||
## Development Code File Name
|
||||
{code_file_name}
|
||||
## Development Code
|
||||
```python
|
||||
{code}
|
||||
```
|
||||
## Test File Name
|
||||
{test_file_name}
|
||||
## Test Code
|
||||
```python
|
||||
{test_code}
|
||||
```
|
||||
## Running Command
|
||||
{command}
|
||||
## Running Output
|
||||
standard output: {outs};
|
||||
standard errors: {errs};
|
||||
"""
|
||||
|
||||
class RunCode(Action):
|
||||
def __init__(self, name, context=None, llm=None):
|
||||
def __init__(self, name="RunCode", context=None, llm=None):
|
||||
super().__init__(name, context, llm)
|
||||
|
||||
async def run(self, code):
|
||||
@classmethod
|
||||
async def run_text(cls, code) -> Tuple[str, str]:
|
||||
try:
|
||||
# We will document_store the result in this dictionary
|
||||
namespace = {}
|
||||
exec(code, namespace)
|
||||
return namespace.get('result', None)
|
||||
return namespace.get('result', ""), ""
|
||||
except Exception:
|
||||
# If there is an error in the code, return the error message
|
||||
return traceback.format_exc()
|
||||
return "", traceback.format_exc()
|
||||
|
||||
@classmethod
|
||||
async def run_script(cls, working_directory, additional_python_paths=[], command=[]) -> Tuple[str, str]:
|
||||
working_directory = str(working_directory)
|
||||
additional_python_paths = [str(path) for path in additional_python_paths]
|
||||
|
||||
# Copy the current environment variables
|
||||
env = os.environ.copy()
|
||||
|
||||
# Modify the PYTHONPATH environment variable
|
||||
additional_python_paths = [working_directory] + additional_python_paths
|
||||
additional_python_paths = ":".join(additional_python_paths)
|
||||
env['PYTHONPATH'] = additional_python_paths + ':' + env.get('PYTHONPATH', '')
|
||||
|
||||
# Start the subprocess
|
||||
process = subprocess.Popen(command, cwd=working_directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
|
||||
|
||||
try:
|
||||
# Wait for the process to complete, with a timeout
|
||||
stdout, stderr = process.communicate(timeout=10)
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.info("The command did not complete within the given timeout.")
|
||||
process.kill() # Kill the process if it times out
|
||||
stdout, stderr = process.communicate()
|
||||
return stdout.decode('utf-8'), stderr.decode('utf-8')
|
||||
|
||||
async def run(
|
||||
self, code, mode="script", code_file_name="", test_code="", test_file_name="", command=[], **kwargs
|
||||
) -> str:
|
||||
logger.info(f"Running {' '.join(command)}")
|
||||
if mode == "script":
|
||||
outs, errs = await self.run_script(command=command, **kwargs)
|
||||
elif mode == "text":
|
||||
outs, errs = await self.run_text(code=code)
|
||||
|
||||
logger.info(f"{outs=}")
|
||||
logger.info(f"{errs=}")
|
||||
|
||||
context = CONTEXT.format(
|
||||
code=code, code_file_name=code_file_name,
|
||||
test_code=test_code, test_file_name=test_file_name,
|
||||
command=" ".join(command),
|
||||
outs=outs[:500], # outs might be long but they are not important, truncate them to avoid token overflow
|
||||
errs=errs[:10000] # truncate errors to avoid token overflow
|
||||
)
|
||||
|
||||
prompt = PROMPT_TEMPLATE.format(context=context)
|
||||
rsp = await self._aask(prompt)
|
||||
|
||||
result = context + rsp
|
||||
|
||||
return result
|
||||
|
|
|
|||
|
|
@ -5,22 +5,45 @@
|
|||
@Author : alexanderwu
|
||||
@File : write_test.py
|
||||
"""
|
||||
from metagpt.logs import logger
|
||||
from metagpt.actions.action import Action
|
||||
from metagpt.utils.common import CodeParser
|
||||
|
||||
PROMPT_TEMPLATE = """
|
||||
NOTICE
|
||||
1. Role: You are a QA engineer; the main goal is to design, develop, and execute PEP8 compliant, well-structured, maintainable test cases and scripts for Python 3.9. Your focus should be on ensuring the product quality of the entire project through systematic testing.
|
||||
2. Requirement: Based on the context, develop a comprehensive test suite that adequately covers all relevant aspects of the code file under review. Your test suite will be part of the overall project QA, so please develop complete, robust, and reusable test cases.
|
||||
3. Attention1: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD WRITE BEFORE the test case or script.
|
||||
4. Attention2: If there are any settings in your tests, ALWAYS SET A DEFAULT VALUE, ALWAYS USE STRONG TYPE AND EXPLICIT VARIABLE.
|
||||
5. Attention3: YOU MUST FOLLOW "Data structures and interface definitions". DO NOT CHANGE ANY DESIGN. Make sure your tests respect the existing design and ensure its validity.
|
||||
6. Think before writing: What should be tested and validated in this document? What edge cases could exist? What might fail?
|
||||
7. CAREFULLY CHECK THAT YOU DON'T MISS ANY NECESSARY TEST CASES/SCRIPTS IN THIS FILE.
|
||||
Attention: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD WRITE BEFORE the test case or script and triple quotes.
|
||||
-----
|
||||
## Given the following code, please write appropriate test cases using Python's unittest framework to verify the correctness and robustness of this code:
|
||||
```python
|
||||
{code_to_test}
|
||||
```
|
||||
Note that the code to test is at {source_file_path}, we will put your test code at {workspace}/tests/{test_file_name}, and run your test code from {workspace},
|
||||
you should correctly import the necessary classes based on these file locations!
|
||||
## {test_file_name}: Write test code with triple quoto. Do your best to implement THIS ONLY ONE FILE.
|
||||
"""
|
||||
|
||||
class WriteTest(Action):
|
||||
def __init__(self, name="", context=None, llm=None):
|
||||
def __init__(self, name="WriteTest", context=None, llm=None):
|
||||
super().__init__(name, context, llm)
|
||||
self.code = None
|
||||
self.test_prompt_template = """
|
||||
Given the following code or function:
|
||||
{code}
|
||||
|
||||
As a test engineer, please write appropriate test cases using Python's unittest framework to verify the correctness and robustness of this code.
|
||||
"""
|
||||
async def write_code(self, prompt):
|
||||
code_rsp = await self._aask(prompt)
|
||||
code = CodeParser.parse_code(block="", text=code_rsp)
|
||||
return code
|
||||
|
||||
async def run(self, code):
|
||||
self.code = code
|
||||
prompt = self.test_prompt_template.format(code=self.code)
|
||||
test_cases = await self._aask(prompt)
|
||||
return test_cases
|
||||
async def run(self, code_to_test, test_file_name, source_file_path, workspace):
|
||||
prompt = PROMPT_TEMPLATE.format(
|
||||
code_to_test=code_to_test,
|
||||
test_file_name=test_file_name,
|
||||
source_file_path=source_file_path,
|
||||
workspace=workspace
|
||||
)
|
||||
code = await self.write_code(prompt)
|
||||
return code
|
||||
|
|
|
|||
|
|
@ -43,13 +43,13 @@ class LongTermMemory(Memory):
|
|||
# and ignore adding messages from recover repeatedly
|
||||
self.memory_storage.add(message)
|
||||
|
||||
def remember(self, observed: list[Message], k=10) -> list[Message]:
|
||||
def remember(self, observed: list[Message], k=0) -> list[Message]:
|
||||
"""
|
||||
remember the most similar k memories from observed Messages, return all when k=0
|
||||
1. remember the short-term memory(stm) news
|
||||
2. integrate the stm news with ltm(long-term memory) news
|
||||
"""
|
||||
stm_news = super(LongTermMemory, self).remember(observed) # shot-term memory news
|
||||
stm_news = super(LongTermMemory, self).remember(observed, k=k) # shot-term memory news
|
||||
if not self.memory_storage.is_initialized:
|
||||
# memory_storage hasn't initialized, use default `remember` to get stm_news
|
||||
return stm_news
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ class Memory:
|
|||
"""Return the most recent k memories, return all when k=0"""
|
||||
return self.storage[-k:]
|
||||
|
||||
def remember(self, observed: list[Message], k=10) -> list[Message]:
|
||||
def remember(self, observed: list[Message], k=0) -> list[Message]:
|
||||
"""remember the most recent k memories from observed Messages, return all when k=0"""
|
||||
already_observed = self.get(k)
|
||||
news: list[Message] = []
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ from metagpt.roles import Role
|
|||
from metagpt.actions import WriteCode, WriteCodeReview, WriteTasks, WriteDesign
|
||||
from metagpt.schema import Message
|
||||
from metagpt.utils.common import CodeParser
|
||||
from metagpt.utils.special_tokens import MSG_SEP, FILENAME_CODE_SEP
|
||||
|
||||
|
||||
async def gather_ordered_k(coros, k) -> list:
|
||||
|
|
@ -60,7 +61,7 @@ class Engineer(Role):
|
|||
|
||||
@classmethod
|
||||
def parse_tasks(self, task_msg: Message) -> list[str]:
|
||||
if not task_msg.instruct_content:
|
||||
if task_msg.instruct_content:
|
||||
return task_msg.instruct_content.dict().get("Task list")
|
||||
return CodeParser.parse_file_list(block="Task list", text=task_msg.content)
|
||||
|
||||
|
|
@ -70,7 +71,7 @@ class Engineer(Role):
|
|||
|
||||
@classmethod
|
||||
def parse_workspace(cls, system_design_msg: Message) -> str:
|
||||
if not system_design_msg.instruct_content:
|
||||
if system_design_msg.instruct_content:
|
||||
return system_design_msg.instruct_content.dict().get("Python package name")
|
||||
return CodeParser.parse_str(block="Python package name", text=system_design_msg.content)
|
||||
|
||||
|
|
@ -95,6 +96,7 @@ class Engineer(Role):
|
|||
file = workspace / filename
|
||||
file.parent.mkdir(parents=True, exist_ok=True)
|
||||
file.write_text(code)
|
||||
return file
|
||||
|
||||
def recv(self, message: Message) -> None:
|
||||
self._rc.memory.add(message)
|
||||
|
|
@ -126,23 +128,33 @@ class Engineer(Role):
|
|||
return msg
|
||||
|
||||
async def _act_sp(self) -> Message:
|
||||
code_msg_all = [] # gather all code info, will pass to qa_engineer for tests later
|
||||
for todo in self.todos:
|
||||
code_rsp = await WriteCode().run(
|
||||
code = await WriteCode().run(
|
||||
context=self._rc.history,
|
||||
filename=todo
|
||||
)
|
||||
# logger.info(todo)
|
||||
# logger.info(code_rsp)
|
||||
# code = self.parse_code(code_rsp)
|
||||
self.write_file(todo, code_rsp)
|
||||
msg = Message(content=code_rsp, role=self.profile, cause_by=type(self._rc.todo))
|
||||
file_path = self.write_file(todo, code)
|
||||
msg = Message(content=code, role=self.profile, cause_by=type(self._rc.todo))
|
||||
self._rc.memory.add(msg)
|
||||
|
||||
code_msg = todo + FILENAME_CODE_SEP + str(file_path)
|
||||
code_msg_all.append(code_msg)
|
||||
|
||||
logger.info(f'Done {self.get_workspace()} generating.')
|
||||
msg = Message(content="all done.", role=self.profile, cause_by=type(self._rc.todo))
|
||||
msg = Message(
|
||||
content=MSG_SEP.join(code_msg_all),
|
||||
role=self.profile,
|
||||
cause_by=type(self._rc.todo),
|
||||
send_to="QaEngineer"
|
||||
)
|
||||
return msg
|
||||
|
||||
async def _act_sp_precision(self) -> Message:
|
||||
code_msg_all = [] # gather all code info, will pass to qa_engineer for tests later
|
||||
for todo in self.todos:
|
||||
"""
|
||||
# 从历史信息中挑选必须的信息,以减少prompt长度(人工经验总结)
|
||||
|
|
@ -173,12 +185,20 @@ class Engineer(Role):
|
|||
except Exception as e:
|
||||
logger.error("code review failed!", e)
|
||||
pass
|
||||
self.write_file(todo, code)
|
||||
file_path = self.write_file(todo, code)
|
||||
msg = Message(content=code, role=self.profile, cause_by=WriteCode)
|
||||
self._rc.memory.add(msg)
|
||||
|
||||
code_msg = todo + FILENAME_CODE_SEP + str(file_path)
|
||||
code_msg_all.append(code_msg)
|
||||
|
||||
logger.info(f'Done {self.get_workspace()} generating.')
|
||||
msg = Message(content="all done.", role=self.profile, cause_by=WriteCode)
|
||||
msg = Message(
|
||||
content=MSG_SEP.join(code_msg_all),
|
||||
role=self.profile,
|
||||
cause_by=type(self._rc.todo),
|
||||
send_to="QaEngineer"
|
||||
)
|
||||
return msg
|
||||
|
||||
async def _act(self) -> Message:
|
||||
|
|
|
|||
|
|
@ -5,11 +5,162 @@
|
|||
@Author : alexanderwu
|
||||
@File : qa_engineer.py
|
||||
"""
|
||||
from metagpt.actions import WriteTest
|
||||
from metagpt.roles import Role
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Type
|
||||
|
||||
from metagpt.actions import WriteTest, WriteCode, WriteDesign, RunCode, DebugError
|
||||
from metagpt.const import WORKSPACE_ROOT
|
||||
from metagpt.logs import logger
|
||||
from metagpt.roles import Role
|
||||
from metagpt.schema import Message
|
||||
from metagpt.roles.engineer import Engineer
|
||||
from metagpt.utils.common import CodeParser, parse_recipient
|
||||
from metagpt.utils.special_tokens import MSG_SEP, FILENAME_CODE_SEP
|
||||
|
||||
class QaEngineer(Role):
|
||||
def __init__(self, name, profile, goal, constraints):
|
||||
def __init__(self, name="Edward", profile="QaEngineer",
|
||||
goal="Write comprehensive and robust tests to ensure codes will work as expected without bugs",
|
||||
constraints="The test code you write should conform to code standard like PEP8, be modular, easy to read and maintain",
|
||||
test_round_allowed=5):
|
||||
super().__init__(name, profile, goal, constraints)
|
||||
self._init_actions([WriteTest])
|
||||
self._init_actions([WriteTest]) # FIXME: a bit hack here, only init one action to circumvent _think() logic, will overwrite _think() in future updates
|
||||
self._watch([WriteCode, WriteTest, RunCode, DebugError])
|
||||
self.test_round = 0
|
||||
self.test_round_allowed = test_round_allowed
|
||||
|
||||
@classmethod
|
||||
def parse_workspace(cls, system_design_msg: Message) -> str:
|
||||
if not system_design_msg.instruct_content:
|
||||
return system_design_msg.instruct_content.dict().get("Python package name")
|
||||
return CodeParser.parse_str(block="Python package name", text=system_design_msg.content)
|
||||
|
||||
def get_workspace(self, return_proj_dir=True) -> Path:
|
||||
msg = self._rc.memory.get_by_action(WriteDesign)[-1]
|
||||
if not msg:
|
||||
return WORKSPACE_ROOT / 'src'
|
||||
workspace = self.parse_workspace(msg)
|
||||
# project directory: workspace/{package_name}, which contains package source code folder, tests folder, resources folder, etc.
|
||||
if return_proj_dir:
|
||||
return WORKSPACE_ROOT / workspace
|
||||
# development codes directory: workspace/{package_name}/{package_name}
|
||||
return WORKSPACE_ROOT / workspace / workspace
|
||||
|
||||
def write_file(self, filename: str, code: str):
|
||||
workspace = self.get_workspace() / 'tests'
|
||||
file = workspace / filename
|
||||
file.parent.mkdir(parents=True, exist_ok=True)
|
||||
file.write_text(code)
|
||||
|
||||
async def _write_test(self, message: Message) -> None:
|
||||
|
||||
code_msgs = message.content.split(MSG_SEP)
|
||||
result_msg_all = []
|
||||
for code_msg in code_msgs:
|
||||
|
||||
# write tests
|
||||
file_name, file_path = code_msg.split(FILENAME_CODE_SEP)
|
||||
code_to_test = open(file_path, "r").read()
|
||||
if "test" in file_name:
|
||||
continue # Engineer might write some test files, skip testing a test file
|
||||
test_file_name = "test_" + file_name
|
||||
test_file_path = self.get_workspace() / "tests" / test_file_name
|
||||
logger.info(f'Writing {test_file_name}..')
|
||||
test_code = await WriteTest().run(
|
||||
code_to_test=code_to_test,
|
||||
test_file_name=test_file_name,
|
||||
# source_file_name=file_name,
|
||||
source_file_path=file_path,
|
||||
workspace=self.get_workspace()
|
||||
)
|
||||
self.write_file(test_file_name, test_code)
|
||||
|
||||
# prepare context for run tests in next round
|
||||
command = ['python', f'tests/{test_file_name}']
|
||||
file_info = {
|
||||
"file_name": file_name, "file_path": str(file_path),
|
||||
"test_file_name": test_file_name, "test_file_path": str(test_file_path),
|
||||
"command": command
|
||||
}
|
||||
msg = Message(
|
||||
content=str(file_info), role=self.profile, cause_by=WriteTest,
|
||||
sent_from=self.profile, send_to=self.profile
|
||||
)
|
||||
self._publish_message(msg)
|
||||
|
||||
logger.info(f'Done {self.get_workspace()}/tests generating.')
|
||||
|
||||
async def _run_code(self, msg):
|
||||
file_info = eval(msg.content)
|
||||
development_file_path = file_info["file_path"]
|
||||
test_file_path = file_info["test_file_path"]
|
||||
if not os.path.exists(development_file_path) or not os.path.exists(test_file_path):
|
||||
return
|
||||
|
||||
development_code = open(development_file_path, "r").read()
|
||||
test_code = open(test_file_path, "r").read()
|
||||
proj_dir = self.get_workspace()
|
||||
development_code_dir = self.get_workspace(return_proj_dir=False)
|
||||
|
||||
result_msg = await RunCode().run(
|
||||
mode="script",
|
||||
code=development_code,
|
||||
code_file_name=file_info["file_name"],
|
||||
test_code=test_code,
|
||||
test_file_name=file_info["test_file_name"],
|
||||
command=file_info["command"],
|
||||
working_directory=proj_dir, # workspace/package_name, will run tests/test_xxx.py here
|
||||
additional_python_paths=[development_code_dir], # workspace/package_name/package_name,
|
||||
# import statement inside package code needs this
|
||||
)
|
||||
|
||||
recipient = parse_recipient(result_msg) # the recipient might be Engineer or myself
|
||||
content = str(file_info) + FILENAME_CODE_SEP + result_msg
|
||||
msg = Message(
|
||||
content=content, role=self.profile, cause_by=RunCode,
|
||||
sent_from=self.profile, send_to=recipient
|
||||
)
|
||||
self._publish_message(msg)
|
||||
|
||||
async def _debug_error(self, msg):
|
||||
file_info, context = msg.content.split(FILENAME_CODE_SEP)
|
||||
file_name, code = await DebugError().run(context)
|
||||
if file_name:
|
||||
self.write_file(file_name, code)
|
||||
recipient = msg.sent_from # send back to the one who ran the code for another run, might be one's self
|
||||
msg = Message(content=file_info, role=self.profile, cause_by=DebugError, sent_from=self.profile, send_to=recipient)
|
||||
self._publish_message(msg)
|
||||
|
||||
async def _observe(self) -> int:
|
||||
await super()._observe()
|
||||
self._rc.news = [msg for msg in self._rc.news \
|
||||
if msg.send_to == self.profile] # only relevant msgs count as observed news
|
||||
return len(self._rc.news)
|
||||
|
||||
async def _act(self) -> Message:
|
||||
if self.test_round > self.test_round_allowed:
|
||||
result_msg = Message(
|
||||
content=f"Exceeding {self.test_round_allowed} rounds of tests, skip (writing code counts as a round, too)",
|
||||
role=self.profile, cause_by=WriteTest, sent_from=self.profile, send_to=""
|
||||
)
|
||||
return result_msg
|
||||
|
||||
for msg in self._rc.news:
|
||||
# Decide what to do based on observed msg type, currently defined by human,
|
||||
# might potentially be moved to _think, that is, let the agent decides for itself
|
||||
if msg.cause_by == WriteCode:
|
||||
# engineer wrote a code, time to write a test for it
|
||||
await self._write_test(msg)
|
||||
elif msg.cause_by in [WriteTest, DebugError]:
|
||||
# I wrote or debugged my test code, time to run it
|
||||
await self._run_code(msg)
|
||||
elif msg.cause_by == RunCode:
|
||||
# I ran my test code, time to fix bugs, if any
|
||||
await self._debug_error(msg)
|
||||
self.test_round += 1
|
||||
result_msg = Message(
|
||||
content=f"Round {self.test_round} of tests done",
|
||||
role=self.profile, cause_by=WriteTest, sent_from=self.profile, send_to=""
|
||||
)
|
||||
return result_msg
|
||||
|
|
|
|||
|
|
@ -70,6 +70,7 @@ class RoleContext(BaseModel):
|
|||
state: int = Field(default=0)
|
||||
todo: Action = Field(default=None)
|
||||
watch: set[Type[Action]] = Field(default_factory=set)
|
||||
news: list[Type[Message]] = Field(default=[])
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
|
@ -184,15 +185,15 @@ class Role:
|
|||
|
||||
observed = self._rc.env.memory.get_by_actions(self._rc.watch)
|
||||
|
||||
news = self._rc.memory.remember(observed) # remember recent exact or similar memories
|
||||
self._rc.news = self._rc.memory.remember(observed) # remember recent exact or similar memories
|
||||
|
||||
for i in env_msgs:
|
||||
self.recv(i)
|
||||
|
||||
news_text = [f"{i.role}: {i.content[:20]}..." for i in news]
|
||||
news_text = [f"{i.role}: {i.content[:20]}..." for i in self._rc.news]
|
||||
if news_text:
|
||||
logger.debug(f'{self._setting} observed: {news_text}')
|
||||
return len(news)
|
||||
return len(self._rc.news)
|
||||
|
||||
def _publish_message(self, msg):
|
||||
"""如果role归属于env,那么role的消息会向env广播"""
|
||||
|
|
|
|||
|
|
@ -27,6 +27,8 @@ class Message:
|
|||
instruct_content: BaseModel = field(default=None)
|
||||
role: str = field(default='user') # system / user / assistant
|
||||
cause_by: Type["Action"] = field(default="")
|
||||
sent_from: str = field(default="")
|
||||
send_to: str = field(default="")
|
||||
|
||||
def __str__(self):
|
||||
# prefix = '-'.join([self.role, str(self.cause_by)])
|
||||
|
|
|
|||
|
|
@ -183,7 +183,7 @@ class CodeParser:
|
|||
def parse_file_list(cls, block: str, text: str, lang: str = "") -> list[str]:
|
||||
# Regular expression pattern to find the tasks list.
|
||||
code = cls.parse_code(block, text, lang)
|
||||
print(code)
|
||||
# print(code)
|
||||
pattern = r'\s*(.*=.*)?(\[.*\])'
|
||||
|
||||
# Extract tasks list string using regex.
|
||||
|
|
@ -230,3 +230,8 @@ def print_members(module, indent=0):
|
|||
print(f'{prefix}Function: {name}')
|
||||
elif inspect.ismethod(obj):
|
||||
print(f'{prefix}Method: {name}')
|
||||
|
||||
def parse_recipient(text):
|
||||
pattern = "## Send To:\s*([A-Za-z]+)\s*?" # hard code for now
|
||||
recipient = re.search(pattern, text)
|
||||
return recipient.group(1) if recipient else ""
|
||||
|
|
|
|||
4
metagpt/utils/special_tokens.py
Normal file
4
metagpt/utils/special_tokens.py
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
# token to separate different code messages in a WriteCode Message content
|
||||
MSG_SEP = "#*000*#"
|
||||
# token to seperate file name and the actual code text in a code message
|
||||
FILENAME_CODE_SEP = "#*001*#"
|
||||
12
startup.py
12
startup.py
|
|
@ -4,23 +4,27 @@ import asyncio
|
|||
|
||||
import fire
|
||||
|
||||
from metagpt.roles import Architect, Engineer, ProductManager, ProjectManager
|
||||
from metagpt.roles import Architect, Engineer, ProductManager, ProjectManager, QaEngineer
|
||||
from metagpt.software_company import SoftwareCompany
|
||||
|
||||
|
||||
async def startup(idea: str, investment: float = 3.0, n_round: int = 5, code_review: bool = False):
|
||||
async def startup(idea: str, investment: float = 3.0, n_round: int = 5,
|
||||
code_review: bool = False, run_tests: bool = False):
|
||||
"""Run a startup. Be a boss."""
|
||||
company = SoftwareCompany()
|
||||
company.hire([ProductManager(),
|
||||
Architect(),
|
||||
ProjectManager(),
|
||||
Engineer(n_borg=5, use_code_review=code_review)])
|
||||
if run_tests:
|
||||
# developing features: run tests on the spot and identify bugs (bug fixing capability comes soon!)
|
||||
company.hire([QaEngineer()])
|
||||
company.invest(investment)
|
||||
company.start_project(idea)
|
||||
await company.run(n_round=n_round)
|
||||
|
||||
|
||||
def main(idea: str, investment: float = 3.0, n_round: int = 5, code_review: bool = False):
|
||||
def main(idea: str, investment: float = 3.0, n_round: int = 5, code_review: bool = False, run_tests: bool = False):
|
||||
"""
|
||||
We are a software startup comprised of AI. By investing in us, you are empowering a future filled with limitless possibilities.
|
||||
:param idea: Your innovative idea, such as "Creating a snake game."
|
||||
|
|
@ -29,7 +33,7 @@ def main(idea: str, investment: float = 3.0, n_round: int = 5, code_review: bool
|
|||
:param code_review: Whether to use code review.
|
||||
:return:
|
||||
"""
|
||||
asyncio.run(startup(idea, investment, n_round, code_review))
|
||||
asyncio.run(startup(idea, investment, n_round, code_review, run_tests))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
|||
|
|
@ -9,15 +9,147 @@ import pytest
|
|||
|
||||
from metagpt.actions.debug_error import DebugError
|
||||
|
||||
EXAMPLE_MSG_CONTENT = '''
|
||||
---
|
||||
## Development Code File Name
|
||||
player.py
|
||||
## Development Code
|
||||
```python
|
||||
from typing import List
|
||||
from deck import Deck
|
||||
from card import Card
|
||||
|
||||
class Player:
|
||||
"""
|
||||
A class representing a player in the Black Jack game.
|
||||
"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""
|
||||
Initialize a Player object.
|
||||
|
||||
Args:
|
||||
name (str): The name of the player.
|
||||
"""
|
||||
self.name = name
|
||||
self.hand: List[Card] = []
|
||||
self.score = 0
|
||||
|
||||
def draw(self, deck: Deck):
|
||||
"""
|
||||
Draw a card from the deck and add it to the player's hand.
|
||||
|
||||
Args:
|
||||
deck (Deck): The deck of cards.
|
||||
"""
|
||||
card = deck.draw_card()
|
||||
self.hand.append(card)
|
||||
self.calculate_score()
|
||||
|
||||
def calculate_score(self) -> int:
|
||||
"""
|
||||
Calculate the score of the player's hand.
|
||||
|
||||
Returns:
|
||||
int: The score of the player's hand.
|
||||
"""
|
||||
self.score = sum(card.value for card in self.hand)
|
||||
# Handle the case where Ace is counted as 11 and causes the score to exceed 21
|
||||
if self.score > 21 and any(card.rank == 'A' for card in self.hand):
|
||||
self.score -= 10
|
||||
return self.score
|
||||
|
||||
```
|
||||
## Test File Name
|
||||
test_player.py
|
||||
## Test Code
|
||||
```python
|
||||
import unittest
|
||||
from blackjack_game.player import Player
|
||||
from blackjack_game.deck import Deck
|
||||
from blackjack_game.card import Card
|
||||
|
||||
class TestPlayer(unittest.TestCase):
|
||||
## Test the Player's initialization
|
||||
def test_player_initialization(self):
|
||||
player = Player("Test Player")
|
||||
self.assertEqual(player.name, "Test Player")
|
||||
self.assertEqual(player.hand, [])
|
||||
self.assertEqual(player.score, 0)
|
||||
|
||||
## Test the Player's draw method
|
||||
def test_player_draw(self):
|
||||
deck = Deck()
|
||||
player = Player("Test Player")
|
||||
player.draw(deck)
|
||||
self.assertEqual(len(player.hand), 1)
|
||||
self.assertEqual(player.score, player.hand[0].value)
|
||||
|
||||
## Test the Player's calculate_score method
|
||||
def test_player_calculate_score(self):
|
||||
deck = Deck()
|
||||
player = Player("Test Player")
|
||||
player.draw(deck)
|
||||
player.draw(deck)
|
||||
self.assertEqual(player.score, sum(card.value for card in player.hand))
|
||||
|
||||
## Test the Player's calculate_score method with Ace card
|
||||
def test_player_calculate_score_with_ace(self):
|
||||
deck = Deck()
|
||||
player = Player("Test Player")
|
||||
player.hand.append(Card('A', 'Hearts', 11))
|
||||
player.hand.append(Card('K', 'Hearts', 10))
|
||||
player.calculate_score()
|
||||
self.assertEqual(player.score, 21)
|
||||
|
||||
## Test the Player's calculate_score method with multiple Aces
|
||||
def test_player_calculate_score_with_multiple_aces(self):
|
||||
deck = Deck()
|
||||
player = Player("Test Player")
|
||||
player.hand.append(Card('A', 'Hearts', 11))
|
||||
player.hand.append(Card('A', 'Diamonds', 11))
|
||||
player.calculate_score()
|
||||
self.assertEqual(player.score, 12)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
```
|
||||
## Running Command
|
||||
python tests/test_player.py
|
||||
## Running Output
|
||||
standard output: ;
|
||||
standard errors: ..F..
|
||||
======================================================================
|
||||
FAIL: test_player_calculate_score_with_multiple_aces (__main__.TestPlayer)
|
||||
----------------------------------------------------------------------
|
||||
Traceback (most recent call last):
|
||||
File "tests/test_player.py", line 46, in test_player_calculate_score_with_multiple_aces
|
||||
self.assertEqual(player.score, 12)
|
||||
AssertionError: 22 != 12
|
||||
|
||||
----------------------------------------------------------------------
|
||||
Ran 5 tests in 0.007s
|
||||
|
||||
FAILED (failures=1)
|
||||
;
|
||||
## instruction:
|
||||
The error is in the development code, specifically in the calculate_score method of the Player class. The method is not correctly handling the case where there are multiple Aces in the player's hand. The current implementation only subtracts 10 from the score once if the score is over 21 and there's an Ace in the hand. However, in the case of multiple Aces, it should subtract 10 for each Ace until the score is 21 or less.
|
||||
## File To Rewrite:
|
||||
player.py
|
||||
## Status:
|
||||
FAIL
|
||||
## Send To:
|
||||
Engineer
|
||||
---
|
||||
'''
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_debug_error():
|
||||
code = "def add(a, b):\n return a - b"
|
||||
error = "AssertionError: Expected add(1, 1) to equal 2 but got 0"
|
||||
|
||||
debug_error = DebugError("debug_error")
|
||||
|
||||
result = await debug_error.run(code, error)
|
||||
file_name, rewritten_code = await debug_error.run(context=EXAMPLE_MSG_CONTENT)
|
||||
|
||||
# mock_llm.ask.assert_called_once_with(prompt)
|
||||
assert len(result) > 0
|
||||
assert "class Player" in rewritten_code # rewrite the same class
|
||||
assert "while self.score > 21" in rewritten_code # a key logic to rewrite to (original one is "if self.score > 12")
|
||||
|
|
|
|||
|
|
@ -6,33 +6,65 @@
|
|||
@File : test_run_code.py
|
||||
"""
|
||||
import pytest
|
||||
|
||||
import asyncio
|
||||
from metagpt.actions.run_code import RunCode
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_text():
|
||||
action = RunCode()
|
||||
result, errs = await RunCode.run_text('result = 1 + 1')
|
||||
assert result == 2
|
||||
assert errs == ""
|
||||
|
||||
result, errs = await RunCode.run_text('result = 1 / 0')
|
||||
assert result == ""
|
||||
assert "ZeroDivisionError" in errs
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_code():
|
||||
code = """
|
||||
def add(a, b):
|
||||
return a + b
|
||||
result = add(1, 2)
|
||||
"""
|
||||
run_code = RunCode("run_code")
|
||||
|
||||
result = await run_code.run(code)
|
||||
|
||||
assert result == 3
|
||||
async def test_run_script():
|
||||
action = RunCode()
|
||||
|
||||
# Successful command
|
||||
out, err = await RunCode.run_script(".", command=["echo", "Hello World"])
|
||||
assert out.strip() == "Hello World"
|
||||
assert err == ""
|
||||
|
||||
# Unsuccessful command
|
||||
out, err = await RunCode.run_script(".", command=["python", "-c", "print(1/0)"])
|
||||
assert "ZeroDivisionError" in err
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_code_with_error():
|
||||
code = """
|
||||
def add(a, b):
|
||||
return a + b
|
||||
result = add(1, '2')
|
||||
"""
|
||||
run_code = RunCode("run_code")
|
||||
async def test_run():
|
||||
action = RunCode()
|
||||
result = await action.run(mode="text", code="print('Hello, World')")
|
||||
assert "PASS" in result
|
||||
|
||||
result = await run_code.run(code)
|
||||
result = await action.run(
|
||||
mode="script",
|
||||
code="echo 'Hello World'",
|
||||
code_file_name="",
|
||||
test_code="",
|
||||
test_file_name="",
|
||||
command=["echo", "Hello World"],
|
||||
working_directory=".",
|
||||
additional_python_paths=[]
|
||||
)
|
||||
assert "PASS" in result
|
||||
|
||||
assert "TypeError: unsupported operand type(s) for +" in result
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_failure():
|
||||
action = RunCode()
|
||||
result = await action.run(mode="text", code="result = 1 / 0")
|
||||
assert "FAIL" in result
|
||||
|
||||
result = await action.run(
|
||||
mode="script",
|
||||
code='python -c "print(1/0)"',
|
||||
code_file_name="",
|
||||
test_code="",
|
||||
test_file_name="",
|
||||
command=["python", "-c", "print(1/0)"],
|
||||
working_directory=".",
|
||||
additional_python_paths=[]
|
||||
)
|
||||
assert "FAIL" in result
|
||||
|
|
@ -8,19 +8,35 @@
|
|||
import pytest
|
||||
|
||||
from metagpt.actions.write_test import WriteTest
|
||||
from metagpt.logs import logger
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_test():
|
||||
code = """
|
||||
def add(a, b):
|
||||
return a + b
|
||||
import random
|
||||
from typing import Tuple
|
||||
|
||||
class Food:
|
||||
def __init__(self, position: Tuple[int, int]):
|
||||
self.position = position
|
||||
|
||||
def generate(self, max_y: int, max_x: int):
|
||||
self.position = (random.randint(1, max_y - 1), random.randint(1, max_x - 1))
|
||||
"""
|
||||
|
||||
write_test = WriteTest("write_test")
|
||||
write_test = WriteTest()
|
||||
|
||||
test_cases = await write_test.run(code)
|
||||
test_code = await write_test.run(
|
||||
code_to_test=code,
|
||||
test_file_name="test_food.py",
|
||||
source_file_path="/some/dummy/path/cli_snake_game/cli_snake_game/food.py",
|
||||
workspace="/some/dummy/path/cli_snake_game"
|
||||
)
|
||||
logger.info(test_code)
|
||||
|
||||
# We cannot exactly predict the generated test cases, but we can check if it is a string and if it is not empty
|
||||
assert isinstance(test_cases, str)
|
||||
assert len(test_cases) > 0
|
||||
assert isinstance(test_code, str)
|
||||
assert "from cli_snake_game.food import Food" in test_code
|
||||
assert "class TestFood(unittest.TestCase)" in test_code
|
||||
assert "def test_generate" in test_code
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue