diff --git a/metagpt/actions/write_analysis_code.py b/metagpt/actions/write_analysis_code.py index 1cfc28811..136a4956f 100644 --- a/metagpt/actions/write_analysis_code.py +++ b/metagpt/actions/write_analysis_code.py @@ -202,32 +202,7 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): module_name=module_name, tool_catalog=tool_catalog, ) - code_steps_ = eval(code_steps) - print(code_steps_) - new_code = "" - tool_context = "" - for idx, (step_id, step_instruction) in enumerate(code_steps_.items()): - prompt = TOOL_USAGE_PROMPT.format( - user_requirement=plan.goal, - history_code=code_context, - current_task=plan.current_task.instruction, - column_info=column_info, - special_prompt=special_prompt, - code_steps=step_instruction, - module_name=module_name, - tool_catalog=tool_catalog, - ) - - tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS) - - rsp = await self.llm.aask_code(prompt, **tool_config) - logger.info(f"rsp is: {rsp}") - new_code = new_code + "\n\n" + rsp["code"] - code_context = code_context + "\n\n" + new_code - tool_context = tool_context + "\n\n" + prompt - context = [Message(content=tool_context, role="user")] - return context, new_code else: diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index fa006b061..b38c752a4 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List import json from datetime import datetime @@ -42,24 +42,24 @@ class UpdateDataColumns(Action): class MLEngineer(Role): def __init__( - self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False + self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False ): super().__init__(name=name, profile=profile, goal=goal) self._set_react_mode(react_mode="plan_and_act") self._watch([DownloadData, SubmitResult]) - + self.plan = Plan(goal=goal) self.use_tools = False self.use_code_steps = False self.execute_code = ExecutePyCode() self.auto_run = auto_run self.data_desc = {} - + # memory for working on each task, discarded each time a task is done self.working_memory = Memory() - + async def _plan_and_act(self): - + ### Actions in a multi-agent multi-turn setting ### memories = self.get_memories() if memories: @@ -69,29 +69,29 @@ class MLEngineer(Role): elif latest_event == SubmitResult: # self reflect on previous plan outcomes and think about how to improve the plan, add to working memory await self._reflect() - + # get feedback for improvement from human, add to working memory await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + ### Common Procedure in both single- and multi-agent setting ### # create initial plan and update until confirmation await self._update_plan() - + while self.plan.current_task: task = self.plan.current_task logger.info(f"ready to take on task {task}") - + # take on current task code, result, success = await self._write_and_exec_code() - + # ask for acceptance, users can other refuse and change tasks in the plan review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + if self.auto_run: # if human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds; # if auto mode, then the code run has to succeed for the task to be considered completed task_result_confirmed = success - + if task_result_confirmed: # tick off this task and record progress task.code = code @@ -100,12 +100,13 @@ class MLEngineer(Role): self.working_memory.clear() if self.use_tools: - success, new_code = await self._update_data_columns() + success, new_code = await self._update_data_columns() if success: task.code = task.code + "\n\n" + new_code - + confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower() - and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)" + and review.lower() not in ReviewConst.CONTINUE_WORD[ + 0]) # "confirm, ... (more content, such as changing downstream tasks)" if confirmed_and_more: self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) await self._update_plan(review) @@ -114,23 +115,23 @@ class MLEngineer(Role): # Ask the Role to redo this task with help of review feedback, # useful when the code run is successful but the procedure or result is not what we want continue - + else: # update plan according to user's feedback and to take on changed tasks await self._update_plan(review) completed_plan_memory = self.get_useful_memories() # completed plan as a outcome self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory - + summary = await SummarizeAnalysis().run(self.plan) rsp = Message(content=summary, cause_by=SummarizeAnalysis) self._rc.memory.add(rsp) - + # save code using datetime.now or keywords related to the goal of your project (plan.goal). project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb") return rsp - + async def _update_data_columns(self): rsp = await UpdateDataColumns().run(self.plan) is_update, code = rsp["is_update"], rsp["code"] @@ -147,23 +148,19 @@ class MLEngineer(Role): if self.use_code_steps else "" ) - + counter = 0 success = False debug_context = [] - + while not success and counter < max_retry: context = self.get_useful_memories() - # print("*" * 10) - # print(context) - # print("*" * 10) - # breakpoint() if counter > 0 and self.use_tools: code = await DebugCode().run( plan=self.plan.current_task.instruction, - code=code, - runtime_result=self.working_memory.get(), + code=code, + runtime_result=self.working_memory.get(), context=debug_context ) logger.info(f"new code \n{code}") @@ -185,30 +182,30 @@ class MLEngineer(Role): ) debug_context = tool_context cause_by = WriteCodeWithTools - + self.working_memory.add( Message(content=code, role="assistant", cause_by=cause_by) ) - + result, success = await self.execute_code.run(code) print(result) self.working_memory.add( Message(content=result, role="user", cause_by=ExecutePyCode) ) - + if "!pip" in code: success = False - + counter += 1 - + if not success and counter >= max_retry: logger.info("coding failed!") review, _ = await self._ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) if ReviewConst.CHANGE_WORD[0] in review: counter = 0 # redo the task again with help of human suggestions - + return code, result, success - + async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER): auto_run = auto_run or self.auto_run if not auto_run: @@ -218,7 +215,7 @@ class MLEngineer(Role): self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) return review, confirmed return "", True - + async def _update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3): plan_confirmed = False while not plan_confirmed: @@ -229,7 +226,7 @@ class MLEngineer(Role): self.working_memory.add( Message(content=rsp, role="assistant", cause_by=WritePlan) ) - + # precheck plan before asking reviews is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan) if not is_plan_valid and max_retries > 0: @@ -238,11 +235,11 @@ class MLEngineer(Role): self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan)) max_retries -= 1 continue - + _, plan_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + update_plan_from_rsp(rsp, self.plan) - + self.working_memory.clear() async def _reflect(self): @@ -254,7 +251,7 @@ class MLEngineer(Role): reflection = await Reflect().run(context=context) self.working_memory.add(Message(content=reflection, role="assistant")) self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user")) - + def get_useful_memories(self, task_exclude_field=None) -> List[Message]: """find useful memories only to reduce context length and improve performance""" # TODO dataset description , code steps @@ -271,9 +268,9 @@ class MLEngineer(Role): user_requirement=user_requirement, data_desc=data_desc, tasks=tasks, current_task=current_task ) context_msg = [Message(content=context, role="user")] - - return context_msg + self.get_working_memories() + return context_msg + self.get_working_memories() + def get_working_memories(self) -> List[Message]: return self.working_memory.get() @@ -298,7 +295,6 @@ if __name__ == "__main__": # data_path = f"{DATA_PATH}/santander-customer-transaction-prediction" # requirement = f"This is a customers financial dataset. Your goal is to predict which customers will make a specific transaction in the future. The target column is target. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report F1 Score on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv' ." - save_dir = "" save_dir = DATA_PATH / "save" / "2023-12-14_15-11-40" @@ -365,7 +361,8 @@ if __name__ == "__main__": role = MLEngineer(goal=requirement, auto_run=auto_run) role.plan = Plan(**plan) role.execute_code = ExecutePyCode(nb) - import pdb;pdb.set_trace() + import pdb; + pdb.set_trace() else: logger.info("Run from scratch") role = MLEngineer(goal=requirement, auto_run=auto_run)