Merge branch 'general_new' into 'dev'

Planner and Code Interpreter

See merge request agents/data_agents_opt!33
This commit is contained in:
林义章 2024-01-10 04:09:26 +00:00
commit 7603a1a906
17 changed files with 489 additions and 319 deletions

View file

@ -0,0 +1,62 @@
from typing import List
from metagpt.actions import Action
from metagpt.schema import Message, Plan
from metagpt.logs import logger
class ReviewConst:
    """Keywords and prompt snippets shared by the human-review workflow."""

    # Which kind of review is being requested.
    TASK_REVIEW_TRIGGER = "task"
    CODE_REVIEW_TRIGGER = "code"

    # Words the reviewer can type; the first entry of each list is the one
    # advertised in the instruction texts below.
    CONTINUE_WORD = ["confirm", "continue", "c", "yes", "y"]
    CHANGE_WORD = ["change"]
    EXIT_WORD = ["exit"]

    # Shown when the output of a plan task is under review.
    TASK_REVIEW_INSTRUCTION = "".join((
        f"If you want to change, add, delete a task or merge tasks in the plan, say '{CHANGE_WORD[0]} task task_id or current task, ... (things to change)' ",
        f"If you confirm the output from the current task and wish to continue, type: {CONTINUE_WORD[0]}",
    ))

    # Shown when generated code is under review.
    CODE_REVIEW_INSTRUCTION = "".join((
        f"If you want the codes to be rewritten, say '{CHANGE_WORD[0]} ... (your change advice)' ",
        f"If you want to leave it as is, type: {CONTINUE_WORD[0]} or {CONTINUE_WORD[1]}",
    ))

    # Appended to every review prompt.
    EXIT_INSTRUCTION = f"If you want to terminate the process, type: {EXIT_WORD[0]}"
class AskReview(Action):
    """Action that asks a human reviewer, via stdin, to judge the latest output."""

    async def run(
        self, context: List[Message], plan: Plan = None, trigger: str = "task"
    ):
        """Prompt the reviewer and interpret the answer.

        Args:
            context: Recent messages; only the last one is inspected, to name
                the action whose output is being reviewed.
            plan: Plan whose tasks are logged before prompting. May be None,
                in which case the plan overview is skipped.
            trigger: ReviewConst.TASK_REVIEW_TRIGGER selects the task-review
                instruction; any other value selects the code-review one.

        Returns:
            A tuple ``(rsp, confirmed)``: the raw text typed by the reviewer and
            whether it counts as a confirmation.

        Typing an exit word terminates the process through ``exit()``.
        """
        # `plan` is optional in the signature, so do not dereference it blindly.
        if plan is not None:
            logger.info("Current overall plan:")
            logger.info(
                "\n".join(
                    [
                        f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}"
                        for task in plan.tasks
                    ]
                )
            )

        logger.info("most recent context:")
        # An empty context (or a message without cause_by) yields an empty action name.
        latest_action = context[-1].cause_by.__name__ if context and context[-1].cause_by else ""
        review_instruction = (
            ReviewConst.TASK_REVIEW_INSTRUCTION
            if trigger == ReviewConst.TASK_REVIEW_TRIGGER
            else ReviewConst.CODE_REVIEW_INSTRUCTION
        )
        prompt = (
            f"This is a <{trigger}> review. Please review output from {latest_action}\n"
            f"{review_instruction}\n"
            f"{ReviewConst.EXIT_INSTRUCTION}\n"
            "Please type your review below:\n"
        )

        rsp = input(prompt)
        answer = rsp.lower()

        if answer in ReviewConst.EXIT_WORD:
            exit()

        # Confirmation can be one of "confirm", "continue", "c", "yes", "y" exactly, or sentences containing "confirm".
        # One could say "confirm this task, but change the next task to ..."
        confirmed = answer in ReviewConst.CONTINUE_WORD or ReviewConst.CONTINUE_WORD[0] in answer

        return rsp, confirmed

View file

@ -10,62 +10,6 @@ from metagpt.prompts.ml_engineer import (
PRINT_DATA_COLUMNS
)
class ReviewConst:
    """Keywords and prompt snippets shared by the human-review workflow."""

    # Which kind of review is being requested.
    TASK_REVIEW_TRIGGER = "task"
    CODE_REVIEW_TRIGGER = "code"

    # Words the reviewer can type; the first entry of each list is the one
    # advertised in the instruction texts below.
    CONTINUE_WORD = ["confirm", "continue", "c", "yes", "y"]
    CHANGE_WORD = ["change"]
    EXIT_WORD = ["exit"]

    # Shown when the output of a plan task is under review.
    TASK_REVIEW_INSTRUCTION = "".join((
        f"If you want to change, add, delete a task or merge tasks in the plan, say '{CHANGE_WORD[0]} task task_id or current task, ... (things to change)' ",
        f"If you confirm the output from the current task and wish to continue, type: {CONTINUE_WORD[0]}",
    ))

    # Shown when generated code is under review.
    CODE_REVIEW_INSTRUCTION = "".join((
        f"If you want the codes to be rewritten, say '{CHANGE_WORD[0]} ... (your change advice)' ",
        f"If you want to leave it as is, type: {CONTINUE_WORD[0]} or {CONTINUE_WORD[1]}",
    ))

    # Appended to every review prompt.
    EXIT_INSTRUCTION = f"If you want to terminate the process, type: {EXIT_WORD[0]}"
class AskReview(Action):
    """Action that asks a human reviewer, via stdin, to judge the latest output."""

    async def run(
        self, context: List[Message], plan: Plan = None, trigger: str = "task"
    ):
        """Prompt the reviewer and interpret the answer.

        Args:
            context: Recent messages; only the last one is inspected, to name
                the action whose output is being reviewed.
            plan: Plan whose tasks are logged before prompting. May be None,
                in which case the plan overview is skipped.
            trigger: ReviewConst.TASK_REVIEW_TRIGGER selects the task-review
                instruction; any other value selects the code-review one.

        Returns:
            A tuple ``(rsp, confirmed)``: the raw text typed by the reviewer and
            whether it counts as a confirmation.

        Typing an exit word terminates the process through ``exit()``.
        """
        # `plan` is optional in the signature, so do not dereference it blindly.
        if plan is not None:
            logger.info("Current overall plan:")
            logger.info(
                "\n".join(
                    [
                        f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}"
                        for task in plan.tasks
                    ]
                )
            )

        logger.info("most recent context:")
        # An empty context (or a message without cause_by) yields an empty action name.
        latest_action = context[-1].cause_by.__name__ if context and context[-1].cause_by else ""
        review_instruction = (
            ReviewConst.TASK_REVIEW_INSTRUCTION
            if trigger == ReviewConst.TASK_REVIEW_TRIGGER
            else ReviewConst.CODE_REVIEW_INSTRUCTION
        )
        prompt = (
            f"This is a <{trigger}> review. Please review output from {latest_action}\n"
            f"{review_instruction}\n"
            f"{ReviewConst.EXIT_INSTRUCTION}\n"
            "Please type your review below:\n"
        )

        rsp = input(prompt)
        answer = rsp.lower()

        if answer in ReviewConst.EXIT_WORD:
            exit()

        # Confirmation can be one of "confirm", "continue", "c", "yes", "y" exactly, or sentences containing "confirm".
        # One could say "confirm this task, but change the next task to ..."
        confirmed = answer in ReviewConst.CONTINUE_WORD or ReviewConst.CONTINUE_WORD[0] in answer

        return rsp, confirmed
class SummarizeAnalysis(Action):
PROMPT_TEMPLATE = """

View file

@ -277,7 +277,7 @@ class MakeTools(WriteCodeByGenerate):
saved_path.write_text(tool_code, encoding='utf-8')
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def run(self, code: str | List[dict], code_desc: str = None, **kwargs) -> str:
async def run(self, code: Union[str, List[dict]], code_desc: str = None, **kwargs) -> str:
# 拼接code prompt
code_prompt = f"The following code is about {code_desc}, convert it to be a General Function, {code}"
if not self.context:

View file

@ -10,7 +10,7 @@ from copy import deepcopy
import traceback
from metagpt.actions import Action
from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_PROMPT, ASSIGN_TASK_TYPE
from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_PROMPT, ASSIGN_TASK_TYPE_CONFIG
from metagpt.schema import Message, Task, Plan
from metagpt.utils.common import CodeParser, create_func_config
from metagpt.logs import logger
@ -50,7 +50,7 @@ class WritePlan(Action):
[f"Task {task['task_id']}: {task['instruction']}" for task in tasks]
)
prompt = ASSIGN_TASK_TYPE_PROMPT.format(task_list=task_list)
tool_config = create_func_config(ASSIGN_TASK_TYPE)
tool_config = create_func_config(ASSIGN_TASK_TYPE_CONFIG)
rsp = await self.llm.aask_code(prompt, **tool_config)
task_type_list = rsp["task_type"]
for task, task_type in zip(tasks, task_type_list):

1
metagpt/plan/__init__.py Normal file
View file

@ -0,0 +1 @@
from metagpt.plan.planner import Planner

107
metagpt/plan/planner.py Normal file
View file

@ -0,0 +1,107 @@
import json
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.schema import Message, Plan, Task, TaskResult
from metagpt.actions.ask_review import AskReview, ReviewConst
from metagpt.actions.write_plan import WritePlan, update_plan_from_rsp, precheck_update_plan_from_rsp
# Template for the leading context message built by Planner.get_useful_memories.
# It is filled with str.format using the keys user_requirement, context, tasks
# and current_task.
STRUCTURAL_CONTEXT = """
## User Requirement
{user_requirement}
## Context
{context}
## Current Plan
{tasks}
## Current Task
{current_task}
"""
class Planner:
    """Holds a Plan and drives its generation, review and per-task confirmation.

    Attributes:
        plan: The Plan created from ``goal``.
        auto_run: When true, reviews are decided automatically instead of
            asking a human (see ``ask_review``).
        use_tools: Forwarded to WritePlan when (re)generating the plan.
        working_memory: Memory object supplied by the caller; it is cleared
            whenever a task is confirmed or the plan has been updated.
    """

    def __init__(self, goal: str, working_memory: Memory, auto_run: bool = False, use_tools: bool = False):
        self.plan = Plan(goal=goal)
        self.auto_run = auto_run
        self.use_tools = use_tools

        # memory for working on each task, discarded each time a task is done
        self.working_memory = working_memory

    @property
    def current_task(self):
        """The plan's current task (delegates to ``self.plan.current_task``)."""
        return self.plan.current_task

    @property
    def current_task_id(self):
        """The id of the plan's current task (delegates to the plan)."""
        return self.plan.current_task_id

    async def ask_review(self, task_result: TaskResult = None, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER):
        """
        Ask to review the task result, reviewer needs to provide confirmation or request change.
        If human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds;
        if auto mode, then the code run has to succeed for the task to be considered completed.

        Args:
            task_result: Result of the task under review; only consulted in auto mode.
            auto_run: Per-call override. ``None`` means "use self.auto_run";
                an explicit ``False`` forces a human review even when the
                planner itself runs in auto mode.
            trigger: Review kind passed on to AskReview.

        Returns:
            A tuple ``(review, confirmed)``. In auto mode ``review`` is "".
        """
        # `auto_run or self.auto_run` would silently discard an explicit False,
        # so only fall back to the instance setting when nothing was passed.
        auto_run = self.auto_run if auto_run is None else auto_run
        if not auto_run:
            context = self.get_useful_memories()
            review, confirmed = await AskReview().run(context=context[-5:], plan=self.plan, trigger=trigger)
            if not confirmed:
                # keep the feedback so the next generation step can see it
                self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
            return review, confirmed
        confirmed = task_result.is_success if task_result else True
        return "", confirmed

    async def confirm_task(self, task: Task, task_result: TaskResult, review: str):
        """Record ``task_result`` on ``task``, finish it, and re-plan if the review asks for more.

        Args:
            task: The task being confirmed.
            task_result: The result to store on the plan for that task.
            review: Reviewer text; if it contains the confirm word plus further
                content, it is stored in working memory and the plan is updated.
        """
        self.plan.update_task_result(task=task, task_result=task_result)
        self.plan.finish_current_task()
        self.working_memory.clear()

        review_lower = review.lower()
        confirm_word = ReviewConst.CONTINUE_WORD[0]
        # "confirm, ... (more content, such as changing downstream tasks)"
        confirmed_and_more = confirm_word in review_lower and review_lower != confirm_word
        if confirmed_and_more:
            self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
            # The review reaches WritePlan through working memory; update_plan
            # takes no review argument (its first parameter is max_tasks).
            await self.update_plan()

    async def update_plan(self, max_tasks: int = 3, max_retries: int = 3):
        """Generate plan proposals until one is confirmed, then apply it to ``self.plan``.

        Args:
            max_tasks: Forwarded to WritePlan.
            max_retries: How many invalid proposals are regenerated before an
                invalid proposal is passed on to review anyway.
        """
        plan_confirmed = False
        while not plan_confirmed:
            context = self.get_useful_memories()
            rsp = await WritePlan().run(context, max_tasks=max_tasks, use_tools=self.use_tools)
            self.working_memory.add(
                Message(content=rsp, role="assistant", cause_by=WritePlan)
            )

            # precheck plan before asking reviews
            is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan)
            if not is_plan_valid and max_retries > 0:
                error_msg = f"The generated plan is not valid with error: {error}, try regenerating, remember to generate either the whole plan or the single changed task only"
                logger.warning(error_msg)
                self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan))
                max_retries -= 1
                continue

            _, plan_confirmed = await self.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)

        update_plan_from_rsp(rsp=rsp, current_plan=self.plan)

        self.working_memory.clear()

    def get_useful_memories(self, task_exclude_field=None) -> list[Message]:
        """find useful memories only to reduce context length and improve performance

        Args:
            task_exclude_field: Task fields left out of the serialized task
                list; defaults to ``{'code_steps'}``.

        Returns:
            One structured context message followed by the working-memory messages.
        """
        # TODO dataset description , code steps
        if task_exclude_field is None:
            # Shorten the context as we don't need code steps after we get the codes.
            # This doesn't affect current_task below, which should hold the code steps
            task_exclude_field = {'code_steps'}
        user_requirement = self.plan.goal
        context = self.plan.context
        tasks = [task.dict(exclude=task_exclude_field) for task in self.plan.tasks]
        tasks = json.dumps(tasks, indent=4, ensure_ascii=False)
        current_task = self.plan.current_task.json() if self.plan.current_task else {}
        context = STRUCTURAL_CONTEXT.format(
            user_requirement=user_requirement, context=context, tasks=tasks, current_task=current_task
        )
        context_msg = [Message(content=context, role="user")]

        return context_msg + self.working_memory.get()

View file

@ -61,7 +61,7 @@ Please assign a task type to each task in the list below from the given categori
- **other**: Any tasks that do not fit into the previous categories, such as visualization, summarizing findings, etc.
"""
ASSIGN_TASK_TYPE = {
ASSIGN_TASK_TYPE_CONFIG = {
"name": "assign_task_type",
"description": "Assign task type to each task by order.",
"parameters": {
@ -309,14 +309,3 @@ ML_MODULE_MAP = {
"feature_engineering": "metagpt.tools.functions.libs.feature_engineering",
"udf": "metagpt.tools.functions.libs.udf",
}
# Template for a structured context message; formatted with the keys
# user_requirement, data_desc, tasks and current_task.
STRUCTURAL_CONTEXT = """
## User Requirement
{user_requirement}
## Data Description
{data_desc}
## Current Plan
{tasks}
## Current Task
{current_task}
"""

View file

@ -0,0 +1,77 @@
import json
from datetime import datetime
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.ask_review import ReviewConst
from metagpt.actions.write_analysis_code import WriteCodeByGenerate
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message, Task, TaskResult
from metagpt.utils.save_code import save_code_file
class CodeInterpreter(Role):
    """Role in "plan_and_act" mode that writes code for each task and executes it."""

    def __init__(
        self, name="Charlie", profile="CodeInterpreter", goal="", auto_run=False, use_tools=False,
    ):
        super().__init__(name=name, profile=profile, goal=goal)
        self._set_react_mode(react_mode="plan_and_act", auto_run=auto_run, use_tools=use_tools)
        # one executor instance is reused for every task of this role
        self.execute_code = ExecutePyCode()

    @property
    def working_memory(self):
        """Working memory kept on the role context."""
        return self._rc.working_memory

    async def _plan_and_act(self):
        """Run the inherited plan-and-act loop, then save the executor's notebook."""
        rsp = await super()._plan_and_act()

        # save code using datetime.now or keywords related to the goal of your project (plan.goal).
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        save_code_file(name=timestamp, code_context=self.execute_code.nb, file_format="ipynb")

        return rsp

    async def _act_on_task(self, current_task: Task) -> TaskResult:
        """Write and execute code, and wrap the outcome in a TaskResult."""
        code, result, is_success = await self._write_and_exec_code()
        return TaskResult(code=code, result=result, is_success=is_success)

    async def _write_and_exec_code(self, max_retry: int = 3):
        """Generate code and run it, retrying until it succeeds or retries run out.

        Returns the last ``(code, result, success)`` triple.
        """
        attempts = 0
        success = False

        while not success and attempts < max_retry:
            context = self.planner.get_useful_memories()

            logger.info("Write code with pure generation")

            code = await WriteCodeByGenerate().run(
                context=context, plan=self.planner.plan, temperature=0.0
            )
            self.working_memory.add(
                Message(content=code, role="assistant", cause_by=WriteCodeByGenerate)
            )

            result, success = await self.execute_code.run(code)
            print(result)
            self.working_memory.add(
                Message(content=result, role="user", cause_by=ExecutePyCode)
            )

            # code containing "!pip" is never counted as a successful run
            if "!pip" in code:
                success = False

            attempts += 1

            if success or attempts < max_retry:
                continue

            # out of retries without success: ask for a code review
            logger.info("coding failed!")
            review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
            if ReviewConst.CHANGE_WORD[0] in review:
                attempts = 0  # redo the task again with help of human suggestions

        return code, result, success

View file

@ -10,7 +10,8 @@ from metagpt.config import CONFIG
from metagpt.const import WORKSPACE_ROOT
from metagpt.roles import Role
from metagpt.actions import Action, BossRequirement
from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis
from metagpt.actions.ask_review import AskReview
from metagpt.actions.ml_da_action import SummarizeAnalysis
from metagpt.schema import Message, Task, Plan
from metagpt.logs import logger
from metagpt.utils.common import CodeParser

View file

@ -1,135 +1,62 @@
from typing import List
import json
from datetime import datetime
import fire
from metagpt.actions.debug_code import DebugCode
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis, Reflect, ReviewConst, UpdateDataColumns
from metagpt.actions.ask_review import ReviewConst
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools, MakeTools
from metagpt.actions.write_code_steps import WriteCodeSteps
from metagpt.actions.write_plan import WritePlan
from metagpt.actions.write_plan import update_plan_from_rsp, precheck_update_plan_from_rsp
from metagpt.const import DATA_PATH, PROJECT_ROOT
from metagpt.const import PROJECT_ROOT
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.prompts.ml_engineer import STRUCTURAL_CONTEXT
from metagpt.roles import Role
from metagpt.roles.kaggle_manager import DownloadData, SubmitResult
from metagpt.schema import Message, Plan
from metagpt.utils.save_code import save_code_file
from metagpt.utils.recovery_util import save_history, load_history
from metagpt.schema import Message
from metagpt.utils.common import remove_comments
from metagpt.actions.ml_da_action import SummarizeAnalysis, Reflect, UpdateDataColumns
from metagpt.roles.code_interpreter import CodeInterpreter
from metagpt.roles.kaggle_manager import DownloadData, SubmitResult
from metagpt.tools.functions.libs.udf import UDFS_YAML
class MLEngineer(Role):
class MLEngineer(CodeInterpreter):
def __init__(
self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, use_tools=False, use_code_steps=False,
self, name="Mark", profile="MLEngineer", goal="", auto_run=False, use_tools=False, use_code_steps=False,
make_udfs=False, use_udfs=False
):
super().__init__(name=name, profile=profile, goal=goal)
self._set_react_mode(react_mode="plan_and_act")
super().__init__(name=name, profile=profile, goal=goal, auto_run=auto_run, use_tools=use_tools)
self._watch([DownloadData, SubmitResult])
self.plan = Plan(goal=goal)
self.make_udfs = False # user-defined functions
self.use_udfs = False
self.execute_code = ExecutePyCode()
self.auto_run = auto_run
self.use_tools = use_tools
self.use_code_steps = use_code_steps
self.make_udfs = make_udfs # user-defined functions
self.use_udfs = use_udfs
self.data_desc = {}
# memory for working on each task, discarded each time a task is done
self.working_memory = Memory()
async def _plan_and_act(self):
### Actions in a multi-agent multi-turn setting ###
### Actions in a multi-agent multi-turn setting, a new attempt on the data ###
memories = self.get_memories()
if memories:
latest_event = memories[-1].cause_by
if latest_event == DownloadData:
self.plan.context = memories[-1].content
self.planner.plan.context = memories[-1].content
elif latest_event == SubmitResult:
# self reflect on previous plan outcomes and think about how to improve the plan, add to working memory
await self._reflect()
# get feedback for improvement from human, add to working memory
await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
await self.planner.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
### Common Procedure in both single- and multi-agent setting ###
# create initial plan and update until confirmation
await self._update_plan()
### general plan process ###
await super()._plan_and_act()
while self.plan.current_task:
task = self.plan.current_task
logger.info(f"ready to take on task {task}")
# take on current task
code, result, success = await self._write_and_exec_code()
# ask for acceptance; the user may instead refuse and request changes to tasks in the plan
review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
if self.auto_run:
# if human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds;
# if auto mode, then the code run has to succeed for the task to be considered completed
task_result_confirmed = success
if task_result_confirmed:
# tick off this task and record progress
task.code = code
task.result = result
self.plan.finish_current_task()
self.working_memory.clear()
if (self.use_tools and task.task_type not in ['model_train', 'model_evaluate']) or self.use_udfs:
success, new_code = await self._update_data_columns()
if success:
task.code = task.code + "\n\n" + new_code
confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower()
and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)"
if confirmed_and_more:
self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
await self._update_plan(review)
elif "redo" in review:
# Ask the Role to redo this task with help of review feedback,
# useful when the code run is successful but the procedure or result is not what we want
continue
else:
# update plan according to user's feedback and to take on changed tasks
await self._update_plan(review)
completed_plan_memory = self.get_useful_memories() # completed plan as a outcome
self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory
summary = await SummarizeAnalysis().run(self.plan)
### summarize analysis ###
summary = await SummarizeAnalysis().run(self.planner.plan)
rsp = Message(content=summary, cause_by=SummarizeAnalysis)
self._rc.memory.add(rsp)
# save code using datetime.now or keywords related to the goal of your project (plan.goal).
project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb")
return rsp
async def _update_data_columns(self):
    """Run UpdateDataColumns on the plan and, if it asks for an update, execute its code.

    Returns ``(success, code)``; on a successful run the output is stored
    under ``self.data_desc["column_info"]``.
    """
    rsp = await UpdateDataColumns().run(self.plan)
    needs_update = rsp["is_update"]
    code = rsp["code"]
    if not needs_update:
        return False, code

    result, success = await self.execute_code.run(code)
    if success:
        print(result)
        self.data_desc["column_info"] = result
    return success, code
async def _write_and_exec_code(self, max_retry: int = 3):
self.plan.current_task.code_steps = (
await WriteCodeSteps().run(self.plan)
self.planner.current_task.code_steps = (
await WriteCodeSteps().run(self.planner.plan)
if self.use_code_steps
else ""
)
@ -139,46 +66,49 @@ class MLEngineer(Role):
debug_context = []
while not success and counter < max_retry:
context = self.get_useful_memories()
context = self.planner.get_useful_memories()
if counter > 0 and (self.use_tools or self.use_udfs):
logger.warning('We got a bug code, now start to debug...')
code = await DebugCode().run(
plan=self.plan.current_task.instruction,
plan=self.planner.current_task.instruction,
code=code,
runtime_result=self.working_memory.get(),
context=debug_context
)
logger.info(f"new code \n{code}")
cause_by = DebugCode
elif (not self.use_tools and not self.use_udfs) or (
self.plan.current_task.task_type == 'other' and not self.use_udfs):
self.planner.current_task.task_type == 'other' and not self.use_udfs):
logger.info("Write code with pure generation")
# TODO: 添加基于current_task.instruction-code_path的k-v缓存
code = await WriteCodeByGenerate().run(
context=context, plan=self.plan, temperature=0.0
context=context, plan=self.planner.plan, temperature=0.0
)
debug_context = [self.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]]
debug_context = [self.planner.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]]
cause_by = WriteCodeByGenerate
else:
logger.info("Write code with tools")
if self.use_udfs:
# use user-defined function tools.
from metagpt.tools.functions.libs.udf import UDFS_YAML
logger.warning("Writing code with user-defined function tools by WriteCodeWithTools.")
logger.info(f"Local user defined function as following:\
\n{json.dumps(list(UDFS_YAML.keys()), indent=2, ensure_ascii=False)}")
# set task_type to `udf`
self.plan.current_task.task_type = 'udf'
self.planner.current_task.task_type = 'udf'
schema_path = UDFS_YAML
else:
schema_path = PROJECT_ROOT / "metagpt/tools/functions/schemas"
tool_context, code = await WriteCodeWithTools(schema_path=schema_path).run(
context=context,
plan=self.plan,
plan=self.planner.plan,
column_info=self.data_desc.get("column_info", ""),
)
debug_context = tool_context
cause_by = WriteCodeWithTools
self.working_memory.add(
Message(content=code, role="assistant", cause_by=cause_by)
)
@ -200,47 +130,29 @@ class MLEngineer(Role):
if not success and counter >= max_retry:
logger.info("coding failed!")
review, _ = await self._ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
if ReviewConst.CHANGE_WORD[0] in review:
counter = 0 # redo the task again with help of human suggestions
if success:
if (self.use_tools and self.planner.current_task.task_type not in ['model_train', 'model_evaluate']) or self.use_udfs:
update_success, new_code = await self._update_data_columns()
if update_success:
code = code + "\n\n" + new_code
return code, result, success
async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER):
    """Ask for a review unless running automatically.

    Args:
        auto_run: Per-call override. ``None`` means "use self.auto_run"; an
            explicit ``False`` forces a human review.
        trigger: Review kind passed on to AskReview.

    Returns:
        ``(review, confirmed)``; ``("", True)`` when no human is asked.
    """
    # `auto_run or self.auto_run` would discard an explicit False.
    auto_run = self.auto_run if auto_run is None else auto_run
    if not auto_run:
        context = self.get_useful_memories()
        review, confirmed = await AskReview().run(context=context[-5:], plan=self.plan, trigger=trigger)
        if not confirmed:
            # keep the feedback so later steps can see it
            self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
        return review, confirmed
    return "", True
async def _update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3):
    """Generate plan proposals until one is confirmed, then apply it to ``self.plan``.

    ``review`` is accepted but not read inside this method.
    """
    retries_left = max_retries
    while True:
        context = self.get_useful_memories()
        rsp = await WritePlan().run(
            context, max_tasks=max_tasks, use_tools=self.use_tools
        )
        self.working_memory.add(
            Message(content=rsp, role="assistant", cause_by=WritePlan)
        )

        # precheck plan before asking reviews
        is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan)
        if not is_plan_valid and retries_left > 0:
            error_msg = f"The generated plan is not valid with error: {error}, try regenerating, remember to generate either the whole plan or the single changed task only"
            logger.warning(error_msg)
            self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan))
            retries_left -= 1
            continue

        _, plan_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
        if plan_confirmed:
            break

    update_plan_from_rsp(rsp, self.plan)

    self.working_memory.clear()
async def _update_data_columns(self):
    """Run UpdateDataColumns on the planner's plan and, if asked, execute its code.

    Returns ``(success, code)``; on a successful run the output is stored
    under ``self.data_desc["column_info"]``.
    """
    logger.info("Check columns in updated data")
    rsp = await UpdateDataColumns().run(self.planner.plan)
    needs_update = rsp["is_update"]
    code = rsp["code"]
    if not needs_update:
        return False, code

    result, success = await self.execute_code.run(code)
    if success:
        print(result)
        self.data_desc["column_info"] = result
    return success, code
async def _reflect(self):
context = self.get_memories()
@ -249,34 +161,6 @@ class MLEngineer(Role):
reflection = await Reflect().run(context=context)
self.working_memory.add(Message(content=reflection, role="assistant"))
self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user"))
def get_useful_memories(self, task_exclude_field=None) -> List[Message]:
    """Build a compact context: one structured message plus the working memories."""
    # TODO dataset description , code steps
    if task_exclude_field is None:
        # Code steps are not needed once the code exists, so drop them from the
        # task list. The current task below is serialized in full.
        task_exclude_field = {'code_steps'}

    serialized_tasks = json.dumps(
        [task.dict(exclude=task_exclude_field) for task in self.plan.tasks],
        indent=4,
        ensure_ascii=False,
    )
    if self.plan.current_task:
        current_task = self.plan.current_task.json()
    else:
        current_task = {}

    structured = STRUCTURAL_CONTEXT.format(
        user_requirement=self.plan.goal,
        data_desc=self.plan.context,
        tasks=serialized_tasks,
        current_task=current_task,
    )
    return [Message(content=structured, role="user")] + self.get_working_memories()
def get_working_memories(self) -> List[Message]:
    """Return whatever ``self.working_memory.get()`` yields."""
    memories = self.working_memory.get()
    return memories
def reset(self):
    """Restart role with the same goal: fresh plan, executor and working memory."""
    goal = self.plan.goal
    self.plan = Plan(goal=goal)
    self.execute_code = ExecutePyCode()
    self.working_memory = Memory()
async def make_tools(self, code: str):
"""Make user-defined functions(udfs, aka tools) for pure generation code.
@ -284,17 +168,17 @@ class MLEngineer(Role):
Args:
code (str): pure generation code by class WriteCodeByGenerate.
"""
logger.warning(f"Making tools for task_id {self.plan.current_task_id}: \
`{self.plan.current_task.instruction}` \n code: \n {code}")
logger.warning(f"Making tools for task_id {self.planner.current_task_id}: \
`{self.planner.current_task.instruction}` \n code: \n {code}")
make_tools = MakeTools()
make_tool_retries, make_tool_current_retry = 3, 0
while True:
# start make tools
tool_code = await make_tools.run(code, self.plan.current_task.instruction)
tool_code = await make_tools.run(code, self.planner.current_task.instruction)
make_tool_current_retry += 1
# check tool_code by execute_code
logger.info(f"Checking task_id {self.plan.current_task_id} tool code by executor...")
logger.info(f"Checking task_id {self.planner.current_task_id} tool code by executor...")
execute_result, execute_success = await self.execute_code.run(tool_code)
if not execute_success:
logger.error(f"Tool code faild to execute, \n{execute_result}\n.We will try to fix it ...")
@ -302,60 +186,9 @@ class MLEngineer(Role):
if execute_success or make_tool_current_retry >= make_tool_retries:
if make_tool_current_retry >= make_tool_retries:
logger.error(f"We have tried the maximum number of attempts {make_tool_retries}\
and still have not created tools for task_id {self.plan.current_task_id} successfully,\
and still have not created tools for task_id {self.planner.current_task_id} successfully,\
we will skip it.")
break
# save successful tool code in udf
if execute_success:
make_tools.save(tool_code)
if __name__ == "__main__":
requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy."
data_path = f"{DATA_PATH}/titanic"
requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."
requirement = f"Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy"
data_path = f"{DATA_PATH}/icr-identify-age-related-conditions"
requirement = f"This is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions.The target column is Class. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report f1 score on the eval data. Train data path: {data_path}/split_train.csv, eval data path: {data_path}/split_eval.csv."
# data_path = f"{DATA_PATH}/santander-customer-transaction-prediction"
# requirement = f"This is a customers financial dataset. Your goal is to predict which customers will make a specific transaction in the future. The target column is target. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report AUC Score on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv' ."
data_path = f"{DATA_PATH}/house-prices-advanced-regression-techniques"
requirement = f"This is a house price dataset, your goal is to predict the sale price of a property based on its features. The target column is SalePrice. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report RMSE between the logarithm of the predicted value and the logarithm of the observed sales price on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."
save_dir = ""
# save_dir = DATA_PATH / "output" / "2023-12-14_20-40-34"
async def main(requirement: str = requirement, auto_run: bool = True, use_tools: bool = False, use_code_steps: bool = False, save_dir: str = ""):
    """Create an MLEngineer, optionally restore saved state, and run it.

    Args:
        requirement (str): Goal given to the MLEngineer and passed to ``run``.
        auto_run (bool): Forwarded to the MLEngineer constructor.
        use_tools (bool): Forwarded to the MLEngineer constructor.
        use_code_steps (bool): Forwarded to the MLEngineer constructor.
        save_dir (str): If non-empty, history is loaded from here before running;
            it is also passed to ``save_history`` when the run fails.

    Any Exception raised by ``role.run`` is caught: the history is saved and
    the error is logged; it is not re-raised.
    """
    resuming = bool(save_dir)
    if resuming:
        logger.info("Resuming from history trajectory")
        plan, nb = load_history(save_dir)
    else:
        logger.info("Run from scratch")

    role = MLEngineer(goal=requirement, auto_run=auto_run, use_tools=use_tools, use_code_steps=use_code_steps)
    if resuming:
        # replace the fresh plan and executor with the restored ones
        role.plan = Plan(**plan)
        role.execute_code = ExecutePyCode(nb)

    try:
        await role.run(requirement)
    except Exception as e:
        save_path = save_history(role, save_dir)
        logger.exception(f"An error occurred: {e}, save trajectory here: {save_path}")
fire.Fire(main)

View file

@ -10,7 +10,7 @@ from metagpt.schema import Message
from metagpt.memory import Memory
from metagpt.logs import logger
from metagpt.actions.write_analysis_code import WriteCodeByGenerate
from metagpt.actions.ml_da_action import AskReview, ReviewConst
from metagpt.actions.ask_review import AskReview, ReviewConst
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.roles.kaggle_manager import DownloadData
from metagpt.utils.save_code import save_code_file

View file

@ -18,7 +18,8 @@ from metagpt.actions import Action, ActionOutput
from metagpt.llm import LLM, HumanProvider
from metagpt.logs import logger
from metagpt.memory import Memory, LongTermMemory
from metagpt.schema import Message
from metagpt.schema import Message, Task, TaskResult
from metagpt.plan.planner import Planner
PREFIX_TEMPLATE = """You are a {profile}, named {name}, your goal is {goal}, and the constraint is {constraints}. """
@ -79,6 +80,7 @@ class RoleContext(BaseModel):
env: 'Environment' = Field(default=None)
memory: Memory = Field(default_factory=Memory)
long_term_memory: LongTermMemory = Field(default_factory=LongTermMemory)
working_memory: Memory = Field(default_factory=Memory)
state: int = Field(default=-1) # -1 indicates initial or termination state where todo is None
todo: Action = Field(default=None)
watch: set[Type[Action]] = Field(default_factory=set)
@ -115,6 +117,7 @@ class Role:
self._actions = []
self._role_id = str(self._setting)
self._rc = RoleContext()
self.planner = None
def _reset(self):
self._states = []
@ -134,7 +137,7 @@ class Role:
self._actions.append(i)
self._states.append(f"{idx}. {action}")
def _set_react_mode(self, react_mode: str, max_react_loop: int = 1):
def _set_react_mode(self, react_mode: str, max_react_loop: int = 1, auto_run: bool = True, use_tools: bool = False):
"""Set strategy of the Role reacting to observed Message. Variation lies in how
this Role elects action to perform during the _think stage, especially if it is capable of multiple Actions.
@ -154,6 +157,8 @@ class Role:
self._rc.react_mode = react_mode
if react_mode == RoleReactMode.REACT:
self._rc.max_react_loop = max_react_loop
elif react_mode == RoleReactMode.PLAN_AND_ACT:
self.planner = Planner(goal=self._setting.goal, working_memory=self._rc.working_memory, auto_run=auto_run, use_tools=use_tools)
def _watch(self, actions: Iterable[Type[Action]]):
"""Listen to the corresponding behaviors"""
@ -274,8 +279,56 @@ class Role:
async def _plan_and_act(self) -> Message:
    """First plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ...

    The plan is produced dynamically through ``self.planner``. Each current
    task is handed to ``_act_on_task`` and its result is put up for review:

    * result confirmed -> ``planner.confirm_task`` is called for the task;
    * review text contains "redo" -> the loop repeats without touching the plan,
      so the same current task is attempted again;
    * anything else -> ``planner.update_plan`` is called with the review.

    Returns:
        Message: the first entry of ``planner.get_useful_memories()`` once the
        planner reports no current task; it is also added to ``self._rc.memory``.
    """
    # Common procedure in both single- and multi-agent settings.
    # Create the initial plan and update it until confirmation.
    await self.planner.update_plan()

    while self.planner.current_task:
        task = self.planner.current_task
        logger.info(f"ready to take on task {task}")

        # take on current task
        task_result = await self._act_on_task(task)

        # ask for acceptance; the user can either accept the result, or refuse it
        # and change tasks in the plan
        review, task_result_confirmed = await self.planner.ask_review(task_result)

        if task_result_confirmed:
            # tick off this task and record progress
            await self.planner.confirm_task(task, task_result, review)
        elif "redo" in review:
            # Ask the Role to redo this task with help of review feedback,
            # useful when the code run is successful but the procedure or result is not what we want
            continue
        else:
            # update plan according to user's feedback and take on the changed tasks
            await self.planner.update_plan(review)

    completed_plan_memory = self.planner.get_useful_memories()  # completed plan as an outcome
    rsp = completed_plan_memory[0]
    self._rc.memory.add(rsp)  # add to persistent memory
    return rsp
async def _act_on_task(self, current_task: Task) -> TaskResult:
    """Take the specific action(s) needed to handle one task of the plan.

    This base implementation is only a hook: it always raises.

    Args:
        current_task (Task): current task to take on

    Raises:
        NotImplementedError: Specific Role must implement this method if expected to use planner

    Returns:
        TaskResult: Result from the actions
    """
    raise NotImplementedError
async def react(self) -> Message:
"""Entry to one of three strategies by which Role reacts to the observed Message"""

View file

@ -81,9 +81,18 @@ class Task(BaseModel):
code_steps: str = ""
code: str = ""
result: str = ""
is_success: bool = False
is_finished: bool = False
class TaskResult(BaseModel):
    """Result of taking a task, with result and is_success required to be filled"""

    # Optional; defaults to an empty string.
    code_steps: str = ""
    # Optional; defaults to an empty string.
    code: str = ""
    # Required: no default is declared.
    result: str
    # Required: no default is declared.
    is_success: bool
class Plan(BaseModel):
goal: str
context: str = ""
@ -169,6 +178,7 @@ class Plan(BaseModel):
task = self.task_map[task_id]
task.code = ""
task.result = ""
task.is_success = False
task.is_finished = False
def replace_task(self, new_task: Task):
@ -181,18 +191,18 @@ class Plan(BaseModel):
Returns:
None
"""
if new_task.task_id in self.task_map:
# Replace the task in the task map and the task list
self.task_map[new_task.task_id] = new_task
for i, task in enumerate(self.tasks):
if task.task_id == new_task.task_id:
self.tasks[i] = new_task
break
assert new_task.task_id in self.task_map
# Replace the task in the task map and the task list
self.task_map[new_task.task_id] = new_task
for i, task in enumerate(self.tasks):
if task.task_id == new_task.task_id:
self.tasks[i] = new_task
break
# Reset dependent tasks
for task in self.tasks:
if new_task.task_id in task.dependent_task_ids:
self.reset_task(task.task_id)
# Reset dependent tasks
for task in self.tasks:
if new_task.task_id in task.dependent_task_ids:
self.reset_task(task.task_id)
def append_task(self, new_task: Task):
"""
@ -213,6 +223,12 @@ class Plan(BaseModel):
self.tasks.append(new_task)
self.task_map[new_task.task_id] = new_task
self._update_current_task()
def update_task_result(self, task: Task, task_result: TaskResult):
    """Copy the outcome fields of ``task_result`` onto ``task``.

    The fields ``code_steps``, ``code``, ``result`` and ``is_success`` are
    overwritten on ``task``, in that order; nothing else on ``task`` is touched.

    Args:
        task (Task): the task to update in place.
        task_result (TaskResult): the result whose fields are copied.
    """
    for field_name in ("code_steps", "code", "result", "is_success"):
        setattr(task, field_name, getattr(task_result, field_name))
def has_task_id(self, task_id: str) -> bool:
    """Tell whether ``task_id`` is a key of this plan's ``task_map``."""
    known_tasks = self.task_map
    return task_id in known_tasks

View file

@ -46,7 +46,7 @@ def save_history(role: Role, save_dir: str = ""):
# overwrite exist trajectory
save_path.mkdir(parents=True, exist_ok=True)
plan = role.plan.dict()
plan = role.planner.plan.dict()
with open(save_path / "plan.json", "w", encoding="utf-8") as plan_file:
json.dump(plan, plan_file, indent=4, ensure_ascii=False)

View file

@ -0,0 +1,80 @@
import fire
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.const import DATA_PATH
from metagpt.logs import logger
from metagpt.roles.code_interpreter import CodeInterpreter
from metagpt.roles.ml_engineer import MLEngineer
from metagpt.schema import Plan
from metagpt.utils.recovery_util import save_history, load_history
async def run_code_interpreter(role_class, requirement, auto_run, use_tools, use_code_steps, make_udfs, use_udfs, save_dir):
    """Build the requested role, optionally restore a saved trajectory, then run it.

    Args:
        role_class (str): "ci" builds a CodeInterpreter; any other value builds an MLEngineer.
        requirement (str): Used as the role's goal and passed to ``role.run``.
        auto_run (bool): Forwarded to the role constructor.
        use_tools (bool): Forwarded to the role constructor.
        use_code_steps (bool): Forwarded to the MLEngineer constructor only.
        make_udfs (bool): Forwarded to the MLEngineer constructor only.
        use_udfs (bool): Forwarded to the MLEngineer constructor only.
        save_dir (str): When non-empty, the plan and notebook loaded from it replace the
            role's plan and code executor before running. It is also passed to
            ``save_history`` if the run fails.

    Note:
        An exception raised by ``role.run`` is not propagated: the history is saved and
        the error is logged with its traceback.
    """
    wants_code_interpreter = role_class == "ci"
    if wants_code_interpreter:
        role = CodeInterpreter(goal=requirement, auto_run=auto_run, use_tools=use_tools)
    else:
        mle_options = dict(
            goal=requirement,
            auto_run=auto_run,
            use_tools=use_tools,
            use_code_steps=use_code_steps,
            make_udfs=make_udfs,
            use_udfs=use_udfs,
        )
        role = MLEngineer(**mle_options)

    if not save_dir:
        logger.info("Run from scratch")
    else:
        logger.info("Resuming from history trajectory")
        saved_plan, notebook = load_history(save_dir)
        role.planner.plan = Plan(**saved_plan)
        role.execute_code = ExecutePyCode(notebook)

    try:
        await role.run(requirement)
    except Exception as e:
        # Persist whatever progress exists before reporting the failure.
        save_path = save_history(role, save_dir)
        logger.exception(f"An error occurred: {e}, save trajectory here: {save_path}")
if __name__ == "__main__":
    # Default task. The commented-out alternatives below can be swapped in by hand.
    requirement = "Run data analysis on sklearn Iris dataset, include a plot"
    # requirement = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy"

    # data_path = f"{DATA_PATH}/titanic"
    # requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."

    # data_path = f"{DATA_PATH}/icr-identify-age-related-conditions"
    # requirement = f"This is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions.The target column is Class. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report f1 score on the eval data. Train data path: {data_path}/split_train.csv, eval data path: {data_path}/split_eval.csv."

    # data_path = f"{DATA_PATH}/santander-customer-transaction-prediction"
    # requirement = f"This is a customers financial dataset. Your goal is to predict which customers will make a specific transaction in the future. The target column is target. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report AUC Score on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv' ."

    # data_path = f"{DATA_PATH}/house-prices-advanced-regression-techniques"
    # requirement = f"This is a house price dataset, your goal is to predict the sale price of a property based on its features. The target column is SalePrice. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report RMSE between the logarithm of the predicted value and the logarithm of the observed sales price on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."

    # Which role to build: "ci" or "mle".
    # role_class = "ci"
    role_class = "mle"

    # Defaults for the command-line flags exposed through `main` below.
    auto_run = True
    use_tools = True
    make_udfs = False
    use_udfs = False
    save_dir = ""

    # Command-line entry point: every parameter becomes a Fire flag whose default
    # is the value set above; the values are handed to run_code_interpreter as-is.
    async def main(
        role_class: str = role_class,
        requirement: str = requirement,
        auto_run: bool = auto_run,
        use_tools: bool = use_tools,
        use_code_steps: bool = False,
        make_udfs: bool = make_udfs,
        use_udfs: bool = use_udfs,
        save_dir: str = save_dir,
    ):
        await run_code_interpreter(
            role_class=role_class,
            requirement=requirement,
            auto_run=auto_run,
            use_tools=use_tools,
            use_code_steps=use_code_steps,
            make_udfs=make_udfs,
            use_udfs=use_udfs,
            save_dir=save_dir,
        )

    fire.Fire(main)

View file

@ -2,8 +2,14 @@ import pytest
from tqdm import tqdm
from metagpt.logs import logger
from metagpt.roles.ml_engineer import MLEngineer
from metagpt.schema import Plan
from metagpt.roles.ml_engineer import MLEngineer, ExecutePyCode
def reset(role):
    """Return ``role`` to a fresh state while keeping its current goal.

    Clears the role's working memory, swaps the planner's plan for a new
    ``Plan`` carrying the same goal, and installs a new ``ExecutePyCode``.
    """
    role.working_memory.clear()
    current_goal = role.planner.plan.goal
    role.planner.plan = Plan(goal=current_goal)
    role.execute_code = ExecutePyCode()
async def make_use_tools(requirement: str, auto_run: bool = True):
"""make and use tools for requirement."""
@ -15,7 +21,7 @@ async def make_use_tools(requirement: str, auto_run: bool = True):
role.use_udfs = False
await role.run(requirement)
# use udfs
role.reset()
reset(role)
role.make_udfs = False
role.use_udfs = True
role.use_code_steps = False

View file

@ -141,7 +141,8 @@ class TestPlan:
task = Task(task_id="1", instruction="First Task")
plan.add_tasks([task])
new_task = Task(task_id="2", instruction="New Task")
plan.replace_task(new_task) # Task with ID 2 does not exist in plan
with pytest.raises(AssertionError):
plan.replace_task(new_task) # Task with ID 2 does not exist in plan
assert "1" in plan.task_map
assert "2" not in plan.task_map