From 8b3d640dd60b3accce7845744f24522a8ec1bd22 Mon Sep 17 00:00:00 2001 From: yzlin Date: Fri, 1 Dec 2023 00:44:47 +0800 Subject: [PATCH 1/5] add kaggle manager --- kaggle_team.py | 37 +++++++++ metagpt/roles/kaggle_manager.py | 129 ++++++++++++++++++++++++++++++++ metagpt/schema.py | 1 + 3 files changed, 167 insertions(+) create mode 100644 kaggle_team.py create mode 100644 metagpt/roles/kaggle_manager.py diff --git a/kaggle_team.py b/kaggle_team.py new file mode 100644 index 000000000..0743d445b --- /dev/null +++ b/kaggle_team.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import asyncio + +import fire + +from metagpt.roles.kaggle_manager import KaggleManager +from metagpt.roles.ml_engineer import MLEngineer +from metagpt.team import Team + +async def main( + # competition: str, + # data_desc: str, + # requirement: str, + investment: float = 3.0, + n_round: int = 5, +): + competition, data_desc, requirement = ( + "titanic", + "Training set is train.csv.\nTest set is test.csv. We also include gender_submission.csv, a set of predictions that assume all and only female passengers survive, as an example of what a submission file should look like.", + "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format", + ) + + team = Team() + team.hire( + [ + KaggleManager(competition=competition, data_desc=data_desc), + MLEngineer(goal=requirement), + ] + ) + + team.invest(investment) + team.start_project(requirement) + await team.run(n_round=n_round) + +if __name__ == '__main__': + fire.Fire(main) diff --git a/metagpt/roles/kaggle_manager.py b/metagpt/roles/kaggle_manager.py new file mode 100644 index 000000000..e902d99a0 --- /dev/null +++ b/metagpt/roles/kaggle_manager.py @@ -0,0 +1,129 @@ +from typing import Dict, List, Union, Tuple +import json +import subprocess + +import fire +import pandas as pd + +from metagpt.const import WORKSPACE_ROOT +from metagpt.roles import Role +from metagpt.actions import Action, BossRequirement +from metagpt.actions.write_analysis_code import AskReview, SummarizeAnalysis +from metagpt.schema import Message, Task, Plan +from metagpt.logs import logger + +import os +os.environ["KAGGLE_USERNAME"] = "xxx" +os.environ["KAGGLE_KEY"] = "xxx" + +def run_command(cmd): + print(cmd) + output = subprocess.run(cmd, shell=True, capture_output=True, text=True) + if output.returncode != 0: + print("Error output:", output.stderr) + exit() + else: + print(output.stdout) + return output.stdout + +class DownloadData(Action): + + async def run(self, competition, data_desc="") -> str: + data_path = WORKSPACE_ROOT / competition + + output = run_command(f"kaggle competitions list --search {competition}") + assert output != "No competitions found", "You must provide the correct competition name" + + run_command(f"kaggle competitions download {competition} --path {WORKSPACE_ROOT}") + + # if not os.path.exists(data_path): + if True: + run_command(f"unzip -o {WORKSPACE_ROOT / '*.zip'} -d {data_path}") # FIXME: not safe + + file_list = run_command(f"ls {data_path}") + + rsp = f""" + Location: + Data downloaded at {data_path} folder, including {file_list} + Data Description: + {data_desc} + """ + return rsp + +class SubmitResult(Action): + PROMPT_TEMPLATE = """ + # Context + {context} + # Your task + Extract the prediction file for test set, return only the path string, e.g., xxx.csv, xxx.xlsx + """ + + def __init__(self, name: str = "", context=None, llm=None) -> str: + super().__init__(name, context, llm) + + async def _parse_submit_file_path(self, context) -> str: + prompt = self.PROMPT_TEMPLATE.format(context=context) + rsp = await self._aask(prompt) + return rsp + + async def run(self, competition, submit_message="") -> str: + submit_file_path = self._parse_submit_file_path(submit_message) + + data_path = WORKSPACE_ROOT / competition + + run_command(f"kaggle competitions submit {competition} -f {submit_file_path} -m '{submit_message}'") + run_command(f"kaggle competitions leaderboard --show --csv {competition} > {data_path / 'leaderboard.csv'}") + run_command(f"kaggle competitions submissions --csv {competition} > {data_path / 'submission.csv'}") + + leaderboard = pd.read_csv(data_path / 'leaderboard.csv') + submission = pd.read_csv(data_path / 'submission.csv') + submission_score = submission.loc[0, "publicScore"] + submission_rank = leaderboard.loc[leaderboard["score"] == submission_score].index[0] + submission_rank_pct = round(submission_rank / len(leaderboard), 4) * 100 + + # best_score = max(submission["publicScore"]) + # best_rank = leaderboard.loc[leaderboard["score"] == best_score].index[0] + + submission_summary = f""" + ## All History + {submission.to_json(orient="records")} + ## Current + Current submission score: {submission_score}, rank: {submission_rank} (top {submission_rank_pct}%); + """ + print(submission_summary) + return submission_summary + + +class KaggleManager(Role): + def __init__( + self, name="ABC", profile="KaggleManager", goal="", competition="titanic", data_desc="" + ): + super().__init__(name=name, profile=profile, goal=goal) + self._init_actions([DownloadData, SubmitResult]) + self._watch([BossRequirement, SummarizeAnalysis]) + self.competition = competition + self.data_desc = data_desc # currently passed in, later can be scrapped down from web by another Role + + async def _think(self): + observed = self.get_memories()[-1].cause_by + if observed == BossRequirement: + self._set_state(0) # DownloadData, get competition of interest from human, download datasets + elif observed == SummarizeAnalysis: + self._set_state(1) # SubmitResult, get prediction from MLEngineer and submit it to Kaggle + elif observed == SubmitResult: + self._set_state(2) # AskReview, ask human for improvement + + async def _act(self): + todo = self._rc.todo + logger.info(f"{self._setting}: ready to {self._rc.todo}") + + if isinstance(todo, DownloadData): + rsp = await todo.run(self.competition, self.data_desc) + + elif isinstance(todo, SubmitResult): + submit_message = self.get_memories()[-1].content # use analysis summary from MLEngineer as submission message + rsp = await todo.run(competition=self.competition, submit_message=submit_message) + + msg = Message(content=rsp, role="user", cause_by=type(todo)) + + return msg diff --git a/metagpt/schema.py b/metagpt/schema.py index e39f54a0c..601bdcea2 100644 --- a/metagpt/schema.py +++ b/metagpt/schema.py @@ -85,6 +85,7 @@ class Task(BaseModel): class Plan(BaseModel): goal: str + context: str = "" tasks: list[Task] = [] task_map: dict[str, Task] = {} current_task_id = "" From d3d08fe5f33cf65fcf74442d2dd754ffed1c2b7a Mon Sep 17 00:00:00 2001 From: yzlin Date: Sat, 2 Dec 2023 01:34:22 +0800 Subject: [PATCH 2/5] more plan operation, review update, add kaggle team --- config/config.yaml | 5 +- kaggle_team.py | 3 +- metagpt/actions/ml_da_action.py | 119 +++++++++++++++++++++++++++++ metagpt/actions/write_plan.py | 2 +- metagpt/config.py | 3 + metagpt/prompts/ml_engineer.py | 11 +++ metagpt/roles/kaggle_manager.py | 65 ++++++++++------ metagpt/roles/ml_engineer.py | 129 ++++++++++++++++---------------- metagpt/schema.py | 42 +++++++++++ tests/metagpt/test_schema.py | 39 ++++++++++ 10 files changed, 330 insertions(+), 88 deletions(-) create mode 100644 metagpt/actions/ml_da_action.py diff --git a/config/config.yaml b/config/config.yaml index bed67083c..52a8eb036 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -94,4 +94,7 @@ MODEL_FOR_RESEARCHER_REPORT: gpt-3.5-turbo-16k ### browser path for pyppeteer engine, support Chrome, Chromium,MS Edge #PYPPETEER_EXECUTABLE_PATH: "/usr/bin/google-chrome-stable" -PROMPT_FORMAT: json #json or markdown \ No newline at end of file +PROMPT_FORMAT: json #json or markdown + +KAGGLE_USERNAME: "" +KAGGLE_KEY: "" \ No newline at end of file diff --git a/kaggle_team.py b/kaggle_team.py index 0743d445b..659c4a495 100644 --- a/kaggle_team.py +++ b/kaggle_team.py @@ -12,13 +12,14 @@ async def main( # competition: str, # data_desc: str, # requirement: str, - investment: float = 3.0, + investment: float = 5.0, n_round: int = 5, ): competition, data_desc, requirement = ( "titanic", "Training set is train.csv.\nTest set is test.csv. We also include gender_submission.csv, a set of predictions that assume all and only female passengers survive, as an example of what a submission file should look like.", "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format", + # "generate a random prediction of the same shape as gender_submission.csv and save", ) team = Team() diff --git a/metagpt/actions/ml_da_action.py b/metagpt/actions/ml_da_action.py new file mode 100644 index 000000000..9f903fd22 --- /dev/null +++ b/metagpt/actions/ml_da_action.py @@ -0,0 +1,119 @@ +import json +from typing import Dict, List, Union + +from metagpt.actions import Action +from metagpt.schema import Message, Plan +from metagpt.logs import logger + + +def truncate(result: str, keep_len: int = 1000) -> str: + desc = "Truncated to show only the last 1000 characters\n" + if result.startswith(desc): + result = result[-len(desc) :] + + if len(result) > keep_len: + result = result[-keep_len:] + + if not result.startswith(desc): + return desc + result + return desc + + +class ReviewConst: + TASK_REVIEW_TRIGGER = "task" + CODE_REVIEW_TRIGGER = "code" + CONTINUE_WORD = ["confirm", "continue", "c", "yes", "y"] + CHANGE_WORD = ["change"] + EXIT_WORD = ["exit"] + TASK_REVIEW_INSTRUCTION = ( + f"If you want to change, add, delete a task or merge tasks in the plan, say '{CHANGE_WORD[0]} task task_id or current task, ... (things to change)' " + f"If you confirm the output from the current task and wish to continue, type: {CONTINUE_WORD[0]}" + ) + CODE_REVIEW_INSTRUCTION = ( + f"If you want the codes to be rewritten, say '{CHANGE_WORD[0]} ... (your change advice)' " + f"If you want to leave it as is, type: {CONTINUE_WORD[0]} or {CONTINUE_WORD[1]}" + ) + EXIT_INSTRUCTION = f"If you want to terminate the process, type: {EXIT_WORD[0]}" + + +class AskReview(Action): + async def run( + self, context: List[Message], plan: Plan = None, trigger: str = "task" + ): + logger.info("Current overall plan:") + logger.info( + "\n".join( + [ + f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}" + for task in plan.tasks + ] + ) + ) + + logger.info("most recent context:") + latest_action = context[-1].cause_by.__name__ if context[-1].cause_by else "" + review_instruction = ( + ReviewConst.TASK_REVIEW_INSTRUCTION + if trigger == ReviewConst.TASK_REVIEW_TRIGGER + else ReviewConst.CODE_REVIEW_INSTRUCTION + ) + prompt = ( + f"This is a <{trigger}> review. Please review output from {latest_action}\n" + f"{review_instruction}\n" + f"{ReviewConst.EXIT_INSTRUCTION}\n" + "Please type your review below:\n" + ) + + rsp = input(prompt) + + if rsp.lower() in ReviewConst.EXIT_WORD: + exit() + + confirmed = rsp.lower() in ReviewConst.CONTINUE_WORD + + return rsp, confirmed + + +class SummarizeAnalysis(Action): + PROMPT_TEMPLATE = """ + # Context + {context} + # Summary + Output a 30-word summary on analysis tool and modeling algorithms you have used, and the corresponding result. Make sure to announce the complete path to your test prediction file. Your summary: + """ + + def __init__(self, name: str = "", context=None, llm=None) -> str: + super().__init__(name, context, llm) + + async def run(self, conmpleted_plan: Plan) -> str: + tasks = json.dumps( + [task.dict() for task in conmpleted_plan.tasks], + indent=4, + ensure_ascii=False, + ) # all tasks finished, return all task outputs + prompt = self.PROMPT_TEMPLATE.format(context=tasks) + summary = await self._aask(prompt) + return summary + + +class Reflect(Action): + PROMPT_TEMPLATE = """ + # User Requirement + {user_requirement} + # Context + {context} + # Summary + Above is all your attempts to tackle the user requirement. You plan, act, submit your output, and get the result and feedback. + First, summarize each of your previous trial in a triple of (your methods, the corresponding result, potential improvement), list them out. + # Takeaways + Second, carefully find key takeaways from your summarization in a step-by-step thinking process + # Guidance + Finally, make a concise one-sentence guidance for improving your future plan. + Your response: + """ + + async def run(self, context: str) -> str: + user_requirement = "Score as high as possible in a data modeling competition" + prompt = self.PROMPT_TEMPLATE.format(context=context, user_requirement=user_requirement) + rsp = await self._aask(prompt) + return rsp diff --git a/metagpt/actions/write_plan.py b/metagpt/actions/write_plan.py index dcfa25d55..5ff6d965c 100644 --- a/metagpt/actions/write_plan.py +++ b/metagpt/actions/write_plan.py @@ -17,7 +17,7 @@ class WritePlan(Action): __context__ # Task: Based on the context, write a plan or modify an existing plan of what you should do to achieve the goal. A plan consists of one to __max_tasks__ tasks. - If you are modifying an existing plan, carefully follow the instruction, don't make unnecessary changes. + If you are modifying an existing plan, carefully follow the instruction, don't make unnecessary changes. Give the whole plan unless instructed to modify only one task of the plan. Output a list of jsons following the format: ```json [ diff --git a/metagpt/config.py b/metagpt/config.py index 3f9e742bd..5973adfc4 100644 --- a/metagpt/config.py +++ b/metagpt/config.py @@ -95,6 +95,9 @@ class Config(metaclass=Singleton): self.prompt_format = self._get("PROMPT_FORMAT", "markdown") + self.kaggle_username = self._get("KAGGLE_USERNAME", "") + self.kaggle_key = self._get("KAGGLE_KEY", "") + def _init_with_config_files_and_env(self, configs: dict, yaml_file): """Load from config/key.yaml, config/config.yaml, and env in decreasing order of priority""" configs.update(os.environ) diff --git a/metagpt/prompts/ml_engineer.py b/metagpt/prompts/ml_engineer.py index 55ac27d82..e78ea4166 100644 --- a/metagpt/prompts/ml_engineer.py +++ b/metagpt/prompts/ml_engineer.py @@ -168,3 +168,14 @@ ML_MODULE_MAP = { "classification_model": "metagpt.tools.functions.libs.machine_learning.ml_model", "regression_model": "metagpt.tools.functions.libs.machine_learning.ml_model", } + +STRUCTURAL_CONTEXT = """ +## User Requirement +{user_requirement} +## Data Description +{data_desc} +## Current Plan +{tasks} +## Current Task +{current_task} +""" diff --git a/metagpt/roles/kaggle_manager.py b/metagpt/roles/kaggle_manager.py index e902d99a0..d20769b92 100644 --- a/metagpt/roles/kaggle_manager.py +++ b/metagpt/roles/kaggle_manager.py @@ -5,16 +5,18 @@ import subprocess import fire import pandas as pd +from metagpt.config import CONFIG from metagpt.const import WORKSPACE_ROOT from metagpt.roles import Role from metagpt.actions import Action, BossRequirement -from metagpt.actions.write_analysis_code import AskReview, SummarizeAnalysis +from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis from metagpt.schema import Message, Task, Plan from metagpt.logs import logger +from metagpt.utils.common import CodeParser import os -os.environ["KAGGLE_USERNAME"] = "xxx" -os.environ["KAGGLE_KEY"] = "xxx" +os.environ["KAGGLE_USERNAME"] = CONFIG.kaggle_username +os.environ["KAGGLE_KEY"] = CONFIG.kaggle_key def run_command(cmd): print(cmd) @@ -38,6 +40,7 @@ class DownloadData(Action): # if not os.path.exists(data_path): if True: + # run_command(f"rm -r {data_path / '*'}") run_command(f"unzip -o {WORKSPACE_ROOT / '*.zip'} -d {data_path}") # FIXME: not safe file_list = run_command(f"ls {data_path}") @@ -52,24 +55,30 @@ class DownloadData(Action): class SubmitResult(Action): PROMPT_TEMPLATE = """ - # Context - {context} + # Summary + __summary__ # Your task - Extract the prediction file for test set, return only the path string, e.g., xxx.csv, xxx.xlsx + Extract the file path for test set prediction from the summary above, output a json following the format: + ```json + {"file_path": str = "the file path, for example, /path/to/the/prediction/file/xxx.csv, /path/to/the/prediction/file/xxx.xlsx"} + ``` """ def __init__(self, name: str = "", context=None, llm=None) -> str: super().__init__(name, context, llm) async def _parse_submit_file_path(self, context) -> str: - prompt = self.PROMPT_TEMPLATE.format(context=context) + prompt = self.PROMPT_TEMPLATE.replace("__summary__", context) rsp = await self._aask(prompt) - return rsp + rsp = CodeParser.parse_code(block=None, text=rsp) + file_path = json.loads(rsp)["file_path"] + return file_path async def run(self, competition, submit_message="") -> str: - submit_file_path = self._parse_submit_file_path(submit_message) + submit_file_path = await self._parse_submit_file_path(submit_message) data_path = WORKSPACE_ROOT / competition + submit_message = submit_message.replace("'", "") run_command(f"kaggle competitions submit {competition} -f {submit_file_path} -m '{submit_message}'") run_command(f"kaggle competitions leaderboard --show --csv {competition} > {data_path / 'leaderboard.csv'}") @@ -77,20 +86,20 @@ class SubmitResult(Action): leaderboard = pd.read_csv(data_path / 'leaderboard.csv') submission = pd.read_csv(data_path / 'submission.csv') - submission_score = submission.loc[0, "publicScore"] - submission_rank = leaderboard.loc[leaderboard["score"] == submission_score].index[0] - submission_rank_pct = round(submission_rank / len(leaderboard), 4) * 100 + print(submission) # submission.to_json(orient="records") - # best_score = max(submission["publicScore"]) - # best_rank = leaderboard.loc[leaderboard["score"] == best_score].index[0] + submission_score = submission.loc[0, "publicScore"] + best_score = max(submission["publicScore"]) # might be min + rank = leaderboard.loc[leaderboard["score"] == best_score].index[0] + rank_pct = round(rank / len(leaderboard), 4) * 100 submission_summary = f""" - ## All History - {submission.to_json(orient="records")} - ## Current - Current submission score: {submission_score}, rank: {submission_rank} (top {submission_rank_pct}%); + # All histories: + {submission.head(5).to_string()} + # Current + Current submission score: {submission_score}, best score: {best_score}, best rank: {rank} (top {rank_pct}%) """ - print(submission_summary) + logger.info(submission_summary) return submission_summary @@ -110,8 +119,6 @@ class KaggleManager(Role): self._set_state(0) # DownloadData, get competition of interest from human, download datasets elif observed == SummarizeAnalysis: self._set_state(1) # SubmitResult, get prediction from MLEngineer and submit it to Kaggle - elif observed == SubmitResult: - self._set_state(2) # AskReview, ask human for improvement async def _act(self): todo = self._rc.todo @@ -127,3 +134,19 @@ class KaggleManager(Role): msg = Message(content=rsp, role="user", cause_by=type(todo)) return msg + +if __name__ == "__main__": + competition, data_desc, requirement = ( + "titanic", + "Training set is train.csv.\nTest set is test.csv. We also include gender_submission.csv, a set of predictions that assume all and only female passengers survive, as an example of what a submission file should look like.", + "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format", + ) + + summary = "I used Python with pandas for data preprocessing, sklearn's RandomForestClassifier for modeling, and achieved 82.12% accuracy on validation. Predictions saved at '/Users/gary/Desktop/data_agents_opt/workspace/titanic/gender_submission.csv'." + + async def main(requirement: str = requirement): + role = KaggleManager(competition=competition, data_desc=data_desc) + # await role.run(Message(content="", cause_by=BossRequirement)) + await role.run(Message(content=summary, cause_by=SummarizeAnalysis)) + + fire.Fire(main) \ No newline at end of file diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index 1e4367372..4536395ba 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -7,55 +7,14 @@ import fire from metagpt.roles import Role from metagpt.actions import Action from metagpt.schema import Message, Task, Plan +from metagpt.memory import Memory from metagpt.logs import logger from metagpt.actions.write_plan import WritePlan from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools +from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis, Reflect, ReviewConst, truncate from metagpt.actions.execute_code import ExecutePyCode - -STRUCTURAL_CONTEXT = """ -## User Requirement -{user_requirement} -## Current Plan -{tasks} -## Current Task -{current_task} -""" - - -def truncate(result: str, keep_len: int = 1000) -> str: - desc = "Truncated to show only the last 1000 characters\n" - if result.startswith(desc): - result = result[-len(desc) :] - - if len(result) > keep_len: - result = result[-keep_len:] - - if not result.startswith(desc): - return desc + result - return desc - - -class AskReview(Action): - async def run(self, context: List[Message], plan: Plan = None): - logger.info("Current overall plan:") - logger.info( - "\n".join([f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}" for task in plan.tasks]) - ) - - logger.info("most recent context:") - latest_action = context[-1].cause_by.__name__ if context[-1].cause_by else "" - prompt = f"\nPlease review output from {latest_action}:\n" \ - "If you want to change a task in the plan, say 'change task task_id, ... (things to change)'\n" \ - "If you confirm the output and wish to continue with the current process, type CONFIRM\n" \ - "If you want to terminate the process, type exit:\n" - rsp = input(prompt) - - if rsp.lower() in ("exit"): - exit() - - confirmed = rsp.lower() in ("confirm", "yes", "y") - - return rsp, confirmed +from metagpt.roles.kaggle_manager import DownloadData, SubmitResult +from metagpt.prompts.ml_engineer import STRUCTURAL_CONTEXT class WriteTaskGuide(Action): @@ -69,13 +28,35 @@ class MLEngineer(Role): ): super().__init__(name=name, profile=profile, goal=goal) self._set_react_mode(react_mode="plan_and_act") + self._watch([DownloadData, SubmitResult]) + self.plan = Plan(goal=goal) self.use_tools = False self.use_task_guide = False self.execute_code = ExecutePyCode() self.auto_run = auto_run + # memory for working on each task, discarded each time a task is done + self.working_memory = Memory() + async def _plan_and_act(self): + + ### Actions in a multi-agent multi-turn setting ### + memories = self.get_memories() + if memories: + latest_event = memories[-1].cause_by + if latest_event == DownloadData: + self.plan.context = memories[-1].content + elif latest_event == SubmitResult: + # get feedback for improvement from human, add to working memory + await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) + # self reflect on previous plan outcomes and think about how to improve the plan, add to working memory + prev_plan_outcomes = memories[-1].content + reflection = await Reflect().run(context=prev_plan_outcomes) + self.working_memory.add(Message(content=reflection, role="assistant")) + + + ### Common Procedure in both single- and multi-agent setting ### # create initial plan and update until confirmation await self._update_plan() @@ -87,7 +68,7 @@ class MLEngineer(Role): code, result, success = await self._write_and_exec_code() # ask for acceptance, users can other refuse and change tasks in the plan - task_result_confirmed = await self._ask_review() + review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) if success and task_result_confirmed: # tick off this task and record progress @@ -98,7 +79,16 @@ class MLEngineer(Role): else: # update plan according to user's feedback and to take on changed tasks - await self._update_plan() + await self._update_plan(review) + + completed_plan_memory = self.get_useful_memories() # completed plan as a outcome + self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory + + summary = await SummarizeAnalysis().run(self.plan) + rsp = Message(content=summary, cause_by=SummarizeAnalysis) + self._rc.memory.add(rsp) + + return rsp async def _write_and_exec_code(self, max_retry: int = 3): task_guide = ( @@ -143,23 +133,28 @@ class MLEngineer(Role): if "!pip" in code: success = False - # if not success: - # await self._ask_review() counter += 1 + if not success and counter >= max_retry: + logger.info("coding failed!") + review, _ = await self._ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) + if ReviewConst.CHANGE_WORD in review: + counter = 0 # redo the task again with help of human suggestions + return code, result, success - async def _ask_review(self): - if not self.auto_run: + async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER): + auto_run = auto_run or self.auto_run + if not auto_run: context = self.get_useful_memories() - review, confirmed = await AskReview().run(context=context[-5:], plan=self.plan) + review, confirmed = await AskReview().run(context=context[-5:], plan=self.plan, trigger=trigger) if not confirmed: self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) - return confirmed - return True + return review, confirmed + return "", True - async def _update_plan(self, max_tasks: int = 3): + async def _update_plan(self, review: str = "", max_tasks: int = 3): plan_confirmed = False while not plan_confirmed: context = self.get_useful_memories() @@ -167,30 +162,36 @@ class MLEngineer(Role): self.working_memory.add( Message(content=rsp, role="assistant", cause_by=WritePlan) ) - plan_confirmed = await self._ask_review() + + # TODO: precheck plan before asking reviews + + _, plan_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) tasks = WritePlan.rsp_to_tasks(rsp) - self.plan.add_tasks(tasks) - self.working_memory.clear() + if len(tasks) == 1 and self.plan.has_task_id(tasks[0].task_id): + self.plan.replace_task(tasks[0]) + else: + self.plan.add_tasks(tasks) + self.working_memory.clear() def get_useful_memories(self) -> List[Message]: """find useful memories only to reduce context length and improve performance""" user_requirement = self.plan.goal + data_desc = self.plan.context tasks = json.dumps( [task.dict() for task in self.plan.tasks], indent=4, ensure_ascii=False ) current_task = self.plan.current_task.json() if self.plan.current_task else {} context = STRUCTURAL_CONTEXT.format( - user_requirement=user_requirement, tasks=tasks, current_task=current_task + user_requirement=user_requirement, data_desc=data_desc, tasks=tasks, current_task=current_task ) context_msg = [Message(content=context, role="user")] - return context_msg + self.working_memory.get() - - @property - def working_memory(self): - return self._rc.memory + return context_msg + self.get_working_memories() + + def get_working_memories(self) -> List[Message]: + return self.working_memory.get() if __name__ == "__main__": diff --git a/metagpt/schema.py b/metagpt/schema.py index 601bdcea2..9b86a2448 100644 --- a/metagpt/schema.py +++ b/metagpt/schema.py @@ -156,7 +156,49 @@ class Plan(BaseModel): # Update the task map for quick access to tasks by ID self.task_map = {task.task_id: task for task in self.tasks} + + def reset_task(self, task_id: str): + """ + Clear code and result of the task based on task_id, and set the task as unfinished. + Args: + task_id (str): The ID of the task to be reset. + + Returns: + None + """ + if task_id in self.task_map: + task = self.task_map[task_id] + task.code = "" + task.result = "" + task.is_finished = False + + def replace_task(self, new_task: Task): + """ + Replace an existing task with the new input task based on task_id, and reset all tasks depending on it. + + Args: + new_task (Task): The new task that will replace an existing one. + + Returns: + None + """ + if new_task.task_id in self.task_map: + # Replace the task in the task map and the task list + self.task_map[new_task.task_id] = new_task + for i, task in enumerate(self.tasks): + if task.task_id == new_task.task_id: + self.tasks[i] = new_task + break + + # Reset dependent tasks + for task in self.tasks: + if new_task.task_id in task.dependent_task_ids: + self.reset_task(task.task_id) + + def has_task_id(self, task_id: str) -> bool: + return task_id in self.task_map + @property def current_task(self) -> Task: """Find current task to execute diff --git a/tests/metagpt/test_schema.py b/tests/metagpt/test_schema.py index 8f65d3785..324a083ca 100644 --- a/tests/metagpt/test_schema.py +++ b/tests/metagpt/test_schema.py @@ -104,3 +104,42 @@ class TestPlan: finished_tasks = plan.get_finished_tasks() assert len(finished_tasks) == 1 assert finished_tasks[0].task_id == "1" + + def test_reset_task_existing(self): + plan = Plan(goal="") + task = Task(task_id="1", instruction="Do something", code="print('Hello')", result="Hello", finished=True) + plan.add_tasks([task]) + plan.reset_task("1") + reset_task = plan.task_map["1"] + assert reset_task.code == "" + assert reset_task.result == "" + assert not reset_task.is_finished + + def test_reset_task_non_existing(self): + plan = Plan(goal="") + task = Task(task_id="1", instruction="Do something", code="print('Hello')", result="Hello", finished=True) + plan.add_tasks([task]) + plan.reset_task("2") # Task with ID 2 does not exist + assert "1" in plan.task_map + assert "2" not in plan.task_map + + def test_replace_task_with_dependents(self): + plan = Plan(goal="") + tasks = [Task(task_id="1", instruction="First Task", finished=True), + Task(task_id="2", instruction="Second Task", dependent_task_ids=["1"], finished=True)] + plan.add_tasks(tasks) + new_task = Task(task_id="1", instruction="Updated First Task") + plan.replace_task(new_task) + assert plan.task_map["1"].instruction == "Updated First Task" + assert not plan.task_map["2"].is_finished # Dependent task should be reset + assert plan.task_map["2"].code == "" + assert plan.task_map["2"].result == "" + + def test_replace_task_non_existing(self): + plan = Plan(goal="") + task = Task(task_id="1", instruction="First Task") + plan.add_tasks([task]) + new_task = Task(task_id="2", instruction="New Task") + plan.replace_task(new_task) # Task with ID 2 does not exist in plan + assert "1" in plan.task_map + assert "2" not in plan.task_map From 8d7657f347d51feb3048d6774bdbe17308ecf2ee Mon Sep 17 00:00:00 2001 From: yzlin Date: Mon, 4 Dec 2023 14:29:47 +0800 Subject: [PATCH 3/5] update reflect on previous plan --- config/config.yaml | 4 ++-- kaggle_team.py | 7 ++++--- metagpt/actions/ml_da_action.py | 37 ++++++++++++++++++++------------- metagpt/roles/kaggle_manager.py | 4 ++-- metagpt/roles/ml_engineer.py | 19 +++++++++++------ 5 files changed, 44 insertions(+), 27 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 52a8eb036..bf998def7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -96,5 +96,5 @@ MODEL_FOR_RESEARCHER_REPORT: gpt-3.5-turbo-16k PROMPT_FORMAT: json #json or markdown -KAGGLE_USERNAME: "" -KAGGLE_KEY: "" \ No newline at end of file +# KAGGLE_USERNAME: "" +# KAGGLE_KEY: "" \ No newline at end of file diff --git a/kaggle_team.py b/kaggle_team.py index 659c4a495..e8ab3ec41 100644 --- a/kaggle_team.py +++ b/kaggle_team.py @@ -13,20 +13,21 @@ async def main( # data_desc: str, # requirement: str, investment: float = 5.0, - n_round: int = 5, + n_round: int = 10, + auto_run: bool = False, ): competition, data_desc, requirement = ( "titanic", "Training set is train.csv.\nTest set is test.csv. We also include gender_submission.csv, a set of predictions that assume all and only female passengers survive, as an example of what a submission file should look like.", "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format", - # "generate a random prediction of the same shape as gender_submission.csv and save", + # "generate a random prediction, replace the Survived column of gender_submission.csv, and save the prediction to a new submission file", ) team = Team() team.hire( [ KaggleManager(competition=competition, data_desc=data_desc), - MLEngineer(goal=requirement), + MLEngineer(goal=requirement, auto_run=auto_run), ] ) diff --git a/metagpt/actions/ml_da_action.py b/metagpt/actions/ml_da_action.py index 9f903fd22..a4537dad9 100644 --- a/metagpt/actions/ml_da_action.py +++ b/metagpt/actions/ml_da_action.py @@ -3,6 +3,7 @@ from typing import Dict, List, Union from metagpt.actions import Action from metagpt.schema import Message, Plan +from metagpt.utils.common import CodeParser from metagpt.logs import logger @@ -98,22 +99,30 @@ class SummarizeAnalysis(Action): class Reflect(Action): PROMPT_TEMPLATE = """ - # User Requirement - {user_requirement} # Context - {context} + __context__ + # Latest User Requirement + __user_requirement__ # Summary Above is all your attempts to tackle the user requirement. You plan, act, submit your output, and get the result and feedback. - First, summarize each of your previous trial in a triple of (your methods, the corresponding result, potential improvement), list them out. - # Takeaways - Second, carefully find key takeaways from your summarization in a step-by-step thinking process - # Guidance - Finally, make a concise one-sentence guidance for improving your future plan. - Your response: + Output a json following the format: + ```json + { + "summary": str = "summarize each of your previous trial in a triple of (your methods, the corresponding result, potential improvement), list them out", + "takeaways": str = "carefully find key takeaways from your summarization in a step-by-step thinking process", + "reflection": "in one sentence, state executable actions for improving your future plan", + } + ``` """ + REWRITE_PLAN_INSTRUCTION = """When taking this reflection for rewriting plan, modify the current plan in place, replace, add, or delete tasks in the plan, + only make necessary change to the current plan, keep reusable tasks unchanged, provide the complete new plan.""" - async def run(self, context: str) -> str: - user_requirement = "Score as high as possible in a data modeling competition" - prompt = self.PROMPT_TEMPLATE.format(context=context, user_requirement=user_requirement) - rsp = await self._aask(prompt) - return rsp + async def run(self, context: str, user_requirement: str = "") -> str: + user_requirement = user_requirement or "Score as high as possible in a data modeling competition" + # prompt = self.PROMPT_TEMPLATE.format(context=context, user_requirement=user_requirement) + prompt = self.PROMPT_TEMPLATE.replace("__context__", context).replace("__user_requirement__", user_requirement) + rsp_json = await self._aask(prompt) + rsp = CodeParser.parse_code(block=None, text=rsp_json) + reflection = json.loads(rsp)["reflection"] + reflection += self.REWRITE_PLAN_INSTRUCTION + return reflection diff --git a/metagpt/roles/kaggle_manager.py b/metagpt/roles/kaggle_manager.py index d20769b92..354289975 100644 --- a/metagpt/roles/kaggle_manager.py +++ b/metagpt/roles/kaggle_manager.py @@ -38,8 +38,8 @@ class DownloadData(Action): run_command(f"kaggle competitions download {competition} --path {WORKSPACE_ROOT}") - # if not os.path.exists(data_path): - if True: + if not os.path.exists(data_path): + # if True: # run_command(f"rm -r {data_path / '*'}") run_command(f"unzip -o {WORKSPACE_ROOT / '*.zip'} -d {data_path}") # FIXME: not safe diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index 4536395ba..abd14c7fb 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -48,13 +48,11 @@ class MLEngineer(Role): if latest_event == DownloadData: self.plan.context = memories[-1].content elif latest_event == SubmitResult: + # self reflect on previous plan outcomes and think about how to improve the plan, add to working memory + await self._reflect() + # get feedback for improvement from human, add to working memory await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - # self reflect on previous plan outcomes and think about how to improve the plan, add to working memory - prev_plan_outcomes = memories[-1].content - reflection = await Reflect().run(context=prev_plan_outcomes) - self.working_memory.add(Message(content=reflection, role="assistant")) - ### Common Procedure in both single- and multi-agent setting ### # create initial plan and update until confirmation @@ -172,7 +170,16 @@ class MLEngineer(Role): self.plan.replace_task(tasks[0]) else: self.plan.add_tasks(tasks) - self.working_memory.clear() + self.working_memory.clear() + + async def _reflect(self): + context = self.get_memories() + context = "\n".join([str(msg) for msg in context]) + # print("*" * 10) + # print(context) + # print("*" * 10) + reflection = await Reflect().run(context=context) + self.working_memory.add(Message(content=reflection, role="assistant")) def get_useful_memories(self) -> List[Message]: """find useful memories only to reduce context length and improve performance""" From 4231e0a11e7775d22c35ec9f8f4dfc1a233cb925 Mon Sep 17 00:00:00 2001 From: yzlin Date: Mon, 11 Dec 2023 16:13:34 +0800 Subject: [PATCH 4/5] kaggle iterative trial done --- kaggle_team.py | 3 +- metagpt/actions/execute_code.py | 28 ++++++++++++++-- metagpt/actions/ml_da_action.py | 17 +++++----- metagpt/actions/write_plan.py | 38 ++++++++++++++++++---- metagpt/roles/kaggle_manager.py | 3 +- metagpt/roles/ml_engineer.py | 34 ++++++++++++++------ metagpt/schema.py | 39 +++++++++++++++++----- tests/metagpt/actions/test_write_plan.py | 20 ++++++------ tests/metagpt/test_schema.py | 41 ++++++++++++++++++++++++ 9 files changed, 178 insertions(+), 45 deletions(-) diff --git a/kaggle_team.py b/kaggle_team.py index e8ab3ec41..50a8f7288 100644 --- a/kaggle_team.py +++ b/kaggle_team.py @@ -19,8 +19,9 @@ async def main( competition, data_desc, requirement = ( "titanic", "Training set is train.csv.\nTest set is test.csv. We also include gender_submission.csv, a set of predictions that assume all and only female passengers survive, as an example of what a submission file should look like.", - "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format", + # "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format", # "generate a random prediction, replace the Survived column of gender_submission.csv, and save the prediction to a new submission file", + "Score as high as possible for the provided dataset, save the test prediction to a csv with two columns PassengerId and Survived" ) team = Team() diff --git a/metagpt/actions/execute_code.py b/metagpt/actions/execute_code.py index 981aa894c..9c2b8d96c 100644 --- a/metagpt/actions/execute_code.py +++ b/metagpt/actions/execute_code.py @@ -8,6 +8,7 @@ from abc import ABC, abstractmethod from pathlib import Path from typing import Dict, List, Tuple, Union import traceback +import re import nbformat from nbclient import NotebookClient @@ -171,11 +172,34 @@ class ExecutePyCode(ExecuteCode, Action): # TODO: add max_tries for run code. cell_index = len(self.nb.cells) - 1 await self.nb_client.async_execute_cell(self.nb.cells[-1], cell_index) - return self.parse_outputs(self.nb.cells[-1].outputs), True + outputs = self.parse_outputs(self.nb.cells[-1].outputs) + success = True except Exception as e: # FIXME: CellExecutionError is hard to read. for example `1\0` raise ZeroDivisionError: # CellExecutionError('An error occurred while executing the following cell:\n------------------\nz=1/0\n------------------\n\n\n\x1b[0;31m---------------------------------------------------------------------------\x1b[0m\n\x1b[0;31mZeroDivisionError\x1b[0m Traceback (most recent call last)\nCell \x1b[0;32mIn[1], line 1\x1b[0m\n\x1b[0;32m----> 1\x1b[0m z\x1b[38;5;241m=\x1b[39m\x1b[38;5;241;43m1\x1b[39;49m\x1b[38;5;241;43m/\x1b[39;49m\x1b[38;5;241;43m0\x1b[39;49m\n\n\x1b[0;31mZeroDivisionError\x1b[0m: division by zero\n') - return traceback.format_exc(), False + outputs = traceback.format_exc() + success = False + return truncate(remove_escape_and_color_codes(outputs)), success else: # TODO: markdown raise NotImplementedError(f"Not support this code type : {language}, Only support code!") + + +def truncate(result: str, keep_len: int = 2000) -> str: + desc = f"Truncated to show only the last {keep_len} characters\n" + if result.startswith(desc): + result = result[-len(desc) :] + + if len(result) > keep_len: + result = result[-keep_len:] + + if not result.startswith(desc): + return desc + result + return desc + + +def remove_escape_and_color_codes(input_str): + # 使用正则表达式去除转义字符和颜色代码 + pattern = re.compile(r'\x1b\[[0-9;]*[mK]') + result = pattern.sub('', input_str) + return result diff --git a/metagpt/actions/ml_da_action.py b/metagpt/actions/ml_da_action.py index a4537dad9..6be4b3040 100644 --- a/metagpt/actions/ml_da_action.py +++ b/metagpt/actions/ml_da_action.py @@ -7,8 +7,8 @@ from metagpt.utils.common import CodeParser from metagpt.logs import logger -def truncate(result: str, keep_len: int = 1000) -> str: - desc = "Truncated to show only the last 1000 characters\n" +def truncate(result: str, keep_len: int = 2000) -> str: + desc = "Truncated to show only the last keep_len characters\n" if result.startswith(desc): result = result[-len(desc) :] @@ -70,7 +70,9 @@ class AskReview(Action): if rsp.lower() in ReviewConst.EXIT_WORD: exit() - confirmed = rsp.lower() in ReviewConst.CONTINUE_WORD + # Confirmation can be one of "confirm", "continue", "c", "yes", "y" exactly, or sentences containing "confirm". + # One could say "confirm this task, but change the next task to ..." + confirmed = rsp.lower() in ReviewConst.CONTINUE_WORD or ReviewConst.CONTINUE_WORD[0] in rsp.lower() return rsp, confirmed @@ -109,13 +111,13 @@ class Reflect(Action): ```json { "summary": str = "summarize each of your previous trial in a triple of (your methods, the corresponding result, potential improvement), list them out", - "takeaways": str = "carefully find key takeaways from your summarization in a step-by-step thinking process", - "reflection": "in one sentence, state executable actions for improving your future plan", + "takeaways": str = "carefully find key takeaways from your summarization", + "reflection": str = "give specific instruction to improve your next trial in a step-by-step thinking process", } ``` """ - REWRITE_PLAN_INSTRUCTION = """When taking this reflection for rewriting plan, modify the current plan in place, replace, add, or delete tasks in the plan, - only make necessary change to the current plan, keep reusable tasks unchanged, provide the complete new plan.""" + REWRITE_PLAN_INSTRUCTION = """Take this reflection for rewriting plan, modify the current plan in place, make reference to your specific instruction, think about you should + change which task, add or delete what tasks in the plan. Only make necessary changes, keep reusable tasks unchanged, output the COMPLETE new plan starting from the first task. Your plan should have no more than 5 tasks.""" async def run(self, context: str, user_requirement: str = "") -> str: user_requirement = user_requirement or "Score as high as possible in a data modeling competition" @@ -124,5 +126,4 @@ class Reflect(Action): rsp_json = await self._aask(prompt) rsp = CodeParser.parse_code(block=None, text=rsp_json) reflection = json.loads(rsp)["reflection"] - reflection += self.REWRITE_PLAN_INSTRUCTION return reflection diff --git a/metagpt/actions/write_plan.py b/metagpt/actions/write_plan.py index 71133bb4d..f7ca1ff4c 100644 --- a/metagpt/actions/write_plan.py +++ b/metagpt/actions/write_plan.py @@ -4,12 +4,14 @@ @Author : orange-crow @File : plan.py """ -from typing import List, Dict +from typing import List, Dict, Tuple import json +from copy import deepcopy +import traceback from metagpt.actions import Action from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_PROMPT, ASSIGN_TASK_TYPE -from metagpt.schema import Message, Task +from metagpt.schema import Message, Task, Plan from metagpt.utils.common import CodeParser, create_func_config @@ -67,8 +69,30 @@ class WritePlan(Action): rsp = await self.assign_task_type(json.loads(rsp)) return rsp - @staticmethod - def rsp_to_tasks(rsp: str) -> List[Task]: - rsp = json.loads(rsp) - tasks = [Task(**task_config) for task_config in rsp] - return tasks +def rsp_to_tasks(rsp: str) -> List[Task]: + rsp = json.loads(rsp) + tasks = [Task(**task_config) for task_config in rsp] + return tasks + +def update_plan_from_rsp(rsp: str, current_plan: Plan): + tasks = rsp_to_tasks(rsp) + if len(tasks) == 1: + # handle a single task + if current_plan.has_task_id(tasks[0].task_id): + # replace an existing task + current_plan.replace_task(tasks[0]) + else: + # append one task + current_plan.append_task(tasks[0]) + + else: + # add tasks in general + current_plan.add_tasks(tasks) + +def precheck_update_plan_from_rsp(rsp: str, current_plan: Plan) -> Tuple[bool, str]: + temp_plan = deepcopy(current_plan) + try: + update_plan_from_rsp(rsp, temp_plan) + return True, "" + except Exception as e: + return False, e diff --git a/metagpt/roles/kaggle_manager.py b/metagpt/roles/kaggle_manager.py index 354289975..18ac6733a 100644 --- a/metagpt/roles/kaggle_manager.py +++ b/metagpt/roles/kaggle_manager.py @@ -1,6 +1,7 @@ from typing import Dict, List, Union, Tuple import json import subprocess +import os import fire import pandas as pd @@ -14,7 +15,7 @@ from metagpt.schema import Message, Task, Plan from metagpt.logs import logger from metagpt.utils.common import CodeParser -import os + os.environ["KAGGLE_USERNAME"] = CONFIG.kaggle_username os.environ["KAGGLE_KEY"] = CONFIG.kaggle_key diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index 4e818ca3c..6e7331281 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -10,7 +10,7 @@ from metagpt.actions import Action from metagpt.schema import Message, Task, Plan from metagpt.memory import Memory from metagpt.logs import logger -from metagpt.actions.write_plan import WritePlan +from metagpt.actions.write_plan import WritePlan, update_plan_from_rsp, precheck_update_plan_from_rsp from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis, Reflect, ReviewConst from metagpt.actions.execute_code import ExecutePyCode @@ -69,13 +69,24 @@ class MLEngineer(Role): # ask for acceptance, users can other refuse and change tasks in the plan review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - if success and task_result_confirmed: + if task_result_confirmed: # tick off this task and record progress task.code = code task.result = result self.plan.finish_current_task() self.working_memory.clear() + confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower() + and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)" + if confirmed_and_more: + self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) + await self._update_plan(review) + + elif "redo" in review: + # Ask the Role to redo this task with help of review feedback, + # useful when the code run is successful but the procedure or result is not what we want + continue + else: # update plan according to user's feedback and to take on changed tasks await self._update_plan(review) @@ -151,7 +162,7 @@ class MLEngineer(Role): return review, confirmed return "", True - async def _update_plan(self, review: str = "", max_tasks: int = 3): + async def _update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3): plan_confirmed = False while not plan_confirmed: context = self.get_useful_memories() @@ -162,15 +173,19 @@ class MLEngineer(Role): Message(content=rsp, role="assistant", cause_by=WritePlan) ) - # TODO: precheck plan before asking reviews + # precheck plan before asking reviews + is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan) + if not is_plan_valid and max_retries > 0: + error_msg = f"The generated plan is not valid with error: {error}, try regenerating, remember to generate either the whole plan or the single changed task only" + logger.warning(error_msg) + self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan)) + max_retries -= 1 + continue _, plan_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - tasks = WritePlan.rsp_to_tasks(rsp) - if len(tasks) == 1 and self.plan.has_task_id(tasks[0].task_id): - self.plan.replace_task(tasks[0]) - else: - self.plan.add_tasks(tasks) + update_plan_from_rsp(rsp, self.plan) + self.working_memory.clear() async def _reflect(self): @@ -181,6 +196,7 @@ class MLEngineer(Role): # print("*" * 10) reflection = await Reflect().run(context=context) self.working_memory.add(Message(content=reflection, role="assistant")) + self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user")) def get_useful_memories(self) -> List[Message]: """find useful memories only to reduce context length and improve performance""" diff --git a/metagpt/schema.py b/metagpt/schema.py index 9b86a2448..4e5e083ec 100644 --- a/metagpt/schema.py +++ b/metagpt/schema.py @@ -149,10 +149,7 @@ class Plan(BaseModel): self.tasks = final_tasks # Update current_task_id to the first unfinished task in the merged list - for task in self.tasks: - if not task.is_finished: - self.current_task_id = task.task_id - break + self._update_current_task() # Update the task map for quick access to tasks by ID self.task_map = {task.task_id: task for task in self.tasks} @@ -196,8 +193,36 @@ class Plan(BaseModel): if new_task.task_id in task.dependent_task_ids: self.reset_task(task.task_id) + def append_task(self, new_task: Task): + """ + Append a new task to the end of existing task sequences + + Args: + new_task (Task): The new task to be appended to the existing task sequence + + Returns: + None + """ + assert not self.has_task_id(new_task.task_id), "Task already in current plan, use replace_task instead" + + assert all([self.has_task_id(dep_id) for dep_id in new_task.dependent_task_ids]), \ + "New task has unknown dependencies" + + # Existing tasks do not depend on the new task, it's fine to put it to the end of the sorted task sequence + self.tasks.append(new_task) + self.task_map[new_task.task_id] = new_task + self._update_current_task() + def has_task_id(self, task_id: str) -> bool: return task_id in self.task_map + + def _update_current_task(self): + current_task_id = "" + for task in self.tasks: + if not task.is_finished: + current_task_id = task.task_id + break + self.current_task_id = current_task_id # all tasks finished @property def current_task(self) -> Task: @@ -212,10 +237,8 @@ class Plan(BaseModel): """Finish current task, set Task.is_finished=True, set current task to next task """ if self.current_task_id: - current_task = self.current_task - current_task.is_finished = True - next_task_index = self.tasks.index(current_task) + 1 - self.current_task_id = self.tasks[next_task_index].task_id if next_task_index < len(self.tasks) else None + self.current_task.is_finished = True + self._update_current_task() # set to next task def get_finished_tasks(self) -> list[Task]: """return all finished tasks in correct linearized order diff --git a/tests/metagpt/actions/test_write_plan.py b/tests/metagpt/actions/test_write_plan.py index 2bf200ab3..7766e0d51 100644 --- a/tests/metagpt/actions/test_write_plan.py +++ b/tests/metagpt/actions/test_write_plan.py @@ -1,13 +1,15 @@ import pytest -from metagpt.actions.write_plan import WritePlan +from metagpt.actions.write_plan import WritePlan, precheck_update_plan_from_rsp, Plan, Task +def test_precheck_update_plan_from_rsp(): + plan = Plan(goal="") + plan.add_tasks([Task(task_id="1")]) + rsp = '[{"task_id": "2"}]' + success, _ = precheck_update_plan_from_rsp(rsp, plan) + assert success + assert len(plan.tasks) == 1 and plan.tasks[0].task_id == "1" # precheck should not change the original one -@pytest.mark.asyncio -async def test_plan(): - p = WritePlan() - task_desc = """Here’s some background information on Cyclistic, a bike-sharing company designing a marketing strategy aimed at converting casual riders into annual members: So far, Cyclistic’s marketing strategy has relied on building general awareness and engaging a wide range of consumers. group. One way to help achieve these goals is the flexibility of its pricing plans: one-way passes, full-day passes, and annual memberships. Customers who purchase a one-way or full-day pass are known as recreational riders. Customers purchasing an annual membership are Cyclistic members. I will provide you with a data sheet that records user behavior: '/Users/vicis/Downloads/202103-divvy-tripdata.csv""" - rsp = await p.run(task_desc, role="data analyst") - assert len(rsp.content) > 0 - assert rsp.sent_from == "WritePlan" - print(rsp) + invalid_rsp = 'wrong' + success, _ = precheck_update_plan_from_rsp(invalid_rsp, plan) + assert not success diff --git a/tests/metagpt/test_schema.py b/tests/metagpt/test_schema.py index 324a083ca..b5d49b7a1 100644 --- a/tests/metagpt/test_schema.py +++ b/tests/metagpt/test_schema.py @@ -5,6 +5,7 @@ @Author : alexanderwu @File : test_schema.py """ +import pytest from metagpt.schema import AIMessage, Message, SystemMessage, UserMessage from metagpt.schema import Task, Plan @@ -143,3 +144,43 @@ class TestPlan: plan.replace_task(new_task) # Task with ID 2 does not exist in plan assert "1" in plan.task_map assert "2" not in plan.task_map + + def test_append_task_with_valid_dependencies(self): + plan = Plan(goal="Test") + existing_task = [Task(task_id="1")] + plan.add_tasks(existing_task) + new_task = Task(task_id="2", dependent_task_ids=["1"]) + plan.append_task(new_task) + assert plan.tasks[-1].task_id == "2" + assert plan.task_map["2"] == new_task + + def test_append_task_with_invalid_dependencies(self): + new_task = Task(task_id="2", dependent_task_ids=["3"]) + plan = Plan(goal="Test") + with pytest.raises(AssertionError): + plan.append_task(new_task) + + def test_append_task_without_dependencies(self): + plan = Plan(goal="Test") + existing_task = [Task(task_id="1")] + plan.add_tasks(existing_task) + + new_task = Task(task_id="2") + plan.append_task(new_task) + + assert len(plan.tasks) == 2 + assert plan.current_task_id == "1" + + def test_append_task_updates_current_task(self): + finished_task = Task(task_id="1", is_finished=True) + new_task = Task(task_id="2") + plan = Plan(goal="Test", tasks=[finished_task]) + plan.append_task(new_task) + assert plan.current_task_id == "2" + + def test_update_current_task(self): + task1 = Task(task_id="1", is_finished=True) + task2 = Task(task_id="2") + plan = Plan(goal="Test", tasks=[task1, task2]) + plan._update_current_task() + assert plan.current_task_id == "2" From 3847e672b1ad8ad4f6ca5c8a149f570c445b2e09 Mon Sep 17 00:00:00 2001 From: yzlin Date: Tue, 12 Dec 2023 14:20:15 +0800 Subject: [PATCH 5/5] rm redundant --- metagpt/actions/execute_code.py | 2 -- metagpt/actions/ml_da_action.py | 13 ------------- 2 files changed, 15 deletions(-) diff --git a/metagpt/actions/execute_code.py b/metagpt/actions/execute_code.py index 9c2b8d96c..1d20bf3f6 100644 --- a/metagpt/actions/execute_code.py +++ b/metagpt/actions/execute_code.py @@ -175,8 +175,6 @@ class ExecutePyCode(ExecuteCode, Action): outputs = self.parse_outputs(self.nb.cells[-1].outputs) success = True except Exception as e: - # FIXME: CellExecutionError is hard to read. for example `1\0` raise ZeroDivisionError: - # CellExecutionError('An error occurred while executing the following cell:\n------------------\nz=1/0\n------------------\n\n\n\x1b[0;31m---------------------------------------------------------------------------\x1b[0m\n\x1b[0;31mZeroDivisionError\x1b[0m Traceback (most recent call last)\nCell \x1b[0;32mIn[1], line 1\x1b[0m\n\x1b[0;32m----> 1\x1b[0m z\x1b[38;5;241m=\x1b[39m\x1b[38;5;241;43m1\x1b[39;49m\x1b[38;5;241;43m/\x1b[39;49m\x1b[38;5;241;43m0\x1b[39;49m\n\n\x1b[0;31mZeroDivisionError\x1b[0m: division by zero\n') outputs = traceback.format_exc() success = False return truncate(remove_escape_and_color_codes(outputs)), success diff --git a/metagpt/actions/ml_da_action.py b/metagpt/actions/ml_da_action.py index 6be4b3040..5e4580b17 100644 --- a/metagpt/actions/ml_da_action.py +++ b/metagpt/actions/ml_da_action.py @@ -7,19 +7,6 @@ from metagpt.utils.common import CodeParser from metagpt.logs import logger -def truncate(result: str, keep_len: int = 2000) -> str: - desc = "Truncated to show only the last keep_len characters\n" - if result.startswith(desc): - result = result[-len(desc) :] - - if len(result) > keep_len: - result = result[-keep_len:] - - if not result.startswith(desc): - return desc + result - return desc - - class ReviewConst: TASK_REVIEW_TRIGGER = "task" CODE_REVIEW_TRIGGER = "code"