From e12ab25b7c51475c15eeaeba0eb9dfec472b889f Mon Sep 17 00:00:00 2001 From: yzlin Date: Thu, 11 Jan 2024 00:23:26 +0800 Subject: [PATCH] generalize write code with tools, simplify ml_engineer --- metagpt/actions/write_analysis_code.py | 72 +++++-- metagpt/prompts/ml_engineer.py | 22 ++- metagpt/roles/code_interpreter.py | 42 ++++- metagpt/roles/ml_engineer.py | 199 +++++++------------- metagpt/roles/tool_maker.py | 46 +++++ tests/metagpt/roles/run_code_interpreter.py | 2 +- 6 files changed, 221 insertions(+), 162 deletions(-) create mode 100644 metagpt/roles/tool_maker.py diff --git a/metagpt/actions/write_analysis_code.py b/metagpt/actions/write_analysis_code.py index d1e108b54..aef86122b 100644 --- a/metagpt/actions/write_analysis_code.py +++ b/metagpt/actions/write_analysis_code.py @@ -12,14 +12,16 @@ import yaml from tenacity import retry, stop_after_attempt, wait_fixed from metagpt.actions import Action +from metagpt.const import METAGPT_ROOT from metagpt.llm import LLM from metagpt.logs import logger from metagpt.prompts.ml_engineer import ( CODE_GENERATOR_WITH_TOOLS, GENERATE_CODE_PROMPT, - ML_MODULE_MAP, - ML_SPECIFIC_PROMPT, + ML_TOOL_USAGE_PROMPT, SELECT_FUNCTION_TOOLS, + TASK_MODULE_MAP, + TASK_SPECIFIC_PROMPT, TOOL_RECOMMENDATION_PROMPT, TOOL_USAGE_PROMPT, ) @@ -60,13 +62,12 @@ class BaseWriteAnalysisCode(Action): } return messages - async def run(self, context: List[Message], plan: Plan = None, code_steps: str = "") -> str: + async def run(self, context: List[Message], plan: Plan = None) -> str: """Run of a code writing action, used in data analysis or modeling Args: context (List[Message]): Action output history, source action denoted by Message.cause_by plan (Plan, optional): Overall plan. Defaults to None. - code_steps (str, optional): suggested step breakdown for the current task. Defaults to "". Returns: str: The code string. @@ -92,15 +93,12 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode): class WriteCodeWithTools(BaseWriteAnalysisCode): """Write code with help of local available tools. Choose tools first, then generate code to use the tools""" - schema_path: str = "" + schema_path: Union[Path, str] = METAGPT_ROOT / "metagpt/tools/functions/schemas" available_tools: dict = {} - def __init__(self, schema_path="", **kwargs): + def __init__(self, **kwargs): super().__init__(**kwargs) - self.schema_path = schema_path - - if schema_path: - self._load_tools(schema_path) + self._load_tools(self.schema_path) def _load_tools(self, schema_path, schema_module=None): """Load tools from yaml file""" @@ -171,12 +169,11 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): self, context: List[Message], plan: Plan = None, - column_info: str = "", **kwargs, - ) -> Tuple[List[Message], str]: + ) -> str: task_type = plan.current_task.task_type available_tools = self.available_tools.get(task_type, {}) - special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "") + special_prompt = TASK_SPECIFIC_PROMPT.get(task_type, "") code_steps = plan.current_task.code_steps finished_tasks = plan.get_finished_tasks() @@ -192,9 +189,54 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): tool_catalog = self._parse_recommend_tools(task_type, recommend_tools) logger.info(f"Recommended tools: \n{recommend_tools}") - module_name = ML_MODULE_MAP[task_type] + module_name = TASK_MODULE_MAP[task_type] - prompt = TOOL_USAGE_PROMPT.format( + else: + tool_catalog = {} + module_name = "" + + tools_instruction = TOOL_USAGE_PROMPT.format( + special_prompt=special_prompt, module_name=module_name, tool_catalog=tool_catalog + ) + + context.append(Message(content=tools_instruction, role="user")) + + prompt = self.process_msg(context) + + tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS) + rsp = await self.llm.aask_code(prompt, **tool_config) + return rsp["code"] + + +class WriteCodeWithToolsML(WriteCodeWithTools): + async def run( + self, + context: List[Message], + plan: Plan = None, + column_info: str = "", + **kwargs, + ) -> Tuple[List[Message], str]: + task_type = plan.current_task.task_type + available_tools = self.available_tools.get(task_type, {}) + special_prompt = TASK_SPECIFIC_PROMPT.get(task_type, "") + code_steps = plan.current_task.code_steps + + finished_tasks = plan.get_finished_tasks() + code_context = [remove_comments(task.code) for task in finished_tasks] + code_context = "\n\n".join(code_context) + + if len(available_tools) > 0: + available_tools = {k: v["description"] for k, v in available_tools.items()} + + recommend_tools = await self._tool_recommendation( + plan.current_task.instruction, code_steps, available_tools + ) + tool_catalog = self._parse_recommend_tools(task_type, recommend_tools) + logger.info(f"Recommended tools: \n{recommend_tools}") + + module_name = TASK_MODULE_MAP[task_type] + + prompt = ML_TOOL_USAGE_PROMPT.format( user_requirement=plan.goal, history_code=code_context, current_task=plan.current_task.instruction, diff --git a/metagpt/prompts/ml_engineer.py b/metagpt/prompts/ml_engineer.py index 9b873d39f..13ee4db42 100644 --- a/metagpt/prompts/ml_engineer.py +++ b/metagpt/prompts/ml_engineer.py @@ -198,6 +198,24 @@ model.fit(train, y_train) """ TOOL_USAGE_PROMPT = """ +# Instruction +Write complete code for 'Current Task'. And avoid duplicating code from finished tasks, such as repeated import of packages, reading data, etc. +Specifically, {special_prompt} + +# Capabilities +- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python Class. +- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc.. + +# Available Tools (can be empty): +Each Class tool is described in JSON format. When you call a tool, import the tool from `{module_name}` first. +{tool_catalog} + +# Constraints: +- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed. +- Always prioritize using pre-defined tools for the same functionality. +""" + +ML_TOOL_USAGE_PROMPT = """ # Background As a data scientist, you need to help user to achieve their goal [{user_requirement}] step-by-step in an continuous Jupyter notebook. @@ -297,14 +315,14 @@ The current task is about evaluating a model, please note the following: - Use trained model from previous task result directly, do not mock or reload model yourself. """ -ML_SPECIFIC_PROMPT = { +TASK_SPECIFIC_PROMPT = { "data_preprocess": DATA_PREPROCESS_PROMPT, "feature_engineering": FEATURE_ENGINEERING_PROMPT, "model_train": MODEL_TRAIN_PROMPT, "model_evaluate": MODEL_EVALUATE_PROMPT, } -ML_MODULE_MAP = { +TASK_MODULE_MAP = { "data_preprocess": "metagpt.tools.functions.libs.data_preprocess", "feature_engineering": "metagpt.tools.functions.libs.feature_engineering", "udf": "metagpt.tools.functions.libs.udf", diff --git a/metagpt/roles/code_interpreter.py b/metagpt/roles/code_interpreter.py index 390666fd5..9bb543d99 100644 --- a/metagpt/roles/code_interpreter.py +++ b/metagpt/roles/code_interpreter.py @@ -4,14 +4,20 @@ from pydantic import Field from metagpt.actions.ask_review import ReviewConst from metagpt.actions.execute_code import ExecutePyCode -from metagpt.actions.write_analysis_code import WriteCodeByGenerate +from metagpt.actions.write_analysis_code import ( + WriteCodeByGenerate, + WriteCodeWithTools, +) from metagpt.logs import logger from metagpt.roles import Role +from metagpt.roles.tool_maker import ToolMaker from metagpt.schema import Message, Task, TaskResult from metagpt.utils.save_code import save_code_file class CodeInterpreter(Role): + use_tools: bool = False + make_udfs: bool = False # whether to save user-defined functions execute_code: ExecutePyCode = Field(default_factory=ExecutePyCode, exclude=True) def __init__( @@ -21,8 +27,10 @@ class CodeInterpreter(Role): goal="", auto_run=False, use_tools=False, + make_udfs=False, + **kwargs, ): - super().__init__(name=name, profile=profile, goal=goal) + super().__init__(name=name, profile=profile, goal=goal, use_tools=use_tools, make_udfs=make_udfs, **kwargs) self._set_react_mode(react_mode="plan_and_act", auto_run=auto_run, use_tools=use_tools) @property @@ -36,6 +44,10 @@ class CodeInterpreter(Role): project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb") + # make tools out of workable codes for future use + if self.make_udfs: + await self.make_tools() + return rsp async def _act_on_task(self, current_task: Task) -> TaskResult: @@ -48,20 +60,18 @@ class CodeInterpreter(Role): success = False while not success and counter < max_retry: - context = self.planner.get_useful_memories() - - logger.info("Write code with pure generation") - - code = await WriteCodeByGenerate().run(context=context, plan=self.planner.plan, temperature=0.0) - cause_by = WriteCodeByGenerate + ### write code ### + code, cause_by = await self._write_code() self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by)) + ### execute code ### result, success = await self.execute_code.run(code) print(result) self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode)) + ### process execution result ### if "!pip" in code: success = False @@ -74,3 +84,19 @@ class CodeInterpreter(Role): counter = 0 # redo the task again with help of human suggestions return code, result, success + + async def _write_code(self): + todo = WriteCodeByGenerate() if not self.use_tools else WriteCodeWithTools() + logger.info(f"ready to {todo.name}") + + context = self.planner.get_useful_memories() + code = await todo.run(context=context, plan=self.planner.plan, temperature=0.0) + + return code, todo + + async def make_tools(self): + """Make user-defined functions(udfs, aka tools) for pure generation code.""" + logger.info("Plan completed. Now start to make tools ...") + tool_maker = ToolMaker() + for task in self.planner.plan.get_finished_tasks(): + await tool_maker.make_tool(task.code, task.instruction, task.task_id) diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index a230b2e2d..b6d660137 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -1,31 +1,23 @@ -import json - from metagpt.actions.ask_review import ReviewConst from metagpt.actions.debug_code import DebugCode from metagpt.actions.execute_code import ExecutePyCode from metagpt.actions.ml_da_action import Reflect, SummarizeAnalysis, UpdateDataColumns -from metagpt.actions.write_analysis_code import ( - MakeTools, - WriteCodeByGenerate, - WriteCodeWithTools, -) +from metagpt.actions.write_analysis_code import WriteCodeWithToolsML from metagpt.actions.write_code_steps import WriteCodeSteps -from metagpt.const import METAGPT_ROOT from metagpt.logs import logger from metagpt.roles.code_interpreter import CodeInterpreter from metagpt.roles.kaggle_manager import DownloadData, SubmitResult from metagpt.schema import Message -from metagpt.tools.functions.libs.udf import UDFS_YAML -from metagpt.utils.common import remove_comments +from metagpt.utils.common import any_to_str class MLEngineer(CodeInterpreter): auto_run: bool = False - use_tools: bool = False use_code_steps: bool = False - make_udfs: bool = False # whether to save user-defined functions use_udfs: bool = False data_desc: dict = {} + debug_context: list = [] + latest_code: str = "" def __init__( self, @@ -38,27 +30,21 @@ class MLEngineer(CodeInterpreter): make_udfs=False, use_udfs=False, ): - super().__init__(name=name, profile=profile, goal=goal, auto_run=auto_run, use_tools=use_tools) - self.auto_run = auto_run - self.use_tools = use_tools - self.use_code_steps = use_code_steps - self.make_udfs = make_udfs - self.use_udfs = use_udfs + super().__init__( + name=name, + profile=profile, + goal=goal, + auto_run=auto_run, + use_tools=use_tools, + use_code_steps=use_code_steps, + make_udfs=make_udfs, + use_udfs=use_udfs, + ) # self._watch([DownloadData, SubmitResult]) # in multi-agent settings async def _plan_and_act(self): - ### Actions in a multi-agent multi-turn setting, a new attempt on the data ### - memories = self.get_memories() - if memories: - latest_event = memories[-1].cause_by - if latest_event == DownloadData: - self.planner.plan.context = memories[-1].content - elif latest_event == SubmitResult: - # self reflect on previous plan outcomes and think about how to improve the plan, add to working memory - await self._reflect() - - # get feedback for improvement from human, add to working memory - await self.planner.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) + ### a new attempt on the data, relevant in a multi-agent multi-turn setting ### + await self._prepare_data_context() ### general plan process ### await super()._plan_and_act() @@ -75,85 +61,48 @@ class MLEngineer(CodeInterpreter): await WriteCodeSteps().run(self.planner.plan) if self.use_code_steps else "" ) - counter = 0 - success = False - debug_context = [] - - while not success and counter < max_retry: - context = self.planner.get_useful_memories() - - if counter > 0 and (self.use_tools or self.use_udfs): - logger.warning("We got a bug code, now start to debug...") - code = await DebugCode().run( - plan=self.planner.current_task.instruction, - code=code, - runtime_result=self.working_memory.get(), - context=debug_context, - ) - logger.info(f"new code \n{code}") - cause_by = DebugCode - - elif (not self.use_tools and not self.use_udfs) or ( - self.planner.current_task.task_type == "other" and not self.use_udfs - ): - logger.info("Write code with pure generation") - code = await WriteCodeByGenerate().run(context=context, plan=self.planner.plan, temperature=0.0) - debug_context = [self.planner.get_useful_memories(task_exclude_field={"result", "code_steps"})[0]] - cause_by = WriteCodeByGenerate - - else: - logger.info("Write code with tools") - if self.use_udfs: - # use user-defined function tools. - logger.warning("Writing code with user-defined function tools by WriteCodeWithTools.") - logger.info( - f"Local user defined function as following:\ - \n{json.dumps(list(UDFS_YAML.keys()), indent=2, ensure_ascii=False)}" - ) - # set task_type to `udf` - self.planner.current_task.task_type = "udf" - schema_path = UDFS_YAML - else: - schema_path = METAGPT_ROOT / "metagpt/tools/functions/schemas" - tool_context, code = await WriteCodeWithTools(schema_path=schema_path).run( - context=context, - plan=self.planner.plan, - column_info=self.data_desc.get("column_info", ""), - ) - debug_context = tool_context - cause_by = WriteCodeWithTools - - self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by)) - - result, success = await self.execute_code.run(code) - print(result) - # make tools for successful code and long code. - if success and self.make_udfs and len(remove_comments(code).split("\n")) > 4: - logger.info("Execute code successfully. Now start to make tools ...") - await self.make_tools(code=code) - self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode)) - - if "!pip" in code: - success = False - - counter += 1 - - if not success and counter >= max_retry: - logger.info("coding failed!") - review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) - if ReviewConst.CHANGE_WORD[0] in review: - counter = 0 # redo the task again with help of human suggestions + code, result, success = await super()._write_and_exec_code(max_retry=max_retry) if success: - if ( - self.use_tools and self.planner.current_task.task_type not in ["model_train", "model_evaluate"] - ) or self.use_udfs: + if self.use_tools and self.planner.current_task.task_type in ["data_preprocess", "feature_engineering"]: update_success, new_code = await self._update_data_columns() if update_success: code = code + "\n\n" + new_code return code, result, success + async def _write_code(self): + if not self.use_tools: + return await super()._write_code() + + code_execution_count = sum([msg.cause_by == any_to_str(ExecutePyCode) for msg in self.working_memory.get()]) + print("*" * 10, code_execution_count) + + if code_execution_count > 0: + logger.warning("We got a bug code, now start to debug...") + code = await DebugCode().run( + plan=self.planner.current_task.instruction, + code=self.latest_code, + runtime_result=self.working_memory.get(), + context=self.debug_context, + ) + logger.info(f"new code \n{code}") + cause_by = DebugCode + + else: + logger.info("Write code with tools") + tool_context, code = await WriteCodeWithToolsML().run( + context=[], # context assembled inside the Action + plan=self.planner.plan, + column_info=self.data_desc.get("column_info", ""), + ) + self.debug_context = tool_context + cause_by = WriteCodeWithToolsML + + self.latest_code = code + + return code, cause_by + async def _update_data_columns(self): logger.info("Check columns in updated data") rsp = await UpdateDataColumns().run(self.planner.plan) @@ -166,6 +115,19 @@ class MLEngineer(CodeInterpreter): self.data_desc["column_info"] = result return success, code + async def _prepare_data_context(self): + memories = self.get_memories() + if memories: + latest_event = memories[-1].cause_by + if latest_event == DownloadData: + self.planner.plan.context = memories[-1].content + elif latest_event == SubmitResult: + # self reflect on previous plan outcomes and think about how to improve the plan, add to working memory + await self._reflect() + + # get feedback for improvement from human, add to working memory + await self.planner.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) + async def _reflect(self): context = self.get_memories() context = "\n".join([str(msg) for msg in context]) @@ -173,38 +135,3 @@ class MLEngineer(CodeInterpreter): reflection = await Reflect().run(context=context) self.working_memory.add(Message(content=reflection, role="assistant")) self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user")) - - async def make_tools(self, code: str): - """Make user-defined functions(udfs, aka tools) for pure generation code. - - Args: - code (str): pure generation code by class WriteCodeByGenerate. - """ - logger.warning( - f"Making tools for task_id {self.planner.current_task_id}: \ - `{self.planner.current_task.instruction}` \n code: \n {code}" - ) - make_tools = MakeTools() - make_tool_retries, make_tool_current_retry = 3, 0 - while True: - # start make tools - tool_code = await make_tools.run(code, self.planner.current_task.instruction) - make_tool_current_retry += 1 - - # check tool_code by execute_code - logger.info(f"Checking task_id {self.planner.current_task_id} tool code by executor...") - execute_result, execute_success = await self.execute_code.run(tool_code) - if not execute_success: - logger.error(f"Tool code faild to execute, \n{execute_result}\n.We will try to fix it ...") - # end make tools - if execute_success or make_tool_current_retry >= make_tool_retries: - if make_tool_current_retry >= make_tool_retries: - logger.error( - f"We have tried the maximum number of attempts {make_tool_retries}\ - and still have not created tools for task_id {self.planner.current_task_id} successfully,\ - we will skip it." - ) - break - # save successful tool code in udf - if execute_success: - make_tools.save(tool_code) diff --git a/metagpt/roles/tool_maker.py b/metagpt/roles/tool_maker.py new file mode 100644 index 000000000..a2f854adb --- /dev/null +++ b/metagpt/roles/tool_maker.py @@ -0,0 +1,46 @@ +from pydantic import Field + +from metagpt.actions.execute_code import ExecutePyCode +from metagpt.actions.write_analysis_code import ( + MakeTools, +) +from metagpt.logs import logger +from metagpt.roles import Role +from metagpt.utils.common import remove_comments + + +class ToolMaker(Role): + execute_code: ExecutePyCode = Field(default_factory=ExecutePyCode, exclude=True) + + async def make_tool(self, code: str, instruction: str, task_id: str = ""): + if len(remove_comments(code).split("\n")) < 5: # no need to consider trivial codes with fewer than 5 lines + return + + logger.warning( + f"Making tools for task_id {task_id}: \ + `{instruction}` \n code: \n {code}" + ) + make_tools = MakeTools() + make_tool_retries, make_tool_current_retry = 3, 0 + while True: + # start make tools + tool_code = await make_tools.run(code, instruction) + make_tool_current_retry += 1 + + # check tool_code by execute_code + logger.info(f"Checking task_id {task_id} tool code by executor...") + execute_result, execute_success = await self.execute_code.run(tool_code) + if not execute_success: + logger.error(f"Tool code faild to execute, \n{execute_result}\n.We will try to fix it ...") + # end make tools + if execute_success or make_tool_current_retry >= make_tool_retries: + if make_tool_current_retry >= make_tool_retries: + logger.error( + f"We have tried the maximum number of attempts {make_tool_retries}\ + and still have not created tools for task_id {task_id} successfully,\ + we will skip it." + ) + break + # save successful tool code in udf + if execute_success: + make_tools.save(tool_code) diff --git a/tests/metagpt/roles/run_code_interpreter.py b/tests/metagpt/roles/run_code_interpreter.py index 7c5c1939b..539b20286 100644 --- a/tests/metagpt/roles/run_code_interpreter.py +++ b/tests/metagpt/roles/run_code_interpreter.py @@ -25,7 +25,7 @@ async def run_code_interpreter( """ if role_class == "ci": - role = CodeInterpreter(goal=requirement, auto_run=auto_run, use_tools=use_tools) + role = CodeInterpreter(goal=requirement, auto_run=auto_run, use_tools=use_tools, make_udfs=make_udfs) else: role = MLEngineer( goal=requirement,