fix task type issue; add TaskResult data type

This commit is contained in:
yzlin 2024-01-09 16:54:36 +08:00
parent 0b6b3a0df6
commit 851ec41380
8 changed files with 49 additions and 44 deletions

View file

@ -10,7 +10,7 @@ from copy import deepcopy
import traceback
from metagpt.actions import Action
from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_PROMPT, ASSIGN_TASK_TYPE
from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_PROMPT, ASSIGN_TASK_TYPE_CONFIG
from metagpt.schema import Message, Task, Plan
from metagpt.utils.common import CodeParser, create_func_config
from metagpt.logs import logger
@ -50,7 +50,7 @@ class WritePlan(Action):
[f"Task {task['task_id']}: {task['instruction']}" for task in tasks]
)
prompt = ASSIGN_TASK_TYPE_PROMPT.format(task_list=task_list)
tool_config = create_func_config(ASSIGN_TASK_TYPE)
tool_config = create_func_config(ASSIGN_TASK_TYPE_CONFIG)
rsp = await self.llm.aask_code(prompt, **tool_config)
task_type_list = rsp["task_type"]
for task, task_type in zip(tasks, task_type_list):

View file

@ -2,7 +2,7 @@ import json
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.schema import Message, Plan, Task
from metagpt.schema import Message, Plan, Task, TaskResult
from metagpt.actions.ask_review import AskReview, ReviewConst
from metagpt.actions.write_plan import WritePlan, update_plan_from_rsp, precheck_update_plan_from_rsp
@ -20,9 +20,10 @@ STRUCTURAL_CONTEXT = """
class Planner:
def __init__(self, goal: str, working_memory: Memory, auto_run: bool = False):
def __init__(self, goal: str, working_memory: Memory, auto_run: bool = False, use_tools: bool = False):
self.plan = Plan(goal=goal)
self.auto_run = auto_run
self.use_tools = use_tools
# memory for working on each task, discarded each time a task is done
self.working_memory = working_memory
@ -35,7 +36,7 @@ class Planner:
def current_task_id(self):
return self.plan.current_task_id
async def ask_review(self, task_to_review: Task = None, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER):
async def ask_review(self, task_result: TaskResult = None, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER):
"""
Ask to review the task result, reviewer needs to provide confirmation or request change.
If human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds;
@ -48,12 +49,11 @@ class Planner:
if not confirmed:
self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
return review, confirmed
confirmed = task_to_review.is_success if task_to_review else True
confirmed = task_result.is_success if task_result else True
return "", confirmed
async def confirm_task(self, task, updated_task, review):
assert updated_task.task_id == task.task_id
self.plan.replace_task(updated_task)
async def confirm_task(self, task: Task, task_result: TaskResult, review: str):
self.plan.update_task_result(task=task, task_result=task_result)
self.plan.finish_current_task()
self.working_memory.clear()
@ -63,13 +63,11 @@ class Planner:
self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
await self.update_plan(review)
async def update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3, **kwargs):
async def update_plan(self, max_tasks: int = 3, max_retries: int = 3):
plan_confirmed = False
while not plan_confirmed:
context = self.get_useful_memories()
rsp = await WritePlan().run(
context, max_tasks=max_tasks, **kwargs
)
rsp = await WritePlan().run(context, max_tasks=max_tasks, use_tools=self.use_tools)
self.working_memory.add(
Message(content=rsp, role="assistant", cause_by=WritePlan)
)
@ -85,10 +83,10 @@ class Planner:
_, plan_confirmed = await self.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
update_plan_from_rsp(rsp, self.plan)
update_plan_from_rsp(rsp=rsp, current_plan=self.plan)
self.working_memory.clear()
def get_useful_memories(self, task_exclude_field=None) -> list[Message]:
"""find useful memories only to reduce context length and improve performance"""
# TODO dataset description , code steps

View file

@ -61,7 +61,7 @@ Please assign a task type to each task in the list below from the given categori
- **other**: Any tasks that do not fit into the previous categories, such as visualization, summarizing findings, etc.
"""
ASSIGN_TASK_TYPE = {
ASSIGN_TASK_TYPE_CONFIG = {
"name": "assign_task_type",
"description": "Assign task type to each task by order.",
"parameters": {

View file

@ -6,16 +6,16 @@ from metagpt.actions.ask_review import ReviewConst
from metagpt.actions.write_analysis_code import WriteCodeByGenerate
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message, Task
from metagpt.schema import Message, Task, TaskResult
from metagpt.utils.save_code import save_code_file
class CodeInterpreter(Role):
def __init__(
self, name="Charlie", profile="CodeInterpreter", goal="", auto_run=False,
self, name="Charlie", profile="CodeInterpreter", goal="", auto_run=False, use_tools=False,
):
super().__init__(name=name, profile=profile, goal=goal)
self._set_react_mode(react_mode="plan_and_act", auto_run=auto_run)
self._set_react_mode(react_mode="plan_and_act", auto_run=auto_run, use_tools=use_tools)
self.execute_code = ExecutePyCode()
@property
@ -32,13 +32,10 @@ class CodeInterpreter(Role):
return rsp
async def _act_on_task(self, current_task) -> Task:
code, result, success = await self._write_and_exec_code()
task_copy_with_result = current_task.copy(
update={"code": code, "result": result, "is_success": success},
deep=True
)
return task_copy_with_result
async def _act_on_task(self, current_task: Task) -> TaskResult:
code, result, is_success = await self._write_and_exec_code()
task_result = TaskResult(code=code, result=result, is_success=is_success)
return task_result
async def _write_and_exec_code(self, max_retry: int = 3):

View file

@ -20,7 +20,7 @@ class MLEngineer(CodeInterpreter):
self, name="Mark", profile="MLEngineer", goal="", auto_run=False, use_tools=False, use_code_steps=False,
make_udfs=False, use_udfs=False
):
super().__init__(name=name, profile=profile, goal=goal, auto_run=auto_run)
super().__init__(name=name, profile=profile, goal=goal, auto_run=auto_run, use_tools=use_tools)
self._watch([DownloadData, SubmitResult])
self.use_tools = use_tools

View file

@ -18,7 +18,7 @@ from metagpt.actions import Action, ActionOutput
from metagpt.llm import LLM, HumanProvider
from metagpt.logs import logger
from metagpt.memory import Memory, LongTermMemory
from metagpt.schema import Message, Task
from metagpt.schema import Message, Task, TaskResult
from metagpt.plan.planner import Planner
PREFIX_TEMPLATE = """You are a {profile}, named {name}, your goal is {goal}, and the constraint is {constraints}. """
@ -137,7 +137,7 @@ class Role:
self._actions.append(i)
self._states.append(f"{idx}. {action}")
def _set_react_mode(self, react_mode: str, max_react_loop: int = 1, auto_run: bool = True):
def _set_react_mode(self, react_mode: str, max_react_loop: int = 1, auto_run: bool = True, use_tools: bool = False):
"""Set strategy of the Role reacting to observed Message. Variation lies in how
this Role elects action to perform during the _think stage, especially if it is capable of multiple Actions.
@ -158,7 +158,7 @@ class Role:
if react_mode == RoleReactMode.REACT:
self._rc.max_react_loop = max_react_loop
elif react_mode == RoleReactMode.PLAN_AND_ACT:
self.planner = Planner(goal=self._setting.goal, working_memory=self._rc.working_memory, auto_run=auto_run)
self.planner = Planner(goal=self._setting.goal, working_memory=self._rc.working_memory, auto_run=auto_run, use_tools=use_tools)
def _watch(self, actions: Iterable[Type[Action]]):
"""Listen to the corresponding behaviors"""
@ -285,18 +285,19 @@ class Role:
await self.planner.update_plan()
while self.planner.current_task:
task = self.planner.current_task
logger.info(f"ready to take on task {task}")
# take on current task
task_copy_with_result = await self._act_on_task(task)
task_result = await self._act_on_task(task)
# ask for acceptance, users can either refuse or change tasks in the plan
review, task_result_confirmed = await self.planner.ask_review(task_copy_with_result)
review, task_result_confirmed = await self.planner.ask_review(task_result)
if task_result_confirmed:
# tick off this task and record progress
await self.planner.confirm_task(task, task_copy_with_result, review)
await self.planner.confirm_task(task, task_result, review)
elif "redo" in review:
# Ask the Role to redo this task with help of review feedback,
@ -315,7 +316,7 @@ class Role:
return rsp
async def _act_on_task(self, current_task: Task) -> Task:
async def _act_on_task(self, current_task: Task) -> TaskResult:
"""Taking specific action to handle one task in plan
Args:
@ -325,7 +326,7 @@ class Role:
NotImplementedError: Specific Role must implement this method if expected to use planner
Returns:
Task: A copy of the current task with result from actions
TaskResult: Result from the actions
"""
raise NotImplementedError

View file

@ -85,6 +85,14 @@ class Task(BaseModel):
is_finished: bool = False
class TaskResult(BaseModel):
"""Result of taking a task, with result and is_success required to be filled"""
code_steps: str = ""
code: str = ""
result: str
is_success: bool
class Plan(BaseModel):
goal: str
context: str = ""
@ -215,6 +223,12 @@ class Plan(BaseModel):
self.tasks.append(new_task)
self.task_map[new_task.task_id] = new_task
self._update_current_task()
def update_task_result(self, task: Task, task_result: TaskResult):
task.code_steps = task_result.code_steps
task.code = task_result.code
task.result = task_result.result
task.is_success = task_result.is_success
def has_task_id(self, task_id: str) -> bool:
return task_id in self.task_map

View file

@ -23,7 +23,7 @@ async def run_code_interpreter(role_class, requirement, auto_run, use_tools, use
"""
if role_class == "ci":
role = CodeInterpreter(goal=requirement, auto_run=auto_run)
role = CodeInterpreter(goal=requirement, auto_run=auto_run, use_tools=use_tools)
else:
role = MLEngineer(
goal=requirement, auto_run=auto_run, use_tools=use_tools, use_code_steps=use_code_steps,
@ -62,17 +62,12 @@ if __name__ == "__main__":
# requirement = f"This is a house price dataset, your goal is to predict the sale price of a property based on its features. The target column is SalePrice. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report RMSE between the logarithm of the predicted value and the logarithm of the observed sales price on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."
save_dir = ""
# save_dir = DATA_PATH / "output" / "2023-12-14_20-40-34"
role_class = "ci"
# role_class = "mle"
# role_class = "ci"
role_class = "mle"
auto_run = True
# auto_run = False
# use_tools = True
use_tools = False
# make_udfs = True
use_tools = True
make_udfs = False
# use_udfs = True
use_udfs = False
async def main(