diff --git a/metagpt/actions/ask_review.py b/metagpt/actions/ask_review.py new file mode 100644 index 000000000..eec5e49aa --- /dev/null +++ b/metagpt/actions/ask_review.py @@ -0,0 +1,62 @@ +from typing import List + +from metagpt.actions import Action +from metagpt.schema import Message, Plan +from metagpt.logs import logger + + +class ReviewConst: + TASK_REVIEW_TRIGGER = "task" + CODE_REVIEW_TRIGGER = "code" + CONTINUE_WORD = ["confirm", "continue", "c", "yes", "y"] + CHANGE_WORD = ["change"] + EXIT_WORD = ["exit"] + TASK_REVIEW_INSTRUCTION = ( + f"If you want to change, add, delete a task or merge tasks in the plan, say '{CHANGE_WORD[0]} task task_id or current task, ... (things to change)' " + f"If you confirm the output from the current task and wish to continue, type: {CONTINUE_WORD[0]}" + ) + CODE_REVIEW_INSTRUCTION = ( + f"If you want the codes to be rewritten, say '{CHANGE_WORD[0]} ... (your change advice)' " + f"If you want to leave it as is, type: {CONTINUE_WORD[0]} or {CONTINUE_WORD[1]}" + ) + EXIT_INSTRUCTION = f"If you want to terminate the process, type: {EXIT_WORD[0]}" + + +class AskReview(Action): + async def run( + self, context: List[Message], plan: Plan = None, trigger: str = "task" + ): + logger.info("Current overall plan:") + logger.info( + "\n".join( + [ + f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}" + for task in plan.tasks + ] + ) + ) + + logger.info("most recent context:") + latest_action = context[-1].cause_by.__name__ if context[-1].cause_by else "" + review_instruction = ( + ReviewConst.TASK_REVIEW_INSTRUCTION + if trigger == ReviewConst.TASK_REVIEW_TRIGGER + else ReviewConst.CODE_REVIEW_INSTRUCTION + ) + prompt = ( + f"This is a <{trigger}> review. Please review output from {latest_action}\n" + f"{review_instruction}\n" + f"{ReviewConst.EXIT_INSTRUCTION}\n" + "Please type your review below:\n" + ) + + rsp = input(prompt) + + if rsp.lower() in ReviewConst.EXIT_WORD: + exit() + + # Confirmation can be one of "confirm", "continue", "c", "yes", "y" exactly, or sentences containing "confirm". + # One could say "confirm this task, but change the next task to ..." + confirmed = rsp.lower() in ReviewConst.CONTINUE_WORD or ReviewConst.CONTINUE_WORD[0] in rsp.lower() + + return rsp, confirmed diff --git a/metagpt/actions/ml_da_action.py b/metagpt/actions/ml_da_action.py index b6270f12f..50d1d2420 100644 --- a/metagpt/actions/ml_da_action.py +++ b/metagpt/actions/ml_da_action.py @@ -10,62 +10,6 @@ from metagpt.prompts.ml_engineer import ( PRINT_DATA_COLUMNS ) -class ReviewConst: - TASK_REVIEW_TRIGGER = "task" - CODE_REVIEW_TRIGGER = "code" - CONTINUE_WORD = ["confirm", "continue", "c", "yes", "y"] - CHANGE_WORD = ["change"] - EXIT_WORD = ["exit"] - TASK_REVIEW_INSTRUCTION = ( - f"If you want to change, add, delete a task or merge tasks in the plan, say '{CHANGE_WORD[0]} task task_id or current task, ... (things to change)' " - f"If you confirm the output from the current task and wish to continue, type: {CONTINUE_WORD[0]}" - ) - CODE_REVIEW_INSTRUCTION = ( - f"If you want the codes to be rewritten, say '{CHANGE_WORD[0]} ... (your change advice)' " - f"If you want to leave it as is, type: {CONTINUE_WORD[0]} or {CONTINUE_WORD[1]}" - ) - EXIT_INSTRUCTION = f"If you want to terminate the process, type: {EXIT_WORD[0]}" - - -class AskReview(Action): - async def run( - self, context: List[Message], plan: Plan = None, trigger: str = "task" - ): - logger.info("Current overall plan:") - logger.info( - "\n".join( - [ - f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}" - for task in plan.tasks - ] - ) - ) - - logger.info("most recent context:") - latest_action = context[-1].cause_by.__name__ if context[-1].cause_by else "" - review_instruction = ( - ReviewConst.TASK_REVIEW_INSTRUCTION - if trigger == ReviewConst.TASK_REVIEW_TRIGGER - else ReviewConst.CODE_REVIEW_INSTRUCTION - ) - prompt = ( - f"This is a <{trigger}> review. Please review output from {latest_action}\n" - f"{review_instruction}\n" - f"{ReviewConst.EXIT_INSTRUCTION}\n" - "Please type your review below:\n" - ) - - rsp = input(prompt) - - if rsp.lower() in ReviewConst.EXIT_WORD: - exit() - - # Confirmation can be one of "confirm", "continue", "c", "yes", "y" exactly, or sentences containing "confirm". - # One could say "confirm this task, but change the next task to ..." - confirmed = rsp.lower() in ReviewConst.CONTINUE_WORD or ReviewConst.CONTINUE_WORD[0] in rsp.lower() - - return rsp, confirmed - class SummarizeAnalysis(Action): PROMPT_TEMPLATE = """ diff --git a/metagpt/actions/write_analysis_code.py b/metagpt/actions/write_analysis_code.py index 2d9110e91..21add3159 100644 --- a/metagpt/actions/write_analysis_code.py +++ b/metagpt/actions/write_analysis_code.py @@ -277,7 +277,7 @@ class MakeTools(WriteCodeByGenerate): saved_path.write_text(tool_code, encoding='utf-8') @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) - async def run(self, code: str | List[dict], code_desc: str = None, **kwargs) -> str: + async def run(self, code: Union[str, List[dict]], code_desc: str = None, **kwargs) -> str: # 拼接code prompt code_prompt = f"The following code is about {code_desc}, convert it to be a General Function, {code}" if not self.context: diff --git a/metagpt/plan/__init__.py b/metagpt/plan/__init__.py new file mode 100644 index 000000000..5ad35e100 --- /dev/null +++ b/metagpt/plan/__init__.py @@ -0,0 +1 @@ +from metagpt.plan.planner import Planner \ No newline at end of file diff --git a/metagpt/plan/planner.py b/metagpt/plan/planner.py new file mode 100644 index 000000000..c2b430817 --- /dev/null +++ b/metagpt/plan/planner.py @@ -0,0 +1,109 @@ +import json + +from metagpt.logs import logger +from metagpt.memory import Memory +from metagpt.schema import Message, Plan, Task +from metagpt.actions.ask_review import AskReview, ReviewConst +from metagpt.actions.write_plan import WritePlan, update_plan_from_rsp, precheck_update_plan_from_rsp + + +STRUCTURAL_CONTEXT = """ +## User Requirement +{user_requirement} +## Context +{context} +## Current Plan +{tasks} +## Current Task +{current_task} +""" + + +class Planner: + def __init__(self, goal: str, working_memory: Memory, auto_run: bool = False): + self.plan = Plan(goal=goal) + self.auto_run = auto_run + + # memory for working on each task, discarded each time a task is done + self.working_memory = working_memory + + @property + def current_task(self): + return self.plan.current_task + + @property + def current_task_id(self): + return self.plan.current_task_id + + async def ask_review(self, task_to_review: Task = None, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER): + """ + Ask to review the task result, reviewer needs to provide confirmation or request change. + If human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds; + if auto mode, then the code run has to succeed for the task to be considered completed. + """ + auto_run = auto_run or self.auto_run + if not auto_run: + context = self.get_useful_memories() + review, confirmed = await AskReview().run(context=context[-5:], plan=self.plan, trigger=trigger) + if not confirmed: + self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) + return review, confirmed + confirmed = task_to_review.is_success if task_to_review else True + return "", confirmed + + async def confirm_task(self, task, updated_task, review): + assert updated_task.task_id == task.task_id + self.plan.replace_task(updated_task) + self.plan.finish_current_task() + self.working_memory.clear() + + confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower() + and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)" + if confirmed_and_more: + self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) + await self.update_plan(review) + + async def update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3, **kwargs): + plan_confirmed = False + while not plan_confirmed: + context = self.get_useful_memories() + rsp = await WritePlan().run( + context, max_tasks=max_tasks, **kwargs + ) + self.working_memory.add( + Message(content=rsp, role="assistant", cause_by=WritePlan) + ) + + # precheck plan before asking reviews + is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan) + if not is_plan_valid and max_retries > 0: + error_msg = f"The generated plan is not valid with error: {error}, try regenerating, remember to generate either the whole plan or the single changed task only" + logger.warning(error_msg) + self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan)) + max_retries -= 1 + continue + + _, plan_confirmed = await self.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) + + update_plan_from_rsp(rsp, self.plan) + + self.working_memory.clear() + + def get_useful_memories(self, task_exclude_field=None) -> list[Message]: + """find useful memories only to reduce context length and improve performance""" + # TODO dataset description , code steps + if task_exclude_field is None: + # Shorten the context as we don't need code steps after we get the codes. + # This doesn't affect current_task below, which should hold the code steps + task_exclude_field = {'code_steps'} + user_requirement = self.plan.goal + context = self.plan.context + tasks = [task.dict(exclude=task_exclude_field) for task in self.plan.tasks] + tasks = json.dumps(tasks, indent=4, ensure_ascii=False) + current_task = self.plan.current_task.json() if self.plan.current_task else {} + context = STRUCTURAL_CONTEXT.format( + user_requirement=user_requirement, context=context, tasks=tasks, current_task=current_task + ) + context_msg = [Message(content=context, role="user")] + + return context_msg + self.working_memory.get() diff --git a/metagpt/prompts/ml_engineer.py b/metagpt/prompts/ml_engineer.py index 6af40bf97..c4b0ad8ae 100644 --- a/metagpt/prompts/ml_engineer.py +++ b/metagpt/prompts/ml_engineer.py @@ -309,14 +309,3 @@ ML_MODULE_MAP = { "feature_engineering": "metagpt.tools.functions.libs.feature_engineering", "udf": "metagpt.tools.functions.libs.udf", } - -STRUCTURAL_CONTEXT = """ -## User Requirement -{user_requirement} -## Data Description -{data_desc} -## Current Plan -{tasks} -## Current Task -{current_task} -""" diff --git a/metagpt/roles/code_interpreter.py b/metagpt/roles/code_interpreter.py new file mode 100644 index 000000000..32f530548 --- /dev/null +++ b/metagpt/roles/code_interpreter.py @@ -0,0 +1,80 @@ +import json +from datetime import datetime + +from metagpt.actions.execute_code import ExecutePyCode +from metagpt.actions.ask_review import ReviewConst +from metagpt.actions.write_analysis_code import WriteCodeByGenerate +from metagpt.logs import logger +from metagpt.roles import Role +from metagpt.schema import Message, Task +from metagpt.utils.save_code import save_code_file + + +class CodeInterpreter(Role): + def __init__( + self, name="Charlie", profile="CodeInterpreter", goal="", auto_run=False, + ): + super().__init__(name=name, profile=profile, goal=goal) + self._set_react_mode(react_mode="plan_and_act", auto_run=auto_run) + self.execute_code = ExecutePyCode() + + @property + def working_memory(self): + return self._rc.working_memory + + async def _plan_and_act(self): + + rsp = await super()._plan_and_act() + + # save code using datetime.now or keywords related to the goal of your project (plan.goal). + project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb") + + return rsp + + async def _act_on_task(self, current_task) -> Task: + code, result, success = await self._write_and_exec_code() + task_copy_with_result = current_task.copy( + update={"code": code, "result": result, "is_success": success}, + deep=True + ) + return task_copy_with_result + + async def _write_and_exec_code(self, max_retry: int = 3): + + counter = 0 + success = False + + while not success and counter < max_retry: + context = self.planner.get_useful_memories() + + logger.info("Write code with pure generation") + + code = await WriteCodeByGenerate().run( + context=context, plan=self.planner.plan, temperature=0.0 + ) + cause_by = WriteCodeByGenerate + + self.working_memory.add( + Message(content=code, role="assistant", cause_by=cause_by) + ) + + result, success = await self.execute_code.run(code) + print(result) + + self.working_memory.add( + Message(content=result, role="user", cause_by=ExecutePyCode) + ) + + if "!pip" in code: + success = False + + counter += 1 + + if not success and counter >= max_retry: + logger.info("coding failed!") + review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) + if ReviewConst.CHANGE_WORD[0] in review: + counter = 0 # redo the task again with help of human suggestions + + return code, result, success diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index aaace9693..e29d8fce5 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -1,135 +1,62 @@ -from typing import List import json -from datetime import datetime - -import fire from metagpt.actions.debug_code import DebugCode from metagpt.actions.execute_code import ExecutePyCode -from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis, Reflect, ReviewConst, UpdateDataColumns +from metagpt.actions.ask_review import ReviewConst from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools, MakeTools from metagpt.actions.write_code_steps import WriteCodeSteps -from metagpt.actions.write_plan import WritePlan -from metagpt.actions.write_plan import update_plan_from_rsp, precheck_update_plan_from_rsp -from metagpt.const import DATA_PATH, PROJECT_ROOT +from metagpt.const import PROJECT_ROOT from metagpt.logs import logger -from metagpt.memory import Memory -from metagpt.prompts.ml_engineer import STRUCTURAL_CONTEXT -from metagpt.roles import Role -from metagpt.roles.kaggle_manager import DownloadData, SubmitResult -from metagpt.schema import Message, Plan -from metagpt.utils.save_code import save_code_file -from metagpt.utils.recovery_util import save_history, load_history +from metagpt.schema import Message from metagpt.utils.common import remove_comments +from metagpt.actions.ml_da_action import SummarizeAnalysis, Reflect, UpdateDataColumns +from metagpt.roles.code_interpreter import CodeInterpreter +from metagpt.roles.kaggle_manager import DownloadData, SubmitResult +from metagpt.tools.functions.libs.udf import UDFS_YAML -class MLEngineer(Role): +class MLEngineer(CodeInterpreter): def __init__( - self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, use_tools=False, use_code_steps=False, + self, name="Mark", profile="MLEngineer", goal="", auto_run=False, use_tools=False, use_code_steps=False, + make_udfs=False, use_udfs=False ): - super().__init__(name=name, profile=profile, goal=goal) - self._set_react_mode(react_mode="plan_and_act") + super().__init__(name=name, profile=profile, goal=goal, auto_run=auto_run) self._watch([DownloadData, SubmitResult]) - - self.plan = Plan(goal=goal) - self.make_udfs = False # user-defined functions - self.use_udfs = False - self.execute_code = ExecutePyCode() - self.auto_run = auto_run + self.use_tools = use_tools self.use_code_steps = use_code_steps + self.make_udfs = make_udfs # user-defined functions + self.use_udfs = use_udfs self.data_desc = {} - - # memory for working on each task, discarded each time a task is done - self.working_memory = Memory() async def _plan_and_act(self): - ### Actions in a multi-agent multi-turn setting ### + ### Actions in a multi-agent multi-turn setting, a new attempt on the data ### memories = self.get_memories() if memories: latest_event = memories[-1].cause_by if latest_event == DownloadData: - self.plan.context = memories[-1].content + self.planner.plan.context = memories[-1].content elif latest_event == SubmitResult: # self reflect on previous plan outcomes and think about how to improve the plan, add to working memory await self._reflect() # get feedback for improvement from human, add to working memory - await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) + await self.planner.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - ### Common Procedure in both single- and multi-agent setting ### - # create initial plan and update until confirmation - await self._update_plan() + ### general plan process ### + await super()._plan_and_act() - while self.plan.current_task: - task = self.plan.current_task - logger.info(f"ready to take on task {task}") - - # take on current task - code, result, success = await self._write_and_exec_code() - - # ask for acceptance, users can other refuse and change tasks in the plan - review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - - if self.auto_run: - # if human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds; - # if auto mode, then the code run has to succeed for the task to be considered completed - task_result_confirmed = success - - if task_result_confirmed: - # tick off this task and record progress - task.code = code - task.result = result - self.plan.finish_current_task() - self.working_memory.clear() - - if (self.use_tools and task.task_type not in ['model_train', 'model_evaluate']) or self.use_udfs: - success, new_code = await self._update_data_columns() - if success: - task.code = task.code + "\n\n" + new_code - - confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower() - and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)" - if confirmed_and_more: - self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) - await self._update_plan(review) - - elif "redo" in review: - # Ask the Role to redo this task with help of review feedback, - # useful when the code run is successful but the procedure or result is not what we want - continue - - else: - # update plan according to user's feedback and to take on changed tasks - await self._update_plan(review) - - completed_plan_memory = self.get_useful_memories() # completed plan as a outcome - self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory - - summary = await SummarizeAnalysis().run(self.plan) + ### summarize analysis ### + summary = await SummarizeAnalysis().run(self.planner.plan) rsp = Message(content=summary, cause_by=SummarizeAnalysis) self._rc.memory.add(rsp) - # save code using datetime.now or keywords related to the goal of your project (plan.goal). - project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb") return rsp - - async def _update_data_columns(self): - rsp = await UpdateDataColumns().run(self.plan) - is_update, code = rsp["is_update"], rsp["code"] - success = False - if is_update: - result, success = await self.execute_code.run(code) - if success: - print(result) - self.data_desc["column_info"] = result - return success, code - + async def _write_and_exec_code(self, max_retry: int = 3): - self.plan.current_task.code_steps = ( - await WriteCodeSteps().run(self.plan) + self.planner.current_task.code_steps = ( + await WriteCodeSteps().run(self.planner.plan) if self.use_code_steps else "" ) @@ -139,46 +66,49 @@ class MLEngineer(Role): debug_context = [] while not success and counter < max_retry: - context = self.get_useful_memories() + + context = self.planner.get_useful_memories() + if counter > 0 and (self.use_tools or self.use_udfs): logger.warning('We got a bug code, now start to debug...') code = await DebugCode().run( - plan=self.plan.current_task.instruction, + plan=self.planner.current_task.instruction, code=code, runtime_result=self.working_memory.get(), context=debug_context ) logger.info(f"new code \n{code}") cause_by = DebugCode + elif (not self.use_tools and not self.use_udfs) or ( - self.plan.current_task.task_type == 'other' and not self.use_udfs): + self.planner.current_task.task_type == 'other' and not self.use_udfs): logger.info("Write code with pure generation") - # TODO: 添加基于current_task.instruction-code_path的k-v缓存 code = await WriteCodeByGenerate().run( - context=context, plan=self.plan, temperature=0.0 + context=context, plan=self.planner.plan, temperature=0.0 ) - debug_context = [self.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]] + debug_context = [self.planner.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]] cause_by = WriteCodeByGenerate + else: logger.info("Write code with tools") if self.use_udfs: # use user-defined function tools. - from metagpt.tools.functions.libs.udf import UDFS_YAML logger.warning("Writing code with user-defined function tools by WriteCodeWithTools.") logger.info(f"Local user defined function as following:\ \n{json.dumps(list(UDFS_YAML.keys()), indent=2, ensure_ascii=False)}") # set task_type to `udf` - self.plan.current_task.task_type = 'udf' + self.planner.current_task.task_type = 'udf' schema_path = UDFS_YAML else: schema_path = PROJECT_ROOT / "metagpt/tools/functions/schemas" tool_context, code = await WriteCodeWithTools(schema_path=schema_path).run( context=context, - plan=self.plan, + plan=self.planner.plan, column_info=self.data_desc.get("column_info", ""), ) debug_context = tool_context cause_by = WriteCodeWithTools + self.working_memory.add( Message(content=code, role="assistant", cause_by=cause_by) ) @@ -200,47 +130,29 @@ class MLEngineer(Role): if not success and counter >= max_retry: logger.info("coding failed!") - review, _ = await self._ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) + review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) if ReviewConst.CHANGE_WORD[0] in review: counter = 0 # redo the task again with help of human suggestions - + + if success: + if (self.use_tools and self.planner.current_task.task_type not in ['model_train', 'model_evaluate']) or self.use_udfs: + update_success, new_code = await self._update_data_columns() + if update_success: + code = code + "\n\n" + new_code + return code, result, success - async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER): - auto_run = auto_run or self.auto_run - if not auto_run: - context = self.get_useful_memories() - review, confirmed = await AskReview().run(context=context[-5:], plan=self.plan, trigger=trigger) - if not confirmed: - self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) - return review, confirmed - return "", True - - async def _update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3): - plan_confirmed = False - while not plan_confirmed: - context = self.get_useful_memories() - rsp = await WritePlan().run( - context, max_tasks=max_tasks, use_tools=self.use_tools - ) - self.working_memory.add( - Message(content=rsp, role="assistant", cause_by=WritePlan) - ) - - # precheck plan before asking reviews - is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan) - if not is_plan_valid and max_retries > 0: - error_msg = f"The generated plan is not valid with error: {error}, try regenerating, remember to generate either the whole plan or the single changed task only" - logger.warning(error_msg) - self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan)) - max_retries -= 1 - continue - - _, plan_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - - update_plan_from_rsp(rsp, self.plan) - - self.working_memory.clear() + async def _update_data_columns(self): + logger.info("Check columns in updated data") + rsp = await UpdateDataColumns().run(self.planner.plan) + is_update, code = rsp["is_update"], rsp["code"] + success = False + if is_update: + result, success = await self.execute_code.run(code) + if success: + print(result) + self.data_desc["column_info"] = result + return success, code async def _reflect(self): context = self.get_memories() @@ -249,34 +161,6 @@ class MLEngineer(Role): reflection = await Reflect().run(context=context) self.working_memory.add(Message(content=reflection, role="assistant")) self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user")) - - def get_useful_memories(self, task_exclude_field=None) -> List[Message]: - """find useful memories only to reduce context length and improve performance""" - # TODO dataset description , code steps - if task_exclude_field is None: - # Shorten the context as we don't need code steps after we get the codes. - # This doesn't affect current_task below, which should hold the code steps - task_exclude_field = {'code_steps'} - user_requirement = self.plan.goal - data_desc = self.plan.context - tasks = [task.dict(exclude=task_exclude_field) for task in self.plan.tasks] - tasks = json.dumps(tasks, indent=4, ensure_ascii=False) - current_task = self.plan.current_task.json() if self.plan.current_task else {} - context = STRUCTURAL_CONTEXT.format( - user_requirement=user_requirement, data_desc=data_desc, tasks=tasks, current_task=current_task - ) - context_msg = [Message(content=context, role="user")] - - return context_msg + self.get_working_memories() - - def get_working_memories(self) -> List[Message]: - return self.working_memory.get() - - def reset(self): - """Restart role with the same goal.""" - self.plan = Plan(goal=self.plan.goal) - self.execute_code = ExecutePyCode() - self.working_memory = Memory() async def make_tools(self, code: str): """Make user-defined functions(udfs, aka tools) for pure generation code. @@ -284,17 +168,17 @@ class MLEngineer(Role): Args: code (str): pure generation code by class WriteCodeByGenerate. """ - logger.warning(f"Making tools for task_id {self.plan.current_task_id}: \ - `{self.plan.current_task.instruction}` \n code: \n {code}") + logger.warning(f"Making tools for task_id {self.planner.current_task_id}: \ + `{self.planner.current_task.instruction}` \n code: \n {code}") make_tools = MakeTools() make_tool_retries, make_tool_current_retry = 3, 0 while True: # start make tools - tool_code = await make_tools.run(code, self.plan.current_task.instruction) + tool_code = await make_tools.run(code, self.planner.current_task.instruction) make_tool_current_retry += 1 # check tool_code by execute_code - logger.info(f"Checking task_id {self.plan.current_task_id} tool code by executor...") + logger.info(f"Checking task_id {self.planner.current_task_id} tool code by executor...") execute_result, execute_success = await self.execute_code.run(tool_code) if not execute_success: logger.error(f"Tool code faild to execute, \n{execute_result}\n.We will try to fix it ...") @@ -302,60 +186,9 @@ class MLEngineer(Role): if execute_success or make_tool_current_retry >= make_tool_retries: if make_tool_current_retry >= make_tool_retries: logger.error(f"We have tried the maximum number of attempts {make_tool_retries}\ - and still have not created tools for task_id {self.plan.current_task_id} successfully,\ + and still have not created tools for task_id {self.planner.current_task_id} successfully,\ we will skip it.") break # save successful tool code in udf if execute_success: make_tools.save(tool_code) - - -if __name__ == "__main__": - requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy." - - data_path = f"{DATA_PATH}/titanic" - requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'." - requirement = f"Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy" - data_path = f"{DATA_PATH}/icr-identify-age-related-conditions" - requirement = f"This is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions.The target column is Class. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report f1 score on the eval data. Train data path: {data_path}/split_train.csv, eval data path: {data_path}/split_eval.csv." - - # data_path = f"{DATA_PATH}/santander-customer-transaction-prediction" - # requirement = f"This is a customers financial dataset. Your goal is to predict which customers will make a specific transaction in the future. The target column is target. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report AUC Score on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv' ." - - data_path = f"{DATA_PATH}/house-prices-advanced-regression-techniques" - requirement = f"This is a house price dataset, your goal is to predict the sale price of a property based on its features. The target column is SalePrice. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report RMSE between the logarithm of the predicted value and the logarithm of the observed sales price on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'." - save_dir = "" - # save_dir = DATA_PATH / "output" / "2023-12-14_20-40-34" - - async def main(requirement: str = requirement, auto_run: bool = True, use_tools: bool = False, use_code_steps: bool = False, save_dir: str = ""): - """ - The main function to run the MLEngineer with optional history loading. - - Args: - requirement (str): The requirement for the MLEngineer. - auto_run (bool): Whether to auto-run the MLEngineer. - save_dir (str): The directory from which to load the history or to save the new history. - - Raises: - Exception: If an error occurs during execution, log the error and save the history. - """ - if save_dir: - logger.info("Resuming from history trajectory") - plan, nb = load_history(save_dir) - role = MLEngineer(goal=requirement, auto_run=auto_run, use_tools=use_tools, use_code_steps=use_code_steps) - role.plan = Plan(**plan) - role.execute_code = ExecutePyCode(nb) - - else: - logger.info("Run from scratch") - role = MLEngineer(goal=requirement, auto_run=auto_run, use_tools=use_tools, use_code_steps=use_code_steps) - - try: - await role.run(requirement) - except Exception as e: - - save_path = save_history(role, save_dir) - - logger.exception(f"An error occurred: {e}, save trajectory here: {save_path}") - - fire.Fire(main) diff --git a/metagpt/roles/ml_engineer_simple.py b/metagpt/roles/ml_engineer_simple.py index cc7d8fc97..7214e37c2 100644 --- a/metagpt/roles/ml_engineer_simple.py +++ b/metagpt/roles/ml_engineer_simple.py @@ -10,7 +10,7 @@ from metagpt.schema import Message from metagpt.memory import Memory from metagpt.logs import logger from metagpt.actions.write_analysis_code import WriteCodeByGenerate -from metagpt.actions.ml_da_action import AskReview, ReviewConst +from metagpt.actions.ask_review import AskReview, ReviewConst from metagpt.actions.execute_code import ExecutePyCode from metagpt.roles.kaggle_manager import DownloadData from metagpt.utils.save_code import save_code_file diff --git a/metagpt/schema.py b/metagpt/schema.py index 8eb7e31ca..f46da0fde 100644 --- a/metagpt/schema.py +++ b/metagpt/schema.py @@ -81,6 +81,7 @@ class Task(BaseModel): code_steps: str = "" code: str = "" result: str = "" + is_success: bool = False is_finished: bool = False @@ -169,6 +170,7 @@ class Plan(BaseModel): task = self.task_map[task_id] task.code = "" task.result = "" + task.is_success = False task.is_finished = False def replace_task(self, new_task: Task): @@ -181,18 +183,18 @@ class Plan(BaseModel): Returns: None """ - if new_task.task_id in self.task_map: - # Replace the task in the task map and the task list - self.task_map[new_task.task_id] = new_task - for i, task in enumerate(self.tasks): - if task.task_id == new_task.task_id: - self.tasks[i] = new_task - break + assert new_task.task_id in self.task_map + # Replace the task in the task map and the task list + self.task_map[new_task.task_id] = new_task + for i, task in enumerate(self.tasks): + if task.task_id == new_task.task_id: + self.tasks[i] = new_task + break - # Reset dependent tasks - for task in self.tasks: - if new_task.task_id in task.dependent_task_ids: - self.reset_task(task.task_id) + # Reset dependent tasks + for task in self.tasks: + if new_task.task_id in task.dependent_task_ids: + self.reset_task(task.task_id) def append_task(self, new_task: Task): """ diff --git a/metagpt/utils/recovery_util.py b/metagpt/utils/recovery_util.py index afe7fc021..cef302d6b 100644 --- a/metagpt/utils/recovery_util.py +++ b/metagpt/utils/recovery_util.py @@ -46,7 +46,7 @@ def save_history(role: Role, save_dir: str = ""): # overwrite exist trajectory save_path.mkdir(parents=True, exist_ok=True) - plan = role.plan.dict() + plan = role.planner.plan.dict() with open(save_path / "plan.json", "w", encoding="utf-8") as plan_file: json.dump(plan, plan_file, indent=4, ensure_ascii=False) diff --git a/tests/metagpt/roles/test_daml.py b/tests/metagpt/roles/test_daml.py index 55b425316..dbb4fb38f 100644 --- a/tests/metagpt/roles/test_daml.py +++ b/tests/metagpt/roles/test_daml.py @@ -2,8 +2,14 @@ import pytest from tqdm import tqdm from metagpt.logs import logger -from metagpt.roles.ml_engineer import MLEngineer +from metagpt.schema import Plan +from metagpt.roles.ml_engineer import MLEngineer, ExecutePyCode +def reset(role): + """Restart role with the same goal.""" + role.working_memory.clear() + role.planner.plan = Plan(goal=role.planner.plan.goal) + role.execute_code = ExecutePyCode() async def make_use_tools(requirement: str, auto_run: bool = True): """make and use tools for requirement.""" @@ -15,7 +21,7 @@ async def make_use_tools(requirement: str, auto_run: bool = True): role.use_udfs = False await role.run(requirement) # use udfs - role.reset() + reset(role) role.make_udfs = False role.use_udfs = True role.use_code_steps = False diff --git a/tests/metagpt/test_schema.py b/tests/metagpt/test_schema.py index b5d49b7a1..65fa7574d 100644 --- a/tests/metagpt/test_schema.py +++ b/tests/metagpt/test_schema.py @@ -141,7 +141,8 @@ class TestPlan: task = Task(task_id="1", instruction="First Task") plan.add_tasks([task]) new_task = Task(task_id="2", instruction="New Task") - plan.replace_task(new_task) # Task with ID 2 does not exist in plan + with pytest.raises(AssertionError): + plan.replace_task(new_task) # Task with ID 2 does not exist in plan assert "1" in plan.task_map assert "2" not in plan.task_map