generalize write code with tools, simplify ml_engineer

This commit is contained in:
yzlin 2024-01-11 00:23:26 +08:00
parent 3a312007c2
commit e12ab25b7c
6 changed files with 221 additions and 162 deletions

View file

@ -12,14 +12,16 @@ import yaml
from tenacity import retry, stop_after_attempt, wait_fixed
from metagpt.actions import Action
from metagpt.const import METAGPT_ROOT
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.prompts.ml_engineer import (
CODE_GENERATOR_WITH_TOOLS,
GENERATE_CODE_PROMPT,
ML_MODULE_MAP,
ML_SPECIFIC_PROMPT,
ML_TOOL_USAGE_PROMPT,
SELECT_FUNCTION_TOOLS,
TASK_MODULE_MAP,
TASK_SPECIFIC_PROMPT,
TOOL_RECOMMENDATION_PROMPT,
TOOL_USAGE_PROMPT,
)
@ -60,13 +62,12 @@ class BaseWriteAnalysisCode(Action):
}
return messages
async def run(self, context: List[Message], plan: Plan = None, code_steps: str = "") -> str:
async def run(self, context: List[Message], plan: Plan = None) -> str:
"""Run of a code writing action, used in data analysis or modeling
Args:
context (List[Message]): Action output history, source action denoted by Message.cause_by
plan (Plan, optional): Overall plan. Defaults to None.
code_steps (str, optional): suggested step breakdown for the current task. Defaults to "".
Returns:
str: The code string.
@ -92,15 +93,12 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode):
class WriteCodeWithTools(BaseWriteAnalysisCode):
"""Write code with help of local available tools. Choose tools first, then generate code to use the tools"""
schema_path: str = ""
schema_path: Union[Path, str] = METAGPT_ROOT / "metagpt/tools/functions/schemas"
available_tools: dict = {}
def __init__(self, schema_path="", **kwargs):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.schema_path = schema_path
if schema_path:
self._load_tools(schema_path)
self._load_tools(self.schema_path)
def _load_tools(self, schema_path, schema_module=None):
"""Load tools from yaml file"""
@ -171,12 +169,11 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
self,
context: List[Message],
plan: Plan = None,
column_info: str = "",
**kwargs,
) -> Tuple[List[Message], str]:
) -> str:
task_type = plan.current_task.task_type
available_tools = self.available_tools.get(task_type, {})
special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "")
special_prompt = TASK_SPECIFIC_PROMPT.get(task_type, "")
code_steps = plan.current_task.code_steps
finished_tasks = plan.get_finished_tasks()
@ -192,9 +189,54 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
tool_catalog = self._parse_recommend_tools(task_type, recommend_tools)
logger.info(f"Recommended tools: \n{recommend_tools}")
module_name = ML_MODULE_MAP[task_type]
module_name = TASK_MODULE_MAP[task_type]
prompt = TOOL_USAGE_PROMPT.format(
else:
tool_catalog = {}
module_name = ""
tools_instruction = TOOL_USAGE_PROMPT.format(
special_prompt=special_prompt, module_name=module_name, tool_catalog=tool_catalog
)
context.append(Message(content=tools_instruction, role="user"))
prompt = self.process_msg(context)
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
rsp = await self.llm.aask_code(prompt, **tool_config)
return rsp["code"]
class WriteCodeWithToolsML(WriteCodeWithTools):
async def run(
self,
context: List[Message],
plan: Plan = None,
column_info: str = "",
**kwargs,
) -> Tuple[List[Message], str]:
task_type = plan.current_task.task_type
available_tools = self.available_tools.get(task_type, {})
special_prompt = TASK_SPECIFIC_PROMPT.get(task_type, "")
code_steps = plan.current_task.code_steps
finished_tasks = plan.get_finished_tasks()
code_context = [remove_comments(task.code) for task in finished_tasks]
code_context = "\n\n".join(code_context)
if len(available_tools) > 0:
available_tools = {k: v["description"] for k, v in available_tools.items()}
recommend_tools = await self._tool_recommendation(
plan.current_task.instruction, code_steps, available_tools
)
tool_catalog = self._parse_recommend_tools(task_type, recommend_tools)
logger.info(f"Recommended tools: \n{recommend_tools}")
module_name = TASK_MODULE_MAP[task_type]
prompt = ML_TOOL_USAGE_PROMPT.format(
user_requirement=plan.goal,
history_code=code_context,
current_task=plan.current_task.instruction,

View file

@ -198,6 +198,24 @@ model.fit(train, y_train)
"""
TOOL_USAGE_PROMPT = """
# Instruction
Write complete code for 'Current Task'. And avoid duplicating code from finished tasks, such as repeated import of packages, reading data, etc.
Specifically, {special_prompt}
# Capabilities
- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python Class.
- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc..
# Available Tools (can be empty):
Each Class tool is described in JSON format. When you call a tool, import the tool from `{module_name}` first.
{tool_catalog}
# Constraints:
- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed.
- Always prioritize using pre-defined tools for the same functionality.
"""
ML_TOOL_USAGE_PROMPT = """
# Background
As a data scientist, you need to help user to achieve their goal [{user_requirement}] step-by-step in an continuous Jupyter notebook.
@ -297,14 +315,14 @@ The current task is about evaluating a model, please note the following:
- Use trained model from previous task result directly, do not mock or reload model yourself.
"""
ML_SPECIFIC_PROMPT = {
TASK_SPECIFIC_PROMPT = {
"data_preprocess": DATA_PREPROCESS_PROMPT,
"feature_engineering": FEATURE_ENGINEERING_PROMPT,
"model_train": MODEL_TRAIN_PROMPT,
"model_evaluate": MODEL_EVALUATE_PROMPT,
}
ML_MODULE_MAP = {
TASK_MODULE_MAP = {
"data_preprocess": "metagpt.tools.functions.libs.data_preprocess",
"feature_engineering": "metagpt.tools.functions.libs.feature_engineering",
"udf": "metagpt.tools.functions.libs.udf",

View file

@ -4,14 +4,20 @@ from pydantic import Field
from metagpt.actions.ask_review import ReviewConst
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.write_analysis_code import WriteCodeByGenerate
from metagpt.actions.write_analysis_code import (
WriteCodeByGenerate,
WriteCodeWithTools,
)
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.roles.tool_maker import ToolMaker
from metagpt.schema import Message, Task, TaskResult
from metagpt.utils.save_code import save_code_file
class CodeInterpreter(Role):
use_tools: bool = False
make_udfs: bool = False # whether to save user-defined functions
execute_code: ExecutePyCode = Field(default_factory=ExecutePyCode, exclude=True)
def __init__(
@ -21,8 +27,10 @@ class CodeInterpreter(Role):
goal="",
auto_run=False,
use_tools=False,
make_udfs=False,
**kwargs,
):
super().__init__(name=name, profile=profile, goal=goal)
super().__init__(name=name, profile=profile, goal=goal, use_tools=use_tools, make_udfs=make_udfs, **kwargs)
self._set_react_mode(react_mode="plan_and_act", auto_run=auto_run, use_tools=use_tools)
@property
@ -36,6 +44,10 @@ class CodeInterpreter(Role):
project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb")
# make tools out of workable codes for future use
if self.make_udfs:
await self.make_tools()
return rsp
async def _act_on_task(self, current_task: Task) -> TaskResult:
@ -48,20 +60,18 @@ class CodeInterpreter(Role):
success = False
while not success and counter < max_retry:
context = self.planner.get_useful_memories()
logger.info("Write code with pure generation")
code = await WriteCodeByGenerate().run(context=context, plan=self.planner.plan, temperature=0.0)
cause_by = WriteCodeByGenerate
### write code ###
code, cause_by = await self._write_code()
self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by))
### execute code ###
result, success = await self.execute_code.run(code)
print(result)
self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode))
### process execution result ###
if "!pip" in code:
success = False
@ -74,3 +84,19 @@ class CodeInterpreter(Role):
counter = 0 # redo the task again with help of human suggestions
return code, result, success
async def _write_code(self):
todo = WriteCodeByGenerate() if not self.use_tools else WriteCodeWithTools()
logger.info(f"ready to {todo.name}")
context = self.planner.get_useful_memories()
code = await todo.run(context=context, plan=self.planner.plan, temperature=0.0)
return code, todo
async def make_tools(self):
"""Make user-defined functions(udfs, aka tools) for pure generation code."""
logger.info("Plan completed. Now start to make tools ...")
tool_maker = ToolMaker()
for task in self.planner.plan.get_finished_tasks():
await tool_maker.make_tool(task.code, task.instruction, task.task_id)

View file

@ -1,31 +1,23 @@
import json
from metagpt.actions.ask_review import ReviewConst
from metagpt.actions.debug_code import DebugCode
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.ml_da_action import Reflect, SummarizeAnalysis, UpdateDataColumns
from metagpt.actions.write_analysis_code import (
MakeTools,
WriteCodeByGenerate,
WriteCodeWithTools,
)
from metagpt.actions.write_analysis_code import WriteCodeWithToolsML
from metagpt.actions.write_code_steps import WriteCodeSteps
from metagpt.const import METAGPT_ROOT
from metagpt.logs import logger
from metagpt.roles.code_interpreter import CodeInterpreter
from metagpt.roles.kaggle_manager import DownloadData, SubmitResult
from metagpt.schema import Message
from metagpt.tools.functions.libs.udf import UDFS_YAML
from metagpt.utils.common import remove_comments
from metagpt.utils.common import any_to_str
class MLEngineer(CodeInterpreter):
auto_run: bool = False
use_tools: bool = False
use_code_steps: bool = False
make_udfs: bool = False # whether to save user-defined functions
use_udfs: bool = False
data_desc: dict = {}
debug_context: list = []
latest_code: str = ""
def __init__(
self,
@ -38,27 +30,21 @@ class MLEngineer(CodeInterpreter):
make_udfs=False,
use_udfs=False,
):
super().__init__(name=name, profile=profile, goal=goal, auto_run=auto_run, use_tools=use_tools)
self.auto_run = auto_run
self.use_tools = use_tools
self.use_code_steps = use_code_steps
self.make_udfs = make_udfs
self.use_udfs = use_udfs
super().__init__(
name=name,
profile=profile,
goal=goal,
auto_run=auto_run,
use_tools=use_tools,
use_code_steps=use_code_steps,
make_udfs=make_udfs,
use_udfs=use_udfs,
)
# self._watch([DownloadData, SubmitResult]) # in multi-agent settings
async def _plan_and_act(self):
### Actions in a multi-agent multi-turn setting, a new attempt on the data ###
memories = self.get_memories()
if memories:
latest_event = memories[-1].cause_by
if latest_event == DownloadData:
self.planner.plan.context = memories[-1].content
elif latest_event == SubmitResult:
# self reflect on previous plan outcomes and think about how to improve the plan, add to working memory
await self._reflect()
# get feedback for improvement from human, add to working memory
await self.planner.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
### a new attempt on the data, relevant in a multi-agent multi-turn setting ###
await self._prepare_data_context()
### general plan process ###
await super()._plan_and_act()
@ -75,85 +61,48 @@ class MLEngineer(CodeInterpreter):
await WriteCodeSteps().run(self.planner.plan) if self.use_code_steps else ""
)
counter = 0
success = False
debug_context = []
while not success and counter < max_retry:
context = self.planner.get_useful_memories()
if counter > 0 and (self.use_tools or self.use_udfs):
logger.warning("We got a bug code, now start to debug...")
code = await DebugCode().run(
plan=self.planner.current_task.instruction,
code=code,
runtime_result=self.working_memory.get(),
context=debug_context,
)
logger.info(f"new code \n{code}")
cause_by = DebugCode
elif (not self.use_tools and not self.use_udfs) or (
self.planner.current_task.task_type == "other" and not self.use_udfs
):
logger.info("Write code with pure generation")
code = await WriteCodeByGenerate().run(context=context, plan=self.planner.plan, temperature=0.0)
debug_context = [self.planner.get_useful_memories(task_exclude_field={"result", "code_steps"})[0]]
cause_by = WriteCodeByGenerate
else:
logger.info("Write code with tools")
if self.use_udfs:
# use user-defined function tools.
logger.warning("Writing code with user-defined function tools by WriteCodeWithTools.")
logger.info(
f"Local user defined function as following:\
\n{json.dumps(list(UDFS_YAML.keys()), indent=2, ensure_ascii=False)}"
)
# set task_type to `udf`
self.planner.current_task.task_type = "udf"
schema_path = UDFS_YAML
else:
schema_path = METAGPT_ROOT / "metagpt/tools/functions/schemas"
tool_context, code = await WriteCodeWithTools(schema_path=schema_path).run(
context=context,
plan=self.planner.plan,
column_info=self.data_desc.get("column_info", ""),
)
debug_context = tool_context
cause_by = WriteCodeWithTools
self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by))
result, success = await self.execute_code.run(code)
print(result)
# make tools for successful code and long code.
if success and self.make_udfs and len(remove_comments(code).split("\n")) > 4:
logger.info("Execute code successfully. Now start to make tools ...")
await self.make_tools(code=code)
self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode))
if "!pip" in code:
success = False
counter += 1
if not success and counter >= max_retry:
logger.info("coding failed!")
review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
if ReviewConst.CHANGE_WORD[0] in review:
counter = 0 # redo the task again with help of human suggestions
code, result, success = await super()._write_and_exec_code(max_retry=max_retry)
if success:
if (
self.use_tools and self.planner.current_task.task_type not in ["model_train", "model_evaluate"]
) or self.use_udfs:
if self.use_tools and self.planner.current_task.task_type in ["data_preprocess", "feature_engineering"]:
update_success, new_code = await self._update_data_columns()
if update_success:
code = code + "\n\n" + new_code
return code, result, success
async def _write_code(self):
if not self.use_tools:
return await super()._write_code()
code_execution_count = sum([msg.cause_by == any_to_str(ExecutePyCode) for msg in self.working_memory.get()])
print("*" * 10, code_execution_count)
if code_execution_count > 0:
logger.warning("We got a bug code, now start to debug...")
code = await DebugCode().run(
plan=self.planner.current_task.instruction,
code=self.latest_code,
runtime_result=self.working_memory.get(),
context=self.debug_context,
)
logger.info(f"new code \n{code}")
cause_by = DebugCode
else:
logger.info("Write code with tools")
tool_context, code = await WriteCodeWithToolsML().run(
context=[], # context assembled inside the Action
plan=self.planner.plan,
column_info=self.data_desc.get("column_info", ""),
)
self.debug_context = tool_context
cause_by = WriteCodeWithToolsML
self.latest_code = code
return code, cause_by
async def _update_data_columns(self):
logger.info("Check columns in updated data")
rsp = await UpdateDataColumns().run(self.planner.plan)
@ -166,6 +115,19 @@ class MLEngineer(CodeInterpreter):
self.data_desc["column_info"] = result
return success, code
async def _prepare_data_context(self):
memories = self.get_memories()
if memories:
latest_event = memories[-1].cause_by
if latest_event == DownloadData:
self.planner.plan.context = memories[-1].content
elif latest_event == SubmitResult:
# self reflect on previous plan outcomes and think about how to improve the plan, add to working memory
await self._reflect()
# get feedback for improvement from human, add to working memory
await self.planner.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
async def _reflect(self):
context = self.get_memories()
context = "\n".join([str(msg) for msg in context])
@ -173,38 +135,3 @@ class MLEngineer(CodeInterpreter):
reflection = await Reflect().run(context=context)
self.working_memory.add(Message(content=reflection, role="assistant"))
self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user"))
async def make_tools(self, code: str):
"""Make user-defined functions(udfs, aka tools) for pure generation code.
Args:
code (str): pure generation code by class WriteCodeByGenerate.
"""
logger.warning(
f"Making tools for task_id {self.planner.current_task_id}: \
`{self.planner.current_task.instruction}` \n code: \n {code}"
)
make_tools = MakeTools()
make_tool_retries, make_tool_current_retry = 3, 0
while True:
# start make tools
tool_code = await make_tools.run(code, self.planner.current_task.instruction)
make_tool_current_retry += 1
# check tool_code by execute_code
logger.info(f"Checking task_id {self.planner.current_task_id} tool code by executor...")
execute_result, execute_success = await self.execute_code.run(tool_code)
if not execute_success:
logger.error(f"Tool code faild to execute, \n{execute_result}\n.We will try to fix it ...")
# end make tools
if execute_success or make_tool_current_retry >= make_tool_retries:
if make_tool_current_retry >= make_tool_retries:
logger.error(
f"We have tried the maximum number of attempts {make_tool_retries}\
and still have not created tools for task_id {self.planner.current_task_id} successfully,\
we will skip it."
)
break
# save successful tool code in udf
if execute_success:
make_tools.save(tool_code)

View file

@ -0,0 +1,46 @@
from pydantic import Field
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.write_analysis_code import (
MakeTools,
)
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.utils.common import remove_comments
class ToolMaker(Role):
execute_code: ExecutePyCode = Field(default_factory=ExecutePyCode, exclude=True)
async def make_tool(self, code: str, instruction: str, task_id: str = ""):
if len(remove_comments(code).split("\n")) < 5: # no need to consider trivial codes with fewer than 5 lines
return
logger.warning(
f"Making tools for task_id {task_id}: \
`{instruction}` \n code: \n {code}"
)
make_tools = MakeTools()
make_tool_retries, make_tool_current_retry = 3, 0
while True:
# start make tools
tool_code = await make_tools.run(code, instruction)
make_tool_current_retry += 1
# check tool_code by execute_code
logger.info(f"Checking task_id {task_id} tool code by executor...")
execute_result, execute_success = await self.execute_code.run(tool_code)
if not execute_success:
logger.error(f"Tool code faild to execute, \n{execute_result}\n.We will try to fix it ...")
# end make tools
if execute_success or make_tool_current_retry >= make_tool_retries:
if make_tool_current_retry >= make_tool_retries:
logger.error(
f"We have tried the maximum number of attempts {make_tool_retries}\
and still have not created tools for task_id {task_id} successfully,\
we will skip it."
)
break
# save successful tool code in udf
if execute_success:
make_tools.save(tool_code)

View file

@ -25,7 +25,7 @@ async def run_code_interpreter(
"""
if role_class == "ci":
role = CodeInterpreter(goal=requirement, auto_run=auto_run, use_tools=use_tools)
role = CodeInterpreter(goal=requirement, auto_run=auto_run, use_tools=use_tools, make_udfs=make_udfs)
else:
role = MLEngineer(
goal=requirement,