From 537d51c26e29a1774269825e3611667b2436e80d Mon Sep 17 00:00:00 2001 From: lidanyang Date: Wed, 13 Dec 2023 14:32:25 +0800 Subject: [PATCH] write code with class tool --- metagpt/actions/write_analysis_code.py | 130 +++++++++---------- metagpt/roles/ml_engineer.py | 170 ++++++++++--------------- 2 files changed, 131 insertions(+), 169 deletions(-) diff --git a/metagpt/actions/write_analysis_code.py b/metagpt/actions/write_analysis_code.py index 58cab9c6a..aceebbfeb 100644 --- a/metagpt/actions/write_analysis_code.py +++ b/metagpt/actions/write_analysis_code.py @@ -4,7 +4,9 @@ @Author : orange-crow @File : write_code_v2.py """ -from typing import Dict, List, Union, Tuple, Optional, Any +from typing import Dict, List, Union, Tuple + +import yaml from metagpt.actions import Action from metagpt.logs import logger @@ -15,11 +17,9 @@ from metagpt.prompts.ml_engineer import ( TOOL_USAGE_PROMPT, ML_SPECIFIC_PROMPT, ML_MODULE_MAP, - TOOL_OUTPUT_DESC, DATA_PROCESS_PROMPT, - GENERATE_CODE_PROMPT + GENERATE_CODE_PROMPT, ) from metagpt.schema import Message, Plan -from metagpt.tools.functions import registry from metagpt.utils.common import create_func_config, remove_comments @@ -100,40 +100,55 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode): class WriteCodeWithTools(BaseWriteAnalysisCode): """Write code with help of local available tools. Choose tools first, then generate code to use the tools""" - @staticmethod - def _parse_recommend_tools(module: str, recommend_tools: list) -> List[Dict]: + def __init__(self, name: str = "", context=None, llm=None, schema_path=None): + super().__init__(name, context, llm) + self.schema_path = schema_path + self.available_tools = {} + + if self.schema_path is not None: + self._load_tools(schema_path) + + def _load_tools(self, schema_path): + """Load tools from yaml file""" + yml_files = schema_path.glob("*.yml") + for yml_file in yml_files: + module = yml_file.stem + with open(yml_file, "r", encoding="utf-8") as f: + self.available_tools[module] = yaml.safe_load(f) + + def _parse_recommend_tools(self, module: str, recommend_tools: list) -> dict: """ Parses and validates a list of recommended tools, and retrieves their schema from registry. Args: module (str): The module name for querying tools in the registry. - recommend_tools (list): A list of lists of recommended tools for each step. + recommend_tools (list): A list of recommended tools. Returns: - List[Dict]: A list of dicts of valid tool schemas. + dict: A dict of valid tool schemas. """ valid_tools = [] - available_tools = registry.get_all_by_module(module).keys() + available_tools = self.available_tools[module].keys() for tool in recommend_tools: if tool in available_tools: valid_tools.append(tool) - tool_catalog = registry.get_schemas(module, valid_tools) + tool_catalog = {tool: self.available_tools[module][tool] for tool in valid_tools} return tool_catalog async def _tool_recommendation( - self, - task: str, - code_steps: str, - available_tools: list + self, + task: str, + code_steps: str, + available_tools: dict, ) -> list: """ Recommend tools for the specified task. Args: - context (List[Message]): Action output history, source action denoted by Message.cause_by + task (str): the task to recommend tools for code_steps (str): the code steps to generate the full code for the task - available_tools (list): the available tools for the task + available_tools (dict): the available tools description Returns: list: recommended tools for the specified task @@ -149,27 +164,23 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): return recommend_tools async def run( - self, - context: List[Message], - plan: Plan = None, - code_steps: str = "", - column_info: str = "", - **kwargs, - ) -> str: + self, + context: List[Message], + plan: Plan = None, + code_steps: str = "", + column_info: str = "", + **kwargs, + ) -> Tuple[List[Message], str]: task_type = plan.current_task.task_type - available_tools = registry.get_all_schema_by_module(task_type) + available_tools = self.available_tools.get(task_type, {}) special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "") - column_names = kwargs.get("column_names", {}) finished_tasks = plan.get_finished_tasks() code_context = [remove_comments(task.code) for task in finished_tasks] code_context = "\n\n".join(code_context) if len(available_tools) > 0: - available_tools = [ - {k: tool[k] for k in ["name", "description"] if k in tool} - for tool in available_tools - ] + available_tools = {k: v["description"] for k, v in available_tools.items()} recommend_tools = await self._tool_recommendation( plan.current_task.instruction, @@ -180,46 +191,27 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): logger.info(f"Recommended tools: \n{recommend_tools}") module_name = ML_MODULE_MAP[task_type] - output_desc = TOOL_OUTPUT_DESC.get(task_type, "") - new_code = "" - - for idx, tool in enumerate(recommend_tools): - hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n " - - prompt = TOOL_USAGE_PROMPT.format( - goal=plan.current_task.instruction, - context=hist_info, - code_steps=code_steps, - column_names=column_names, - special_prompt=special_prompt, - module_name=module_name, - output_desc=output_desc, - function_catalog=tool_catalog[idx], - ) - - tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS) - - rsp = await self.llm.aask_code(prompt, **tool_config) - logger.info(f"rsp is: {rsp}") - # final_code = final_code + "\n\n" + rsp["code"] - # final_code[key] = rsp["code"] - new_code = new_code + "\n\n" + rsp["code"] - code_context = code_context + "\n\n" + rsp["code"] - return new_code - - else: - hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n " - - prompt = GENERATE_CODE_PROMPT.format( - goal=plan.current_task.instruction, - context=hist_info, - code_steps=code_steps, + prompt = TOOL_USAGE_PROMPT.format( + user_requirement=plan.goal, + history_code=code_context, + current_task=plan.current_task.instruction, + column_info=column_info, special_prompt=special_prompt, - # column_names=column_names + code_steps=code_steps, + module_name=module_name, + tool_catalog=tool_catalog, + ) + else: + prompt = GENERATE_CODE_PROMPT.format( + user_requirement=plan.goal, + history_code=code_context, + current_task=plan.current_task.instruction, + column_info=column_info, + special_prompt=special_prompt, + code_steps=code_steps, ) - tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS) - logger.info(f"prompt is: {prompt}") - rsp = await self.llm.aask_code(prompt, **tool_config) - logger.info(f"rsp is: {rsp}") - return rsp["code"] + tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS) + rsp = await self.llm.aask_code(prompt, **tool_config) + context = [Message(content=prompt, role="user")] + return context, rsp["code"] diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index 45fe728dd..20589079d 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -1,5 +1,6 @@ import json import re +from datetime import datetime from typing import List import fire @@ -10,12 +11,16 @@ from metagpt.actions.execute_code import ExecutePyCode from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools from metagpt.actions.write_code_steps import WriteCodeSteps from metagpt.actions.write_plan import WritePlan -from metagpt.const import DATA_PATH +from metagpt.const import DATA_PATH, PROJECT_ROOT from metagpt.logs import logger -from metagpt.prompts.ml_engineer import GEN_DATA_DESC_PROMPT +from metagpt.prompts.ml_engineer import ( + GEN_DATA_DESC_PROMPT, + UPDATE_DATA_COLUMNS, + PRINT_DATA_COLUMNS +) from metagpt.roles import Role from metagpt.schema import Message, Plan -from metagpt.utils.common import CodeParser +from metagpt.utils.common import CodeParser, remove_comments, create_func_config from metagpt.actions.debug_code import DebugCode STRUCTURAL_CONTEXT = """ @@ -57,34 +62,6 @@ def remove_escape_and_color_codes(input_str): return result -def read_data(file: str) -> pd.DataFrame: - if file.endswith(".csv"): - df = pd.read_csv(file, sep=",") - sep_list = [";", "\t", ":", " ", "|"] - for sep in sep_list: - if df.shape[1] == 1: - df = pd.read_csv(file, sep=sep) - else: - break - else: - raise ValueError(f"Unsupported file type: {file}") - return df - - -def get_column_info(df: pd.DataFrame) -> str: - data = [] - for i in df.columns: - nan_freq = float("%.2g" % (df[i].isna().mean() * 100)) - n_unique = df[i].nunique() - data.append([i, df[i].dtype, nan_freq, n_unique]) - - samples = pd.DataFrame( - data, - columns=["Column_name", "Data_type", "NaN_Frequency(%)", "N_unique"], - ) - return samples.to_string(index=False) - - class AskReview(Action): async def run(self, context: List[Message], plan: Plan = None): logger.info("Current overall plan:") @@ -108,26 +85,20 @@ class AskReview(Action): return rsp, confirmed -class GenerateDataDesc(Action): - async def run(self, file: str) -> dict: - data_desc = {} - df = read_data(file) - data_head = df.head().to_dict(orient="list") - data_head = json.dumps(data_head, indent=4, ensure_ascii=False) - prompt = GEN_DATA_DESC_PROMPT.replace("{data_head}", data_head) - rsp = await self._aask(prompt) - rsp = CodeParser.parse_code(block=None, text=rsp) - rsp = json.loads(rsp) - data_desc["path"] = file - data_desc["data_desc"] = rsp["data_desc"] - data_desc["column_desc"] = rsp["column_desc"] - data_desc["column_info"] = get_column_info(df) - return data_desc +class UpdateDataColumns(Action): + async def run(self, plan: Plan = None) -> dict: + finished_tasks = plan.get_finished_tasks() + code_context = [remove_comments(task.code) for task in finished_tasks] + code_context = "\n\n".join(code_context) + prompt = UPDATE_DATA_COLUMNS.format(history_code=code_context) + tool_config = create_func_config(PRINT_DATA_COLUMNS) + rsp = await self.llm.aask_code(prompt, **tool_config) + return rsp class MLEngineer(Role): def __init__( - self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, data_path: str = None + self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, ): super().__init__(name=name, profile=profile, goal=goal) self._set_react_mode(react_mode="plan_and_act") @@ -136,13 +107,9 @@ class MLEngineer(Role): self.use_code_steps = True self.execute_code = ExecutePyCode() self.auto_run = auto_run - self.data_path = data_path self.data_desc = {} async def _plan_and_act(self): - if self.data_path: - self.data_desc = await self._generate_data_desc() - # create initial plan and update until confirmation await self._update_plan() @@ -163,25 +130,27 @@ class MLEngineer(Role): task.code_steps = code_steps self.plan.finish_current_task() self.working_memory.clear() - - if "print(df_processed.info())" in code: - self.data_desc["column_info"] = result + + success, new_code = await self._update_data_columns() + if success: + task.code = task.code + "\n\n" + new_code else: # update plan according to user's feedback and to take on changed tasks await self._update_plan() - - finished_tasks = self.plan.get_finished_tasks() - if len(finished_tasks) == len(self.plan.tasks): - code_context = [task.code for task in finished_tasks] - code_context = "\n\n".join(code_context) - result, success = await self.execute_code.run(code_context) - # truncated the result - print(truncate(result)) - - async def _generate_data_desc(self): - data_desc = await GenerateDataDesc().run(self.data_path) - return data_desc - + + time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + self.execute_code.save_notebook(f"{DATA_PATH}/notebooks/ml_{time}.ipynb") + + async def _update_data_columns(self): + rsp = await UpdateDataColumns().run(self.plan) + is_update, code = rsp["is_update"], rsp["code"] + success = False + if is_update: + result, success = await self.execute_code.run(code) + if success: + self.data_desc["column_info"] = result + return success, code + async def _write_and_exec_code(self, max_retry: int = 3): code_steps = ( await WriteCodeSteps().run(self.plan) @@ -192,6 +161,7 @@ class MLEngineer(Role): counter = 0 improve_code = "" success = False + debug_context = [] finished_tasks = self.plan.get_finished_tasks() code_context = [task.code for task in finished_tasks] @@ -200,37 +170,38 @@ class MLEngineer(Role): code_result = "\n\n".join(code_result) while not success and counter < max_retry: - if counter == 0: - context = self.get_useful_memories() - else: - # context = self.get_useful_memories() - # logger.info(f"context {context}") + context = self.get_useful_memories() + + if counter > 0: improve_code = await DebugCode().run(plan=self.plan.current_task.instruction, - finished_code=code_context, - finished_code_result=code_result, + # finished_code=code_context, + # finished_code_result=code_result, code=code, - runtime_result=self.working_memory.get()) - - if not self.use_tools or self.plan.current_task.task_type == "other": + runtime_result=self.working_memory.get(), + context=debug_context) + + if improve_code != "": + code = improve_code + logger.info(f"new code \n{improve_code}") + cause_by = DebugCode + elif not self.use_tools or self.plan.current_task.task_type == "other": logger.info("Write code with pure generation") - code = await WriteCodeByGenerate().run( context=context, plan=self.plan, code_steps=code_steps, temperature=0.0 ) + debug_context = [self.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]] cause_by = WriteCodeByGenerate else: logger.info("Write code with tools") - - if improve_code != "": - code = improve_code - logger.info(f"new code {code}") - cause_by = DebugCode - else: - code = await WriteCodeWithTools().run( - context=context, plan=self.plan, code_steps=code_steps, **{"column_names": {}} - ) - - cause_by = WriteCodeWithTools + schema_path = PROJECT_ROOT / "metagpt/tools/functions/schemas" + tool_context, code = await WriteCodeWithTools(schema_path=schema_path).run( + context=context, + plan=self.plan, + code_steps=code_steps, + column_info=self.data_desc.get("column_info", ""), + ) + debug_context = tool_context + cause_by = WriteCodeWithTools self.working_memory.add( Message(content=code, role="assistant", cause_by=cause_by) @@ -238,9 +209,7 @@ class MLEngineer(Role): # debug on code, run on runcode with finished code and new_df # runcode = code_context + "\n\n" + code - runcode = code - - result, success = await self.execute_code.run(runcode) + result, success = await self.execute_code.run(code) # truncated the result print(truncate(result)) @@ -289,12 +258,12 @@ class MLEngineer(Role): self.plan.add_tasks(tasks) self.working_memory.clear() - def get_useful_memories(self) -> List[Message]: + def get_useful_memories(self, task_exclude_field: set = None) -> List[Message]: """find useful memories only to reduce context length and improve performance""" # TODO dataset description , code steps user_requirement = self.plan.goal tasks = json.dumps( - [task.dict() for task in self.plan.tasks], indent=4, ensure_ascii=False + [task.dict(exclude=task_exclude_field) for task in self.plan.tasks], indent=4, ensure_ascii=False ) current_task = self.plan.current_task.json() if self.plan.current_task else {} context = STRUCTURAL_CONTEXT.format( @@ -321,12 +290,13 @@ if __name__ == "__main__": # requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy." - data_path = f"{DATA_PATH}/titanic" - requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'." - - - async def main(requirement: str = requirement, auto_run: bool = True, data_path: str = ""): - role = MLEngineer(goal=requirement, auto_run=auto_run, data_path=data_path) + # data_path = f"{DATA_PATH}/titanic" + # requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'." + # requirement = f"Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy" + data_path = f"{DATA_PATH}/icr-identify-age-related-conditions" + requirement = f"This is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions.The target column is Class. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report f1 score on the eval data. Train data path: {data_path}/split_train.csv, eval data path: {data_path}/split_eval.csv." + async def main(requirement: str = requirement, auto_run: bool = True): + role = MLEngineer(goal=requirement, auto_run=auto_run) await role.run(requirement)