From 4231e0a11e7775d22c35ec9f8f4dfc1a233cb925 Mon Sep 17 00:00:00 2001 From: yzlin Date: Mon, 11 Dec 2023 16:13:34 +0800 Subject: [PATCH] kaggle iterative trial done --- kaggle_team.py | 3 +- metagpt/actions/execute_code.py | 28 ++++++++++++++-- metagpt/actions/ml_da_action.py | 17 +++++----- metagpt/actions/write_plan.py | 38 ++++++++++++++++++---- metagpt/roles/kaggle_manager.py | 3 +- metagpt/roles/ml_engineer.py | 34 ++++++++++++++------ metagpt/schema.py | 39 +++++++++++++++++----- tests/metagpt/actions/test_write_plan.py | 20 ++++++------ tests/metagpt/test_schema.py | 41 ++++++++++++++++++++++++ 9 files changed, 178 insertions(+), 45 deletions(-) diff --git a/kaggle_team.py b/kaggle_team.py index e8ab3ec41..50a8f7288 100644 --- a/kaggle_team.py +++ b/kaggle_team.py @@ -19,8 +19,9 @@ async def main( competition, data_desc, requirement = ( "titanic", "Training set is train.csv.\nTest set is test.csv. We also include gender_submission.csv, a set of predictions that assume all and only female passengers survive, as an example of what a submission file should look like.", - "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format", + # "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format", # "generate a random prediction, replace the Survived column of gender_submission.csv, and save the prediction to a new submission file", + "Score as high as possible for the provided dataset, save the test prediction to a csv with two columns PassengerId and Survived" ) team = Team() diff --git a/metagpt/actions/execute_code.py b/metagpt/actions/execute_code.py index 981aa894c..9c2b8d96c 100644 --- a/metagpt/actions/execute_code.py +++ b/metagpt/actions/execute_code.py @@ -8,6 +8,7 @@ from abc import ABC, abstractmethod from pathlib import Path from typing import Dict, List, Tuple, Union import traceback +import re import nbformat from nbclient import NotebookClient @@ -171,11 +172,34 @@ class ExecutePyCode(ExecuteCode, Action): # TODO: add max_tries for run code. cell_index = len(self.nb.cells) - 1 await self.nb_client.async_execute_cell(self.nb.cells[-1], cell_index) - return self.parse_outputs(self.nb.cells[-1].outputs), True + outputs = self.parse_outputs(self.nb.cells[-1].outputs) + success = True except Exception as e: # FIXME: CellExecutionError is hard to read. for example `1\0` raise ZeroDivisionError: # CellExecutionError('An error occurred while executing the following cell:\n------------------\nz=1/0\n------------------\n\n\n\x1b[0;31m---------------------------------------------------------------------------\x1b[0m\n\x1b[0;31mZeroDivisionError\x1b[0m Traceback (most recent call last)\nCell \x1b[0;32mIn[1], line 1\x1b[0m\n\x1b[0;32m----> 1\x1b[0m z\x1b[38;5;241m=\x1b[39m\x1b[38;5;241;43m1\x1b[39;49m\x1b[38;5;241;43m/\x1b[39;49m\x1b[38;5;241;43m0\x1b[39;49m\n\n\x1b[0;31mZeroDivisionError\x1b[0m: division by zero\n') - return traceback.format_exc(), False + outputs = traceback.format_exc() + success = False + return truncate(remove_escape_and_color_codes(outputs)), success else: # TODO: markdown raise NotImplementedError(f"Not support this code type : {language}, Only support code!") + + +def truncate(result: str, keep_len: int = 2000) -> str: + desc = f"Truncated to show only the last {keep_len} characters\n" + if result.startswith(desc): + result = result[-len(desc) :] + + if len(result) > keep_len: + result = result[-keep_len:] + + if not result.startswith(desc): + return desc + result + return desc + + +def remove_escape_and_color_codes(input_str): + # 使用正则表达式去除转义字符和颜色代码 + pattern = re.compile(r'\x1b\[[0-9;]*[mK]') + result = pattern.sub('', input_str) + return result diff --git a/metagpt/actions/ml_da_action.py b/metagpt/actions/ml_da_action.py index a4537dad9..6be4b3040 100644 --- a/metagpt/actions/ml_da_action.py +++ b/metagpt/actions/ml_da_action.py @@ -7,8 +7,8 @@ from metagpt.utils.common import CodeParser from metagpt.logs import logger -def truncate(result: str, keep_len: int = 1000) -> str: - desc = "Truncated to show only the last 1000 characters\n" +def truncate(result: str, keep_len: int = 2000) -> str: + desc = "Truncated to show only the last keep_len characters\n" if result.startswith(desc): result = result[-len(desc) :] @@ -70,7 +70,9 @@ class AskReview(Action): if rsp.lower() in ReviewConst.EXIT_WORD: exit() - confirmed = rsp.lower() in ReviewConst.CONTINUE_WORD + # Confirmation can be one of "confirm", "continue", "c", "yes", "y" exactly, or sentences containing "confirm". + # One could say "confirm this task, but change the next task to ..." + confirmed = rsp.lower() in ReviewConst.CONTINUE_WORD or ReviewConst.CONTINUE_WORD[0] in rsp.lower() return rsp, confirmed @@ -109,13 +111,13 @@ class Reflect(Action): ```json { "summary": str = "summarize each of your previous trial in a triple of (your methods, the corresponding result, potential improvement), list them out", - "takeaways": str = "carefully find key takeaways from your summarization in a step-by-step thinking process", - "reflection": "in one sentence, state executable actions for improving your future plan", + "takeaways": str = "carefully find key takeaways from your summarization", + "reflection": str = "give specific instruction to improve your next trial in a step-by-step thinking process", } ``` """ - REWRITE_PLAN_INSTRUCTION = """When taking this reflection for rewriting plan, modify the current plan in place, replace, add, or delete tasks in the plan, - only make necessary change to the current plan, keep reusable tasks unchanged, provide the complete new plan.""" + REWRITE_PLAN_INSTRUCTION = """Take this reflection for rewriting plan, modify the current plan in place, make reference to your specific instruction, think about you should + change which task, add or delete what tasks in the plan. Only make necessary changes, keep reusable tasks unchanged, output the COMPLETE new plan starting from the first task. Your plan should have no more than 5 tasks.""" async def run(self, context: str, user_requirement: str = "") -> str: user_requirement = user_requirement or "Score as high as possible in a data modeling competition" @@ -124,5 +126,4 @@ class Reflect(Action): rsp_json = await self._aask(prompt) rsp = CodeParser.parse_code(block=None, text=rsp_json) reflection = json.loads(rsp)["reflection"] - reflection += self.REWRITE_PLAN_INSTRUCTION return reflection diff --git a/metagpt/actions/write_plan.py b/metagpt/actions/write_plan.py index 71133bb4d..f7ca1ff4c 100644 --- a/metagpt/actions/write_plan.py +++ b/metagpt/actions/write_plan.py @@ -4,12 +4,14 @@ @Author : orange-crow @File : plan.py """ -from typing import List, Dict +from typing import List, Dict, Tuple import json +from copy import deepcopy +import traceback from metagpt.actions import Action from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_PROMPT, ASSIGN_TASK_TYPE -from metagpt.schema import Message, Task +from metagpt.schema import Message, Task, Plan from metagpt.utils.common import CodeParser, create_func_config @@ -67,8 +69,30 @@ class WritePlan(Action): rsp = await self.assign_task_type(json.loads(rsp)) return rsp - @staticmethod - def rsp_to_tasks(rsp: str) -> List[Task]: - rsp = json.loads(rsp) - tasks = [Task(**task_config) for task_config in rsp] - return tasks +def rsp_to_tasks(rsp: str) -> List[Task]: + rsp = json.loads(rsp) + tasks = [Task(**task_config) for task_config in rsp] + return tasks + +def update_plan_from_rsp(rsp: str, current_plan: Plan): + tasks = rsp_to_tasks(rsp) + if len(tasks) == 1: + # handle a single task + if current_plan.has_task_id(tasks[0].task_id): + # replace an existing task + current_plan.replace_task(tasks[0]) + else: + # append one task + current_plan.append_task(tasks[0]) + + else: + # add tasks in general + current_plan.add_tasks(tasks) + +def precheck_update_plan_from_rsp(rsp: str, current_plan: Plan) -> Tuple[bool, str]: + temp_plan = deepcopy(current_plan) + try: + update_plan_from_rsp(rsp, temp_plan) + return True, "" + except Exception as e: + return False, e diff --git a/metagpt/roles/kaggle_manager.py b/metagpt/roles/kaggle_manager.py index 354289975..18ac6733a 100644 --- a/metagpt/roles/kaggle_manager.py +++ b/metagpt/roles/kaggle_manager.py @@ -1,6 +1,7 @@ from typing import Dict, List, Union, Tuple import json import subprocess +import os import fire import pandas as pd @@ -14,7 +15,7 @@ from metagpt.schema import Message, Task, Plan from metagpt.logs import logger from metagpt.utils.common import CodeParser -import os + os.environ["KAGGLE_USERNAME"] = CONFIG.kaggle_username os.environ["KAGGLE_KEY"] = CONFIG.kaggle_key diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index 4e818ca3c..6e7331281 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -10,7 +10,7 @@ from metagpt.actions import Action from metagpt.schema import Message, Task, Plan from metagpt.memory import Memory from metagpt.logs import logger -from metagpt.actions.write_plan import WritePlan +from metagpt.actions.write_plan import WritePlan, update_plan_from_rsp, precheck_update_plan_from_rsp from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis, Reflect, ReviewConst from metagpt.actions.execute_code import ExecutePyCode @@ -69,13 +69,24 @@ class MLEngineer(Role): # ask for acceptance, users can other refuse and change tasks in the plan review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - if success and task_result_confirmed: + if task_result_confirmed: # tick off this task and record progress task.code = code task.result = result self.plan.finish_current_task() self.working_memory.clear() + confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower() + and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)" + if confirmed_and_more: + self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) + await self._update_plan(review) + + elif "redo" in review: + # Ask the Role to redo this task with help of review feedback, + # useful when the code run is successful but the procedure or result is not what we want + continue + else: # update plan according to user's feedback and to take on changed tasks await self._update_plan(review) @@ -151,7 +162,7 @@ class MLEngineer(Role): return review, confirmed return "", True - async def _update_plan(self, review: str = "", max_tasks: int = 3): + async def _update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3): plan_confirmed = False while not plan_confirmed: context = self.get_useful_memories() @@ -162,15 +173,19 @@ class MLEngineer(Role): Message(content=rsp, role="assistant", cause_by=WritePlan) ) - # TODO: precheck plan before asking reviews + # precheck plan before asking reviews + is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan) + if not is_plan_valid and max_retries > 0: + error_msg = f"The generated plan is not valid with error: {error}, try regenerating, remember to generate either the whole plan or the single changed task only" + logger.warning(error_msg) + self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan)) + max_retries -= 1 + continue _, plan_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - tasks = WritePlan.rsp_to_tasks(rsp) - if len(tasks) == 1 and self.plan.has_task_id(tasks[0].task_id): - self.plan.replace_task(tasks[0]) - else: - self.plan.add_tasks(tasks) + update_plan_from_rsp(rsp, self.plan) + self.working_memory.clear() async def _reflect(self): @@ -181,6 +196,7 @@ class MLEngineer(Role): # print("*" * 10) reflection = await Reflect().run(context=context) self.working_memory.add(Message(content=reflection, role="assistant")) + self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user")) def get_useful_memories(self) -> List[Message]: """find useful memories only to reduce context length and improve performance""" diff --git a/metagpt/schema.py b/metagpt/schema.py index 9b86a2448..4e5e083ec 100644 --- a/metagpt/schema.py +++ b/metagpt/schema.py @@ -149,10 +149,7 @@ class Plan(BaseModel): self.tasks = final_tasks # Update current_task_id to the first unfinished task in the merged list - for task in self.tasks: - if not task.is_finished: - self.current_task_id = task.task_id - break + self._update_current_task() # Update the task map for quick access to tasks by ID self.task_map = {task.task_id: task for task in self.tasks} @@ -196,8 +193,36 @@ class Plan(BaseModel): if new_task.task_id in task.dependent_task_ids: self.reset_task(task.task_id) + def append_task(self, new_task: Task): + """ + Append a new task to the end of existing task sequences + + Args: + new_task (Task): The new task to be appended to the existing task sequence + + Returns: + None + """ + assert not self.has_task_id(new_task.task_id), "Task already in current plan, use replace_task instead" + + assert all([self.has_task_id(dep_id) for dep_id in new_task.dependent_task_ids]), \ + "New task has unknown dependencies" + + # Existing tasks do not depend on the new task, it's fine to put it to the end of the sorted task sequence + self.tasks.append(new_task) + self.task_map[new_task.task_id] = new_task + self._update_current_task() + def has_task_id(self, task_id: str) -> bool: return task_id in self.task_map + + def _update_current_task(self): + current_task_id = "" + for task in self.tasks: + if not task.is_finished: + current_task_id = task.task_id + break + self.current_task_id = current_task_id # all tasks finished @property def current_task(self) -> Task: @@ -212,10 +237,8 @@ class Plan(BaseModel): """Finish current task, set Task.is_finished=True, set current task to next task """ if self.current_task_id: - current_task = self.current_task - current_task.is_finished = True - next_task_index = self.tasks.index(current_task) + 1 - self.current_task_id = self.tasks[next_task_index].task_id if next_task_index < len(self.tasks) else None + self.current_task.is_finished = True + self._update_current_task() # set to next task def get_finished_tasks(self) -> list[Task]: """return all finished tasks in correct linearized order diff --git a/tests/metagpt/actions/test_write_plan.py b/tests/metagpt/actions/test_write_plan.py index 2bf200ab3..7766e0d51 100644 --- a/tests/metagpt/actions/test_write_plan.py +++ b/tests/metagpt/actions/test_write_plan.py @@ -1,13 +1,15 @@ import pytest -from metagpt.actions.write_plan import WritePlan +from metagpt.actions.write_plan import WritePlan, precheck_update_plan_from_rsp, Plan, Task +def test_precheck_update_plan_from_rsp(): + plan = Plan(goal="") + plan.add_tasks([Task(task_id="1")]) + rsp = '[{"task_id": "2"}]' + success, _ = precheck_update_plan_from_rsp(rsp, plan) + assert success + assert len(plan.tasks) == 1 and plan.tasks[0].task_id == "1" # precheck should not change the original one -@pytest.mark.asyncio -async def test_plan(): - p = WritePlan() - task_desc = """Here’s some background information on Cyclistic, a bike-sharing company designing a marketing strategy aimed at converting casual riders into annual members: So far, Cyclistic’s marketing strategy has relied on building general awareness and engaging a wide range of consumers. group. One way to help achieve these goals is the flexibility of its pricing plans: one-way passes, full-day passes, and annual memberships. Customers who purchase a one-way or full-day pass are known as recreational riders. Customers purchasing an annual membership are Cyclistic members. I will provide you with a data sheet that records user behavior: '/Users/vicis/Downloads/202103-divvy-tripdata.csv""" - rsp = await p.run(task_desc, role="data analyst") - assert len(rsp.content) > 0 - assert rsp.sent_from == "WritePlan" - print(rsp) + invalid_rsp = 'wrong' + success, _ = precheck_update_plan_from_rsp(invalid_rsp, plan) + assert not success diff --git a/tests/metagpt/test_schema.py b/tests/metagpt/test_schema.py index 324a083ca..b5d49b7a1 100644 --- a/tests/metagpt/test_schema.py +++ b/tests/metagpt/test_schema.py @@ -5,6 +5,7 @@ @Author : alexanderwu @File : test_schema.py """ +import pytest from metagpt.schema import AIMessage, Message, SystemMessage, UserMessage from metagpt.schema import Task, Plan @@ -143,3 +144,43 @@ class TestPlan: plan.replace_task(new_task) # Task with ID 2 does not exist in plan assert "1" in plan.task_map assert "2" not in plan.task_map + + def test_append_task_with_valid_dependencies(self): + plan = Plan(goal="Test") + existing_task = [Task(task_id="1")] + plan.add_tasks(existing_task) + new_task = Task(task_id="2", dependent_task_ids=["1"]) + plan.append_task(new_task) + assert plan.tasks[-1].task_id == "2" + assert plan.task_map["2"] == new_task + + def test_append_task_with_invalid_dependencies(self): + new_task = Task(task_id="2", dependent_task_ids=["3"]) + plan = Plan(goal="Test") + with pytest.raises(AssertionError): + plan.append_task(new_task) + + def test_append_task_without_dependencies(self): + plan = Plan(goal="Test") + existing_task = [Task(task_id="1")] + plan.add_tasks(existing_task) + + new_task = Task(task_id="2") + plan.append_task(new_task) + + assert len(plan.tasks) == 2 + assert plan.current_task_id == "1" + + def test_append_task_updates_current_task(self): + finished_task = Task(task_id="1", is_finished=True) + new_task = Task(task_id="2") + plan = Plan(goal="Test", tasks=[finished_task]) + plan.append_task(new_task) + assert plan.current_task_id == "2" + + def test_update_current_task(self): + task1 = Task(task_id="1", is_finished=True) + task2 = Task(task_id="2") + plan = Plan(goal="Test", tasks=[task1, task2]) + plan._update_current_task() + assert plan.current_task_id == "2"