kaggle iterative trial done

This commit is contained in:
yzlin 2023-12-11 16:13:34 +08:00
parent f7989b0ce0
commit 4231e0a11e
9 changed files with 178 additions and 45 deletions

View file

@ -19,8 +19,9 @@ async def main(
competition, data_desc, requirement = (
"titanic",
"Training set is train.csv.\nTest set is test.csv. We also include gender_submission.csv, a set of predictions that assume all and only female passengers survive, as an example of what a submission file should look like.",
"Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format",
# "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format",
# "generate a random prediction, replace the Survived column of gender_submission.csv, and save the prediction to a new submission file",
"Score as high as possible for the provided dataset, save the test prediction to a csv with two columns PassengerId and Survived"
)
team = Team()

View file

@ -8,6 +8,7 @@ from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Tuple, Union
import traceback
import re
import nbformat
from nbclient import NotebookClient
@ -171,11 +172,34 @@ class ExecutePyCode(ExecuteCode, Action):
# TODO: add max_tries for run code.
cell_index = len(self.nb.cells) - 1
await self.nb_client.async_execute_cell(self.nb.cells[-1], cell_index)
return self.parse_outputs(self.nb.cells[-1].outputs), True
outputs = self.parse_outputs(self.nb.cells[-1].outputs)
success = True
except Exception as e:
# FIXME: CellExecutionError is hard to read. for example `1\0` raise ZeroDivisionError:
# CellExecutionError('An error occurred while executing the following cell:\n------------------\nz=1/0\n------------------\n\n\n\x1b[0;31m---------------------------------------------------------------------------\x1b[0m\n\x1b[0;31mZeroDivisionError\x1b[0m Traceback (most recent call last)\nCell \x1b[0;32mIn[1], line 1\x1b[0m\n\x1b[0;32m----> 1\x1b[0m z\x1b[38;5;241m=\x1b[39m\x1b[38;5;241;43m1\x1b[39;49m\x1b[38;5;241;43m/\x1b[39;49m\x1b[38;5;241;43m0\x1b[39;49m\n\n\x1b[0;31mZeroDivisionError\x1b[0m: division by zero\n')
return traceback.format_exc(), False
outputs = traceback.format_exc()
success = False
return truncate(remove_escape_and_color_codes(outputs)), success
else:
# TODO: markdown
raise NotImplementedError(f"Not support this code type : {language}, Only support code!")
def truncate(result: str, keep_len: int = 2000) -> str:
desc = f"Truncated to show only the last {keep_len} characters\n"
if result.startswith(desc):
result = result[-len(desc) :]
if len(result) > keep_len:
result = result[-keep_len:]
if not result.startswith(desc):
return desc + result
return desc
def remove_escape_and_color_codes(input_str):
# 使用正则表达式去除转义字符和颜色代码
pattern = re.compile(r'\x1b\[[0-9;]*[mK]')
result = pattern.sub('', input_str)
return result

View file

@ -7,8 +7,8 @@ from metagpt.utils.common import CodeParser
from metagpt.logs import logger
def truncate(result: str, keep_len: int = 1000) -> str:
desc = "Truncated to show only the last 1000 characters\n"
def truncate(result: str, keep_len: int = 2000) -> str:
desc = "Truncated to show only the last keep_len characters\n"
if result.startswith(desc):
result = result[-len(desc) :]
@ -70,7 +70,9 @@ class AskReview(Action):
if rsp.lower() in ReviewConst.EXIT_WORD:
exit()
confirmed = rsp.lower() in ReviewConst.CONTINUE_WORD
# Confirmation can be one of "confirm", "continue", "c", "yes", "y" exactly, or sentences containing "confirm".
# One could say "confirm this task, but change the next task to ..."
confirmed = rsp.lower() in ReviewConst.CONTINUE_WORD or ReviewConst.CONTINUE_WORD[0] in rsp.lower()
return rsp, confirmed
@ -109,13 +111,13 @@ class Reflect(Action):
```json
{
"summary": str = "summarize each of your previous trial in a triple of (your methods, the corresponding result, potential improvement), list them out",
"takeaways": str = "carefully find key takeaways from your summarization in a step-by-step thinking process",
"reflection": "in one sentence, state executable actions for improving your future plan",
"takeaways": str = "carefully find key takeaways from your summarization",
"reflection": str = "give specific instruction to improve your next trial in a step-by-step thinking process",
}
```
"""
REWRITE_PLAN_INSTRUCTION = """When taking this reflection for rewriting plan, modify the current plan in place, replace, add, or delete tasks in the plan,
only make necessary change to the current plan, keep reusable tasks unchanged, provide the complete new plan."""
REWRITE_PLAN_INSTRUCTION = """Take this reflection for rewriting plan, modify the current plan in place, make reference to your specific instruction, think about you should
change which task, add or delete what tasks in the plan. Only make necessary changes, keep reusable tasks unchanged, output the COMPLETE new plan starting from the first task. Your plan should have no more than 5 tasks."""
async def run(self, context: str, user_requirement: str = "") -> str:
user_requirement = user_requirement or "Score as high as possible in a data modeling competition"
@ -124,5 +126,4 @@ class Reflect(Action):
rsp_json = await self._aask(prompt)
rsp = CodeParser.parse_code(block=None, text=rsp_json)
reflection = json.loads(rsp)["reflection"]
reflection += self.REWRITE_PLAN_INSTRUCTION
return reflection

View file

@ -4,12 +4,14 @@
@Author : orange-crow
@File : plan.py
"""
from typing import List, Dict
from typing import List, Dict, Tuple
import json
from copy import deepcopy
import traceback
from metagpt.actions import Action
from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_PROMPT, ASSIGN_TASK_TYPE
from metagpt.schema import Message, Task
from metagpt.schema import Message, Task, Plan
from metagpt.utils.common import CodeParser, create_func_config
@ -67,8 +69,30 @@ class WritePlan(Action):
rsp = await self.assign_task_type(json.loads(rsp))
return rsp
@staticmethod
def rsp_to_tasks(rsp: str) -> List[Task]:
rsp = json.loads(rsp)
tasks = [Task(**task_config) for task_config in rsp]
return tasks
def rsp_to_tasks(rsp: str) -> List[Task]:
rsp = json.loads(rsp)
tasks = [Task(**task_config) for task_config in rsp]
return tasks
def update_plan_from_rsp(rsp: str, current_plan: Plan):
tasks = rsp_to_tasks(rsp)
if len(tasks) == 1:
# handle a single task
if current_plan.has_task_id(tasks[0].task_id):
# replace an existing task
current_plan.replace_task(tasks[0])
else:
# append one task
current_plan.append_task(tasks[0])
else:
# add tasks in general
current_plan.add_tasks(tasks)
def precheck_update_plan_from_rsp(rsp: str, current_plan: Plan) -> Tuple[bool, str]:
temp_plan = deepcopy(current_plan)
try:
update_plan_from_rsp(rsp, temp_plan)
return True, ""
except Exception as e:
return False, e

View file

@ -1,6 +1,7 @@
from typing import Dict, List, Union, Tuple
import json
import subprocess
import os
import fire
import pandas as pd
@ -14,7 +15,7 @@ from metagpt.schema import Message, Task, Plan
from metagpt.logs import logger
from metagpt.utils.common import CodeParser
import os
os.environ["KAGGLE_USERNAME"] = CONFIG.kaggle_username
os.environ["KAGGLE_KEY"] = CONFIG.kaggle_key

View file

@ -10,7 +10,7 @@ from metagpt.actions import Action
from metagpt.schema import Message, Task, Plan
from metagpt.memory import Memory
from metagpt.logs import logger
from metagpt.actions.write_plan import WritePlan
from metagpt.actions.write_plan import WritePlan, update_plan_from_rsp, precheck_update_plan_from_rsp
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools
from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis, Reflect, ReviewConst
from metagpt.actions.execute_code import ExecutePyCode
@ -69,13 +69,24 @@ class MLEngineer(Role):
# ask for acceptance, users can other refuse and change tasks in the plan
review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
if success and task_result_confirmed:
if task_result_confirmed:
# tick off this task and record progress
task.code = code
task.result = result
self.plan.finish_current_task()
self.working_memory.clear()
confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower()
and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)"
if confirmed_and_more:
self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
await self._update_plan(review)
elif "redo" in review:
# Ask the Role to redo this task with help of review feedback,
# useful when the code run is successful but the procedure or result is not what we want
continue
else:
# update plan according to user's feedback and to take on changed tasks
await self._update_plan(review)
@ -151,7 +162,7 @@ class MLEngineer(Role):
return review, confirmed
return "", True
async def _update_plan(self, review: str = "", max_tasks: int = 3):
async def _update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3):
plan_confirmed = False
while not plan_confirmed:
context = self.get_useful_memories()
@ -162,15 +173,19 @@ class MLEngineer(Role):
Message(content=rsp, role="assistant", cause_by=WritePlan)
)
# TODO: precheck plan before asking reviews
# precheck plan before asking reviews
is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan)
if not is_plan_valid and max_retries > 0:
error_msg = f"The generated plan is not valid with error: {error}, try regenerating, remember to generate either the whole plan or the single changed task only"
logger.warning(error_msg)
self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan))
max_retries -= 1
continue
_, plan_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
tasks = WritePlan.rsp_to_tasks(rsp)
if len(tasks) == 1 and self.plan.has_task_id(tasks[0].task_id):
self.plan.replace_task(tasks[0])
else:
self.plan.add_tasks(tasks)
update_plan_from_rsp(rsp, self.plan)
self.working_memory.clear()
async def _reflect(self):
@ -181,6 +196,7 @@ class MLEngineer(Role):
# print("*" * 10)
reflection = await Reflect().run(context=context)
self.working_memory.add(Message(content=reflection, role="assistant"))
self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user"))
def get_useful_memories(self) -> List[Message]:
"""find useful memories only to reduce context length and improve performance"""

View file

@ -149,10 +149,7 @@ class Plan(BaseModel):
self.tasks = final_tasks
# Update current_task_id to the first unfinished task in the merged list
for task in self.tasks:
if not task.is_finished:
self.current_task_id = task.task_id
break
self._update_current_task()
# Update the task map for quick access to tasks by ID
self.task_map = {task.task_id: task for task in self.tasks}
@ -196,8 +193,36 @@ class Plan(BaseModel):
if new_task.task_id in task.dependent_task_ids:
self.reset_task(task.task_id)
def append_task(self, new_task: Task):
"""
Append a new task to the end of existing task sequences
Args:
new_task (Task): The new task to be appended to the existing task sequence
Returns:
None
"""
assert not self.has_task_id(new_task.task_id), "Task already in current plan, use replace_task instead"
assert all([self.has_task_id(dep_id) for dep_id in new_task.dependent_task_ids]), \
"New task has unknown dependencies"
# Existing tasks do not depend on the new task, it's fine to put it to the end of the sorted task sequence
self.tasks.append(new_task)
self.task_map[new_task.task_id] = new_task
self._update_current_task()
def has_task_id(self, task_id: str) -> bool:
return task_id in self.task_map
def _update_current_task(self):
current_task_id = ""
for task in self.tasks:
if not task.is_finished:
current_task_id = task.task_id
break
self.current_task_id = current_task_id # all tasks finished
@property
def current_task(self) -> Task:
@ -212,10 +237,8 @@ class Plan(BaseModel):
"""Finish current task, set Task.is_finished=True, set current task to next task
"""
if self.current_task_id:
current_task = self.current_task
current_task.is_finished = True
next_task_index = self.tasks.index(current_task) + 1
self.current_task_id = self.tasks[next_task_index].task_id if next_task_index < len(self.tasks) else None
self.current_task.is_finished = True
self._update_current_task() # set to next task
def get_finished_tasks(self) -> list[Task]:
"""return all finished tasks in correct linearized order

View file

@ -1,13 +1,15 @@
import pytest
from metagpt.actions.write_plan import WritePlan
from metagpt.actions.write_plan import WritePlan, precheck_update_plan_from_rsp, Plan, Task
def test_precheck_update_plan_from_rsp():
plan = Plan(goal="")
plan.add_tasks([Task(task_id="1")])
rsp = '[{"task_id": "2"}]'
success, _ = precheck_update_plan_from_rsp(rsp, plan)
assert success
assert len(plan.tasks) == 1 and plan.tasks[0].task_id == "1" # precheck should not change the original one
@pytest.mark.asyncio
async def test_plan():
p = WritePlan()
task_desc = """Heres some background information on Cyclistic, a bike-sharing company designing a marketing strategy aimed at converting casual riders into annual members: So far, Cyclistics marketing strategy has relied on building general awareness and engaging a wide range of consumers. group. One way to help achieve these goals is the flexibility of its pricing plans: one-way passes, full-day passes, and annual memberships. Customers who purchase a one-way or full-day pass are known as recreational riders. Customers purchasing an annual membership are Cyclistic members. I will provide you with a data sheet that records user behavior: '/Users/vicis/Downloads/202103-divvy-tripdata.csv"""
rsp = await p.run(task_desc, role="data analyst")
assert len(rsp.content) > 0
assert rsp.sent_from == "WritePlan"
print(rsp)
invalid_rsp = 'wrong'
success, _ = precheck_update_plan_from_rsp(invalid_rsp, plan)
assert not success

View file

@ -5,6 +5,7 @@
@Author : alexanderwu
@File : test_schema.py
"""
import pytest
from metagpt.schema import AIMessage, Message, SystemMessage, UserMessage
from metagpt.schema import Task, Plan
@ -143,3 +144,43 @@ class TestPlan:
plan.replace_task(new_task) # Task with ID 2 does not exist in plan
assert "1" in plan.task_map
assert "2" not in plan.task_map
def test_append_task_with_valid_dependencies(self):
plan = Plan(goal="Test")
existing_task = [Task(task_id="1")]
plan.add_tasks(existing_task)
new_task = Task(task_id="2", dependent_task_ids=["1"])
plan.append_task(new_task)
assert plan.tasks[-1].task_id == "2"
assert plan.task_map["2"] == new_task
def test_append_task_with_invalid_dependencies(self):
new_task = Task(task_id="2", dependent_task_ids=["3"])
plan = Plan(goal="Test")
with pytest.raises(AssertionError):
plan.append_task(new_task)
def test_append_task_without_dependencies(self):
plan = Plan(goal="Test")
existing_task = [Task(task_id="1")]
plan.add_tasks(existing_task)
new_task = Task(task_id="2")
plan.append_task(new_task)
assert len(plan.tasks) == 2
assert plan.current_task_id == "1"
def test_append_task_updates_current_task(self):
finished_task = Task(task_id="1", is_finished=True)
new_task = Task(task_id="2")
plan = Plan(goal="Test", tasks=[finished_task])
plan.append_task(new_task)
assert plan.current_task_id == "2"
def test_update_current_task(self):
task1 = Task(task_id="1", is_finished=True)
task2 = Task(task_id="2")
plan = Plan(goal="Test", tasks=[task1, task2])
plan._update_current_task()
assert plan.current_task_id == "2"