add supprot of task_type and add output of special command

This commit is contained in:
lidanyang 2024-07-01 10:52:50 +08:00
parent f2cf8dd74b
commit 898ee44bec
5 changed files with 29 additions and 25 deletions

View file

@ -22,6 +22,9 @@ class Task(BaseModel):
{available_commands}
Special Command: Use {{"command_name": "end"}} to do nothing or indicate completion of all requirements and the end of actions.
# Available Task Types
{task_type_desc}
# Current Plan
{plan_status}
@ -38,7 +41,7 @@ Pay close attention to the Example provided, you can reuse the example for your
You may use any of the available commands to create a plan or update the plan. You may output mutiple commands, they will be executed sequentially.
If you finish current task, you will automatically take the next task in the existing plan, use Plan.finish_task, DON'T append a new task.
# Your commands in a json array, in the following output format. If there is nothing to do, use the pass or end command:
# Your commands in a json array, in the following output format with command_name and args. If there is nothing to do, use the pass or end command:
Some text indicating your thoughts, such as how you should update the plan status, respond to inquiry, or seek for help. Then a json array of commands. You must output ONE and ONLY ONE json array. DON'T output multiple json arrays with thoughts between them.
```json
[

View file

@ -24,7 +24,6 @@ class DataAnalyst(RoleZero):
use_reflection: bool = True
write_code: WriteAnalysisCode = Field(default_factory=WriteAnalysisCode, exclude=True)
execute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True)
task_result: TaskResult = None
@model_validator(mode="after")
def set_custom_tool(self):
@ -75,17 +74,14 @@ class DataAnalyst(RoleZero):
### process execution result ###
counter += 1
self.task_result = TaskResult(code=code, result=result, is_success=success)
if success:
task_result = TaskResult(code=code, result=result, is_success=success)
self.planner.current_task.update_task_result(task_result)
output = f"""
Code written:
**Code written**:
{code}
Execution status:{'Success' if success else 'Failed'}
Execution result: {result}
**Execution status**:{'Success' if success else 'Failed'}
**Execution result**: {result}
"""
self.rc.working_memory.clear()
return output
def _finish_current_task(self):
self.planner.current_task.update_task_result(self.task_result)
super()._finish_current_task()

View file

@ -6,6 +6,7 @@ import re
import traceback
from typing import Callable, Literal, Tuple
from metagpt.strategy.task_type import TaskType
from pydantic import model_validator
from metagpt.actions import Action
@ -130,6 +131,7 @@ class RoleZero(Role):
### 2. Plan Status ###
plan_status, current_task = self._get_plan_status()
task_type_desc = "\n".join([f"- **{tt.type_name}**: {tt.value.desc}" for tt in TaskType])
### 3. Tool/Command Info ###
tools = await self.tool_recommender.recommend_tools()
@ -142,6 +144,7 @@ class RoleZero(Role):
example=example,
available_commands=tool_info,
instruction=self.instruction.strip(),
task_type_desc=task_type_desc,
)
memory = self.rc.memory.get(self.memory_k)
if not self.browser.is_empty_page:
@ -201,13 +204,14 @@ class RoleZero(Role):
async def _run_commands(self, commands) -> str:
outputs = []
for cmd in commands:
output = f"Command {cmd['command_name']} executed"
# handle special command first
if await self._run_special_command(cmd):
outputs.append(output)
continue
# run command as specified by tool_execute_map
if cmd["command_name"] in self.tool_execution_map:
tool_obj = self.tool_execution_map[cmd["command_name"]]
output = f"Command {cmd['command_name']} executed"
try:
if inspect.iscoroutinefunction(tool_obj):
tool_output = await tool_obj(**cmd["args"])
@ -235,16 +239,13 @@ class RoleZero(Role):
if cmd["command_name"] == "Plan.finish_current_task" and not self.planner.plan.is_plan_finished():
# task_result = TaskResult(code=str(commands), result=outputs, is_success=is_success)
# self.planner.plan.current_task.update_task_result(task_result=task_result)
self._finish_current_task()
self.planner.plan.finish_current_task()
elif cmd["command_name"] == "end":
self._set_state(-1)
return is_special_cmd
def _finish_current_task(self):
self.planner.plan.finish_current_task()
def _get_plan_status(self) -> Tuple[str, str]:
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
for task in plan_status["tasks"]:

View file

@ -464,7 +464,7 @@ class Task(BaseModel):
self.is_finished = False
def update_task_result(self, task_result: TaskResult):
self.code = task_result.code
self.code = task_result.code + "\n" + task_result.code
self.result = task_result.result
self.is_success = task_result.is_success
@ -669,10 +669,14 @@ class Plan(BaseModel):
"""
return [task for task in self.tasks if task.is_finished]
def append_task(self, task_id: str, dependent_task_ids: list[str], instruction: str, assignee: str):
def append_task(self, task_id: str, dependent_task_ids: list[str], instruction: str, assignee: str, task_type: str):
"""Append a new task with task_id (number) to the end of existing task sequences. If dependent_task_ids is not empty, the task will depend on the tasks with the ids in the list."""
new_task = Task(
task_id=task_id, dependent_task_ids=dependent_task_ids, instruction=instruction, assignee=assignee
task_id=task_id,
dependent_task_ids=dependent_task_ids,
instruction=instruction,
assignee=assignee,
task_type=task_type
)
return self._append_task(new_task)

View file

@ -5,11 +5,11 @@
# @File : __init__.py
# @Desc :
from metagpt.tools.libs import (
# data_preprocess,
# feature_engineering,
data_preprocess,
feature_engineering,
sd_engine,
gpt_v_generator,
# web_scraping,
web_scraping,
# email_login,
terminal,
editor,
@ -20,11 +20,11 @@ from metagpt.tools.libs import (
from metagpt.tools.libs.env import get_env, set_get_env_entry, default_get_env, get_env_description
_ = (
# data_preprocess,
# feature_engineering,
data_preprocess,
feature_engineering,
sd_engine,
gpt_v_generator,
# web_scraping,
web_scraping,
# email_login,
terminal,
editor,