add reflection

change write code internal ppl
This commit is contained in:
stellahsr 2023-12-08 11:01:13 +08:00
parent ba6a62f55a
commit 13e2b05812
4 changed files with 219 additions and 50 deletions

View file

@ -0,0 +1,111 @@
from typing import Dict, List, Union, Tuple, Optional, Any
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.schema import Message, Plan
from metagpt.utils.common import CodeParser
from metagpt.actions.write_analysis_code import BaseWriteAnalysisCode
# Few-shot example substituted into REFLECTION_PROMPT as {debug_example}.
# It walks through one worked case: a buggy `add`, its failing assertions,
# a written reflection on the failure, and the corrected implementation.
# NOTE(review): "Tested passed:" below is probably meant to read
# "Tests passed:" — confirm before changing the prompt text.
DEBUG_REFLECTION_EXAMPLE = '''Example 1:
[previous impl]:
```python
def add(a: int, b: int) -> int:
"""
Given integers a and b, return the total value of a and b.
"""
return a - b
```
[runtime Error]:
Tested passed:
Tests failed:
assert add(1, 2) == 3 # output: -1
assert add(1, 2) == 4 # output: -1
[reflection on previous impl]:
The implementation failed the test cases where the input integers are 1 and 2. The issue arises because the code does not add the two integers together, but instead subtracts the second integer from the first. To fix this issue, we should change the operator from `-` to `+` in the return statement. This will ensure that the function returns the correct output for the given input.
[improved impl]:
```python
def add(a: int, b: int) -> int:
"""
Given integers a and b, return the total value of a and b.
"""
return a + b
```
'''
# Prompt template for the reflection step. DebugCode.run_reflection fills it
# with str.format using four fields: debug_example (the few-shot example),
# goal (the requirement), code (the previous implementation) and
# runtime_result (the observed error output).
# NOTE(review): the trailing "xxx" looks like a placeholder marking where the
# model's reflection should go — confirm that is the intent.
REFLECTION_PROMPT = """
Here is an example for you.
{debug_example}
[requirement]
{goal}
[previous impl]
{code}
[runtime Error]
{runtime_result}
Analysis the error step by step, provide me improve method. Do not repeat [previous impl]
[reflection on previous impl]:
xxx
"""
def message_to_str(message: Message) -> str:
    """Render a single message as ``"<role>: <content>"``."""
    speaker, body = message.role, message.content
    return "{}: {}".format(speaker, body)
def messages_to_str(messages: List[Message]) -> str:
    """Flatten messages into one string, one rendered message per line.

    Each message is rendered by ``message_to_str`` and the results are joined
    with newlines; an empty list yields an empty string.
    """
    rendered = map(message_to_str, messages)
    return "\n".join(rendered)
class DebugCode(BaseWriteAnalysisCode):
    """Repair failing code in two LLM calls: reflect on the error, then rewrite.

    ``run`` first asks the LLM to reflect on why the previous implementation
    failed (``run_reflection``) and then asks it for an improved implementation
    based on that reflection (``rewrite_code``).
    """

    name: str = "debugcode"
    context: Optional[str] = None
    # NOTE(review): bare annotation kept as-is. It declares ``llm`` as ``None``
    # although the methods below call ``self.llm.aask`` — presumably the real
    # client is supplied by the base class; confirm and correct the type.
    llm: None

    def __init__(self, **kwargs: Any):
        """Forward all keyword arguments unchanged to the base action."""
        super().__init__(**kwargs)

    async def run_reflection(self, plan, code, runtime_result) -> str:
        """Ask the LLM to analyse why ``code`` failed.

        Args:
            plan: Object whose ``goal`` attribute is used as the requirement
                text; ``None`` is treated as an empty requirement.
            code: The previous implementation.
            runtime_result: The error output observed when running ``code``.

        Returns:
            The LLM's reflection text.
        """
        # ``run`` advertises ``plan=None`` as its default, so do not
        # dereference ``plan.goal`` blindly.
        goal = plan.goal if plan is not None else ""
        reflection_prompt = REFLECTION_PROMPT.format(
            debug_example=DEBUG_REFLECTION_EXAMPLE,
            goal=goal,
            code=code,
            runtime_result=runtime_result,
        )
        system_prompt = (
            "You are an AI Python assistant. You will be given your previous implementation of a function, "
            "runtime error results, and a hint to change the implementation appropriately. "
            "Write your full implementation "
        )
        info = [
            Message(role="system", content=system_prompt),
            Message(role="assistant", content=reflection_prompt),
        ]
        # Both messages are flattened into one "<role>: <content>" string
        # before being sent as a single prompt.
        msg = messages_to_str(info)
        resp = await self.llm.aask(msg=msg)
        logger.info(f"reflection is {resp}")
        return resp

    async def rewrite_code(self, reflection: str = "") -> str:
        """Rewrite the code according to the given reflection.

        Args:
            reflection: Reflection text produced by ``run_reflection``.

        Returns:
            The code extracted from the LLM response by ``CodeParser.parse_code``.
        """
        info = [
            Message(role="assistant", content=f"[reflection]: \n {reflection}"),
            Message(role="user", content="[improved impl]:\n Return in Python block"),
        ]
        msg = messages_to_str(info)
        resp = await self.llm.aask(msg=msg)
        logger.info(f"improve code is {resp}")
        improv_code = CodeParser.parse_code(block=None, text=resp)
        return improv_code

    async def run(
        self,
        plan: Optional[Plan] = None,
        code: str = "",
        runtime_result: str = "",
    ) -> str:
        """Reflect on the current code and its error output, then fix the code.

        Args:
            plan: Plan providing the goal used as the requirement text.
            code: The code that was executed.
            runtime_result: The error output from executing ``code``.

        Returns:
            The improved code returned by ``rewrite_code``.
        """
        reflection = await self.run_reflection(plan, code, runtime_result)
        # Rewrite the code based on the reflection result.
        improv_code = await self.rewrite_code(reflection)
        return improv_code

View file

@ -4,7 +4,7 @@
@Author : orange-crow
@File : write_code_v2.py
"""
from typing import Dict, List, Union, Tuple
from typing import Dict, List, Union, Tuple, Optional, Any
from metagpt.actions import Action
from metagpt.logs import logger
@ -12,7 +12,7 @@ from metagpt.prompts.ml_engineer import (
TOOL_RECOMMENDATION_PROMPT,
SELECT_FUNCTION_TOOLS,
CODE_GENERATOR_WITH_TOOLS,
TOO_ORGANIZATION_PROMPT,
TOOL_ORGANIZATION_PROMPT,
ML_SPECIFIC_PROMPT,
ML_MODULE_MAP,
TOOL_OUTPUT_DESC,
@ -22,10 +22,13 @@ from metagpt.schema import Message, Plan
from metagpt.tools.functions import registry
from metagpt.utils.common import create_func_config
from metagpt.prompts.ml_engineer import GEN_DATA_DESC_PROMPT, GENERATE_CODE_PROMPT
from metagpt.utils.common import CodeParser
from metagpt.actions.execute_code import ExecutePyCode
class BaseWriteAnalysisCode(Action):
DEFAULT_SYSTEM_MSG = """You are Code Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.""" # prompt reference: https://github.com/KillianLucas/open-interpreter/blob/v0.1.4/interpreter/system_message.txt
REUSE_CODE_INSTRUCTION = """ATTENTION: DONT include codes from previous tasks in your current code block, include new codes only, DONT repeat codes!"""
@ -80,6 +83,8 @@ class BaseWriteAnalysisCode(Action):
"""
class WriteCodeByGenerate(BaseWriteAnalysisCode):
"""Write code fully by generation"""
@ -153,7 +158,6 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
recommend_tools = rsp["recommend_tools"]
return recommend_tools
async def run(
self,
context: List[Message],
@ -164,25 +168,23 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
task_type = plan.current_task.task_type
logger.info(f"task_type is: {task_type}")
available_tools = registry.get_all_schema_by_module(task_type)
special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "")
# special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "")
column_names = kwargs.get("column_names", {})
finished_tasks = plan.get_finished_tasks()
code_context = [task.code for task in finished_tasks]
code_context = "\n\n".join(code_context)
### add runtime info
result, success = await self.execute_code.run(code_context)
logger.info(result)
if len(available_tools) > 0:
available_tools = [
{k: tool[k] for k in ["name", "description"] if k in tool}
for tool in available_tools
]
final_code = code_context
final_code = {}
new_code = ""
code_steps_dict = eval(code_steps)
recommend_tools = await self._tool_recommendation(context, code_steps, available_tools)
tool_catalog = self._parse_recommend_tools(task_type, recommend_tools)
@ -191,33 +193,40 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
module_name = ML_MODULE_MAP[task_type]
output_desc = TOOL_OUTPUT_DESC.get(task_type, "")
hist_info = f"Previous finished code is \n\n ```Python {final_code} ``` \n\n " \
f"Runtime result is {result} \n\n"
prompt = TOOL_USAGE_PROMPT.format(
goal=plan.current_task.instruction,
context=hist_info,
code_steps=code_steps,
module_name=module_name,
output_desc=output_desc,
function_catalog=tool_catalog,
)
for idx, tool in enumerate(recommend_tools):
hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n "
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
prompt = TOOL_USAGE_PROMPT.format(
goal=plan.current_task.instruction,
context=hist_info,
code_steps=code_steps,
column_names=column_names,
special_prompt=special_prompt,
module_name=module_name,
output_desc=output_desc,
function_catalog=tool_catalog[idx],
)
rsp = await self.llm.aask_code(prompt, **tool_config)
logger.info(f"rsp is: {rsp}")
final_code = final_code + "\n\n" + rsp["code"]
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
return final_code
rsp = await self.llm.aask_code(prompt, **tool_config)
logger.info(f"rsp is: {rsp}")
# final_code = final_code + "\n\n" + rsp["code"]
# final_code[key] = rsp["code"]
new_code = new_code + "\n\n" + rsp["code"]
code_context = code_context + "\n\n" + rsp["code"]
return new_code
else:
hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n " \
f"runtime result is {result} \n\n"
hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n "
prompt = GENERATE_CODE_PROMPT.format(
goal=plan.current_task.instruction,
context=hist_info,
code_steps=code_steps,
special_prompt=special_prompt,
# column_names=column_names
)
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)

View file

@ -105,9 +105,15 @@ TOOL_USAGE_PROMPT = """
## Target
{goal}
Specifically, {special_prompt}
## History Info
{context}
## Code Steps for Current Task:
Follow steps below when you writing code if it's convenient.
{code_steps}
## Available Tools:
Each function is described in JSON format, including the function name and parameters. {output_desc}
{function_catalog}
@ -125,7 +131,7 @@ Generate the complete code for this task:
```end
## Attention:
Make sure use the columns from the dataset columns
Make sure use the columns from the dataset columns: {column_names}
Finish your coding tasks as a helpful programmer based on the tools.
"""
@ -133,23 +139,30 @@ GENERATE_CODE_PROMPT = """
## Target
{goal}
Specifically, {special_prompt}
## History Info
{context}
## Code Steps for Current Task:
Follow steps below when you writing code if it's convenient.
{code_steps}
## Your Output Format:
Generate the complete code for this task:
```python
# Tools used: [function names or 'none']
<your code for the current task>
```end
import pandas as pd
```
## Attention:
Make sure use the columns from the dataset columns
Finish your coding tasks as a helpful programmer based on the tools.
Finish your coding tasks as a helpful programmer based on the code.
"""
TOO_ORGANIZATION_PROMPT = """
TOOL_ORGANIZATION_PROMPT = """
The previous conversation has provided all tasks step-by-step for the use goal and their statuses.
Now, begin writing code for the current task. This code should writen strictly on the basis of all previous completed tasks code, not a standalone code. And avoid writing duplicate code that has already been written in previous tasks, such as repeated import of packages, reading data, etc.
Specifically, {special_prompt}

View file

@ -16,6 +16,7 @@ from metagpt.roles import Role
from metagpt.schema import Message, Plan
from metagpt.utils.common import CodeParser
from metagpt.actions.write_code_steps import WriteCodeSteps
from metagpt.actions.debug_code import DebugCode
STRUCTURAL_CONTEXT = """
## User Requirement
@ -36,10 +37,13 @@ catboost
"""
def truncate(result: str, keep_len: int = 1000) -> str:
desc = "Truncated to show only the last 1000 characters\n"
if result.startswith(desc):
result = result[-len(desc) :]
result = result[-len(desc):]
if len(result) > keep_len:
result = result[-keep_len:]
@ -110,9 +114,9 @@ class AskReview(Action):
logger.info("most recent context:")
latest_action = context[-1].cause_by.__name__ if context[-1].cause_by else ""
prompt = f"\nPlease review output from {latest_action}:\n" \
"If you want to change a task in the plan, say 'change task task_id, ... (things to change)'\n" \
"If you confirm the output and wish to continue with the current process, type CONFIRM\n" \
"If you want to terminate the process, type exit:\n"
"If you want to change a task in the plan, say 'change task task_id, ... (things to change)'\n" \
"If you confirm the output and wish to continue with the current process, type CONFIRM\n" \
"If you want to terminate the process, type exit:\n"
rsp = input(prompt)
if rsp.lower() in ("exit"):
@ -143,7 +147,7 @@ class GenerateDataDesc(Action):
class MLEngineer(Role):
def __init__(
self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, data_path: str = None
self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, data_path: str = None
):
super().__init__(name=name, profile=profile, goal=goal)
self._set_react_mode(react_mode="plan_and_act")
@ -159,7 +163,6 @@ class MLEngineer(Role):
if self.data_path:
self.data_desc = await self._generate_data_desc()
# create initial plan and update until confirmation
await self._update_plan()
@ -185,6 +188,15 @@ class MLEngineer(Role):
# update plan according to user's feedback and to take on changed tasks
await self._update_plan()
finished_tasks = self.plan.get_finished_tasks()
if len(finished_tasks) == len(self.plan.tasks):
code_context = [task.code for task in finished_tasks]
code_context = "\n\n".join(code_context)
result, success = await self.execute_code.run(code_context)
# truncated the result
print(truncate(result))
async def _generate_data_desc(self):
files = glob.glob(self.data_path + "/*.csv")
data_desc = await GenerateDataDesc().run(files=files)
@ -198,16 +210,29 @@ class MLEngineer(Role):
)
counter = 0
improve_code = ""
success = False
finished_tasks = self.plan.get_finished_tasks()
code_context = [task.code for task in finished_tasks]
code_context = "\n\n".join(code_context)
while not success and counter < max_retry:
context = self.get_useful_memories()
if counter == 0:
context = self.get_useful_memories()
else:
# improve_code = await DebugCode().run(plan=self.plan,
# code= code_context + "\n\n" + code,
# runtime_result=self.working_memory.get())
improve_code = ""
# breakpoint()
column_names_dict = {key: value["column_info"] for key,value in self.data_desc.items()}
column_names_dict = {key: value["column_info"] for key, value in self.data_desc.items()}
if not self.use_tools or self.plan.current_task.task_type == "other":
logger.info("Write code with pure generation")
# code = "print('abc')"
code = await WriteCodeByGenerate().run(
context=context, plan=self.plan, code_steps=code_steps, temperature=0.0
)
@ -215,16 +240,24 @@ class MLEngineer(Role):
else:
logger.info("Write code with tools")
code = await WriteCodeWithTools().run(
context=context, plan=self.plan, code_steps=code_steps, **{"column_names": column_names_dict}
)
cause_by = WriteCodeWithTools
if improve_code!="":
code = improve_code
logger.info(f"new code {code}")
cause_by = DebugCode
else:
code = await WriteCodeWithTools().run(
context=context, plan=self.plan, code_steps=code_steps, **{"column_names": column_names_dict}
)
cause_by = WriteCodeWithTools
self.working_memory.add(
Message(content=code, role="assistant", cause_by=cause_by)
)
result, success = await self.execute_code.run(code)
# debug on code, run on runcode with finished code and new_df
runcode = code_context + "\n\n" + code
result, success = await self.execute_code.run(runcode)
# truncated the result
print(truncate(result))
# print(result)
@ -266,6 +299,7 @@ class MLEngineer(Role):
self.plan.add_tasks(tasks)
self.working_memory.clear()
def get_useful_memories(self) -> List[Message]:
"""find useful memories only to reduce context length and improve performance"""
# TODO dataset description , code steps
@ -298,11 +332,13 @@ if __name__ == "__main__":
from metagpt.const import DATA_PATH
requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy."
# requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy."
data_path = f"{DATA_PATH}/titanic"
requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."
async def main(requirement: str = requirement, auto_run: bool = True, data_path: str = data_path):
async def main(requirement: str = requirement, auto_run: bool = True, data_path: str = ""):
role = MLEngineer(goal=requirement, auto_run=auto_run, data_path=data_path)
await role.run(requirement)
fire.Fire(main)