From 3d18dfe2b582f16cf08f6b4e23eea56e85ee1c59 Mon Sep 17 00:00:00 2001 From: yzlin Date: Thu, 23 Nov 2023 21:59:25 +0800 Subject: [PATCH] pipeline first version --- metagpt/actions/execute_code.py | 9 +- metagpt/actions/write_code_function.py | 22 +++-- metagpt/actions/write_plan.py | 46 ++++++++--- metagpt/prompts/plan.py | 8 -- metagpt/roles/ml_engineer.py | 110 +++++++++++++++++++++++++ metagpt/schema.py | 109 ++++++++++++++++++++++++ requirements.txt | 6 +- tests/metagpt/test_schema.py | 85 +++++++++++++++++++ 8 files changed, 362 insertions(+), 33 deletions(-) delete mode 100644 metagpt/prompts/plan.py create mode 100644 metagpt/roles/ml_engineer.py diff --git a/metagpt/actions/execute_code.py b/metagpt/actions/execute_code.py index e80886c3e..7b16d559a 100644 --- a/metagpt/actions/execute_code.py +++ b/metagpt/actions/execute_code.py @@ -7,6 +7,7 @@ from abc import ABC, abstractmethod from pathlib import Path from typing import Dict, List, Tuple, Union +import traceback import nbformat from nbclient import NotebookClient @@ -152,7 +153,7 @@ class ExecutePyCode(ExecuteCode, Action): return code, language - async def run(self, code: Union[str, Dict, Message], language: str = "python") -> Message: + async def run(self, code: Union[str, Dict, Message], language: str = "python") -> Tuple[str, bool]: code, language = self._process_code(code, language) self._display(code, language) @@ -167,13 +168,11 @@ class ExecutePyCode(ExecuteCode, Action): # TODO: add max_tries for run code. cell_index = len(self.nb.cells) - 1 await self.nb_client.async_execute_cell(self.nb.cells[-1], cell_index) - return Message( - self.parse_outputs(self.nb.cells[-1].outputs), state="done", sent_from=self.__class__.__name__ - ) + return self.parse_outputs(self.nb.cells[-1].outputs), True except Exception as e: # FIXME: CellExecutionError is hard to read. for example `1\0` raise ZeroDivisionError: # CellExecutionError('An error occurred while executing the following cell:\n------------------\nz=1/0\n------------------\n\n\n\x1b[0;31m---------------------------------------------------------------------------\x1b[0m\n\x1b[0;31mZeroDivisionError\x1b[0m Traceback (most recent call last)\nCell \x1b[0;32mIn[1], line 1\x1b[0m\n\x1b[0;32m----> 1\x1b[0m z\x1b[38;5;241m=\x1b[39m\x1b[38;5;241;43m1\x1b[39;49m\x1b[38;5;241;43m/\x1b[39;49m\x1b[38;5;241;43m0\x1b[39;49m\n\n\x1b[0;31mZeroDivisionError\x1b[0m: division by zero\n') - return Message(e, state="error", sent_from=self.__class__.__name__) + return traceback.format_exc(), False else: # TODO: markdown raise NotImplementedError(f"Not support this code type : {language}, Only support code!") diff --git a/metagpt/actions/write_code_function.py b/metagpt/actions/write_code_function.py index 6fb7f535e..4ec565eb1 100644 --- a/metagpt/actions/write_code_function.py +++ b/metagpt/actions/write_code_function.py @@ -7,10 +7,20 @@ from typing import Dict, List, Union from metagpt.actions import Action -from metagpt.schema import Message +from metagpt.schema import Message, Plan +class BaseWriteAnalysisCode(Action): -class WriteCodeFunction(Action): + async def run(self, context: List[Message], plan: Plan = None, task_guidance: str = ""): + """Run of a code writing action, used in data analysis or modeling + + Args: + context (List[Message]): Action output history, source action denoted by Message.cause_by + plan (Plan, optional): Overall plan. Defaults to None. + task_guidance (str, optional): suggested step breakdown for the current task. Defaults to "". + """ + +class WriteCodeFunction(BaseWriteAnalysisCode): """Use openai function to generate code.""" def __init__(self, name: str = "", context=None, llm=None) -> str: @@ -44,8 +54,8 @@ class WriteCodeFunction(Action): return prompt async def run( - self, prompt: Union[str, List[Dict], Message, List[Message]], system_msg: str = None, **kwargs - ) -> Message: - prompt = self.process_msg(prompt, system_msg) + self, context: [List[Message]], plan: Plan = None, task_guidance: str = "", system_msg: str = None, **kwargs + ) -> str: + prompt = self.process_msg(context, system_msg) code_content = await self.llm.aask_code(prompt, **kwargs) - return Message(content=code_content, role="assistant") + return code_content diff --git a/metagpt/actions/write_plan.py b/metagpt/actions/write_plan.py index 96d15cb84..48cb1aad5 100644 --- a/metagpt/actions/write_plan.py +++ b/metagpt/actions/write_plan.py @@ -4,21 +4,41 @@ @Author : orange-crow @File : plan.py """ -from typing import Union +from typing import List +import json from metagpt.actions import Action -from metagpt.prompts.plan import TASK_PLAN_SYSTEM_MSG -from metagpt.schema import Message -from metagpt.utils.common import CodeParser - +from metagpt.schema import Message, Task class WritePlan(Action): - def __init__(self, llm=None): - super().__init__("", None, llm) + PROMPT_TEMPLATE = """ + # Context: + __context__ + # Current Plan: + __current_plan__ + # Task: + Based on the context, write a plan or modify an existing plan of what you should do to achieve the goal. A plan consists of one to __max_tasks__ tasks. + If you are modifying an existing plan, carefully follow the instruction, don't make unnecessary changes. + Output a list of jsons following the format: + [ + { + "task_id": str = "unique identifier for a task in plan, can be a ordinal", + "dependent_task_ids": list[str] = "ids of tasks prerequisite to this task", + "instruction": "what you should do in this task, one short phrase or sentence", + }, + ... + ] + """ + async def run(self, context: List[Message], current_plan: str = "", max_tasks: int = 5) -> str: + prompt = ( + self.PROMPT_TEMPLATE.replace("__context__", "\n".join([str(ct) for ct in context])) + .replace("__current_plan__", current_plan).replace("__max_tasks__", str(max_tasks)) + ) + rsp = await self._aask(prompt) + return rsp - async def run(self, prompt: Union[str, Message], role: str = None, system_msg: str = None) -> str: - if role: - system_msg = TASK_PLAN_SYSTEM_MSG.format(role=role) - rsp = self._aask(system_msg + prompt.content) if isinstance(prompt, Message) else await self._aask(system_msg + prompt) - plan = CodeParser.parse_code(None, rsp).split('\n\n') - return Message(plan, role="assistant", sent_from=self.__class__.__name__) + @staticmethod + def rsp_to_tasks(rsp: str) -> List[Task]: + rsp = json.loads(rsp) + tasks = [Task(**task_config) for task_config in rsp] + return tasks diff --git a/metagpt/prompts/plan.py b/metagpt/prompts/plan.py deleted file mode 100644 index 4d3add211..000000000 --- a/metagpt/prompts/plan.py +++ /dev/null @@ -1,8 +0,0 @@ -TASK_PLAN_SYSTEM_MSG = """You are a {role}. Write a plan with single digits steps. make sure others can understand what you are doing. -Example, must start with ```, and end with ```: -``` -1. ...\n\n -2. ...\n\n -... -``` -""" diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py new file mode 100644 index 000000000..c795bda11 --- /dev/null +++ b/metagpt/roles/ml_engineer.py @@ -0,0 +1,110 @@ +from typing import Dict, List, Union +import json +import subprocess + +import fire + +from metagpt.roles import Role +from metagpt.actions import Action +from metagpt.schema import Message, Task, Plan +from metagpt.logs import logger +from metagpt.actions.write_plan import WritePlan +from metagpt.actions.write_code_function import WriteCodeFunction +from metagpt.actions.execute_code import ExecutePyCode + +class AskReview(Action): + + async def run(self, context: List[Message], plan: Plan = None): + prompt = "\n".join( + [f"{msg.cause_by() if msg.cause_by else 'Main Requirement'}: {msg.content}" for msg in context] + ) + + latest_action = context[-1].cause_by() + + prompt += f"\nPlease review output from {latest_action}, " \ + "provide feedback or type YES to continue with the process:\n" + rsp = input(prompt) + confirmed = "yes" in rsp.lower() + return rsp, confirmed + + +class MLEngineer(Role): + def __init__(self, name="ABC", profile="MLEngineer"): + super().__init__(name=name, profile=profile) + self._set_react_mode(react_mode="plan_and_act") + self.plan = Plan() + + async def _plan_and_act(self): + + # create initial plan and update until confirmation + await self._update_plan() + + while self.plan.current_task: + task = self.plan.current_task + logger.info(f"ready to take on task {task}") + + # take on current task + code, result, success = await self._write_and_exec_code() + + # ask for acceptance, users can other refuse and change tasks in the plan + task_result_confirmed = await self._ask_review() + + if success and task_result_confirmed: + # tick off this task and record progress + task.code = code + task.result = result + self.plan.finish_current_task() + + else: + # update plan according to user's feedback and to take on changed tasks + await self._update_plan() + + async def _write_and_exec_code(self, max_retry: int = 3): + counter = 0 + success = False + while not success and counter < max_retry: + context = self.get_memories() + + code = "print('abc')" + # code = await WriteCodeFunction().run(context=context) + # code = await WriteCodeWithOps.run(context, task, result) + self._rc.memory.add(Message(content=code, role="assistant", cause_by=WriteCodeFunction)) + + result, success = await ExecutePyCode().run(code) + self._rc.memory.add(Message(content=result, role="assistant", cause_by=ExecutePyCode)) + + # if not success: + # await self._ask_review() + + counter += 1 + + return code, result, success + + async def _ask_review(self): + context = self.get_memories() + review, confirmed = await AskReview().run(context=context[-5:], plan=self.plan) + self._rc.memory.add(Message(content=review, role="assistant", cause_by=AskReview)) + return confirmed + + async def _update_plan(self, max_tasks: int = 3): + current_plan = str([task.json() for task in self.plan.tasks]) + plan_confirmed = False + while not plan_confirmed: + context = self.get_memories() + rsp = await WritePlan().run(context, current_plan=current_plan, max_tasks=max_tasks) + self._rc.memory.add(Message(content=rsp, role="assistant", cause_by=WritePlan)) + plan_confirmed = await self._ask_review() + + tasks = WritePlan.rsp_to_tasks(rsp) + self.plan.add_tasks(tasks) + + +if __name__ == "__main__": + # requirement = "create a normal distribution and visualize it" + requirement = "run some analysis on iris dataset" + + async def main(requirement: str = requirement): + role = MLEngineer() + await role.run(requirement) + + fire.Fire(main) diff --git a/metagpt/schema.py b/metagpt/schema.py index 4bada005a..3cd7d9730 100644 --- a/metagpt/schema.py +++ b/metagpt/schema.py @@ -73,6 +73,115 @@ class AIMessage(Message): super().__init__(content, 'assistant') +class Task(BaseModel): + task_id: str = "" + dependent_task_ids: list[str] = [] # Tasks prerequisite to this Task + instruction: str = "" + task_type: str = "" + code: str = "" + result: str = "" + is_finished: bool = False + + +class Plan(BaseModel): + tasks: list[Task] = [] + task_map: dict[str, Task] = {} + current_task_id = "" + + def _topological_sort(self, tasks: list[Task]): + task_map = {task.task_id: task for task in tasks} + dependencies = {task.task_id: set(task.dependent_task_ids) for task in tasks} + sorted_tasks = [] + visited = set() + + def visit(task_id): + if task_id in visited: + return + visited.add(task_id) + for dependent_id in dependencies.get(task_id, []): + visit(dependent_id) + sorted_tasks.append(task_map[task_id]) + + for task in tasks: + visit(task.task_id) + + return sorted_tasks + + def add_tasks(self, tasks: list[Task]): + """ + Integrates new tasks into the existing plan, ensuring dependency order is maintained. + + This method performs two primary functions based on the current state of the task list: + 1. If there are no existing tasks, it topologically sorts the provided tasks to ensure + correct execution order based on dependencies, and sets these as the current tasks. + 2. If there are existing tasks, it merges the new tasks with the existing ones. It maintains + any common prefix of tasks (based on task_id and instruction) and appends the remainder + of the new tasks. The current task is updated to the first unfinished task in this merged list. + + Args: + tasks (list[Task]): A list of tasks (may be unordered) to add to the plan. + + Returns: + None: The method updates the internal state of the plan but does not return anything. + """ + if not tasks: + return + + # Topologically sort the new tasks to ensure correct dependency order + new_tasks = self._topological_sort(tasks) + + if not self.tasks: + # If there are no existing tasks, set the new tasks as the current tasks + self.tasks = new_tasks + + else: + # Find the length of the common prefix between existing and new tasks + prefix_length = 0 + for old_task, new_task in zip(self.tasks, new_tasks): + if old_task.task_id != new_task.task_id or old_task.instruction != new_task.instruction: + break + prefix_length += 1 + + # Combine the common prefix with the remainder of the new tasks + final_tasks = self.tasks[:prefix_length] + new_tasks[prefix_length:] + self.tasks = final_tasks + + # Update current_task_id to the first unfinished task in the merged list + for task in self.tasks: + if not task.is_finished: + self.current_task_id = task.task_id + break + + # Update the task map for quick access to tasks by ID + self.task_map = {task.task_id: task for task in self.tasks} + + @property + def current_task(self) -> Task: + """Find current task to execute + + Returns: + Task: the current task to be executed + """ + return self.task_map.get(self.current_task_id, None) + + def finish_current_task(self): + """Finish current task, set Task.is_finished=True, set current task to next task + """ + if self.current_task_id: + current_task = self.current_task + current_task.is_finished = True + next_task_index = self.tasks.index(current_task) + 1 + self.current_task_id = self.tasks[next_task_index].task_id if next_task_index < len(self.tasks) else None + + def get_finished_tasks(self) -> list[Task]: + """return all finished tasks in correct linearized order + + Returns: + list[Task]: list of finished tasks + """ + return [task for task in self.tasks if task.is_finished] + + if __name__ == '__main__': test_content = 'test_message' msgs = [ diff --git a/requirements.txt b/requirements.txt index 53176bd0a..c0f466457 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,4 +45,8 @@ semantic-kernel==0.3.13.dev0 wrapt==1.15.0 websocket-client==0.58.0 zhipuai==1.0.7 -rich==13.6.0 \ No newline at end of file +rich==13.6.0 +nbclient==0.9.0 +nbformat==5.9.2 +ipython==8.17.2 +ipykernel==6.27.0 \ No newline at end of file diff --git a/tests/metagpt/test_schema.py b/tests/metagpt/test_schema.py index 12666e0d3..6aae82006 100644 --- a/tests/metagpt/test_schema.py +++ b/tests/metagpt/test_schema.py @@ -6,6 +6,7 @@ @File : test_schema.py """ from metagpt.schema import AIMessage, Message, SystemMessage, UserMessage +from metagpt.schema import Task, Plan def test_messages(): @@ -19,3 +20,87 @@ def test_messages(): text = str(msgs) roles = ['user', 'system', 'assistant', 'QA'] assert all([i in text for i in roles]) + + +class TestPlan: + def test_add_tasks_ordering(self): + plan = Plan() + + tasks = [ + Task(task_id="1", dependent_task_ids=["2", "3"], instruction="Third"), + Task(task_id="2", instruction="First"), + Task(task_id="3", dependent_task_ids=["2"], instruction="Second") + ] # 2 -> 3 -> 1 + plan.add_tasks(tasks) + + assert [task.task_id for task in plan.tasks] == ["2", "3", "1"] + + def test_add_tasks_to_existing_no_common_prefix(self): + plan = Plan() + + tasks = [ + Task(task_id="1", dependent_task_ids=["2", "3"], instruction="Third"), + Task(task_id="2", instruction="First"), + Task(task_id="3", dependent_task_ids=["2"], instruction="Second", is_finished=True) + ] # 2 -> 3 -> 1 + plan.add_tasks(tasks) + + new_tasks = [Task(task_id="3", instruction="")] + plan.add_tasks(new_tasks) + + assert [task.task_id for task in plan.tasks] == ["3"] + assert not plan.tasks[0].is_finished # must be the new unfinished task + + def test_add_tasks_to_existing_with_common_prefix(self): + plan = Plan() + + tasks = [ + Task(task_id="1", dependent_task_ids=["2", "3"], instruction="Third"), + Task(task_id="2", instruction="First"), + Task(task_id="3", dependent_task_ids=["2"], instruction="Second") + ] # 2 -> 3 -> 1 + plan.add_tasks(tasks) + plan.finish_current_task() # finish 2 + plan.finish_current_task() # finish 3 + + new_tasks = [ + Task(task_id="4", dependent_task_ids=["3"], instruction="Third"), + Task(task_id="2", instruction="First"), + Task(task_id="3", dependent_task_ids=["2"], instruction="Second") + ] # 2 -> 3 -> 4, so the common prefix is 2 -> 3, and these two should be obtained from the existing tasks + plan.add_tasks(new_tasks) + + assert [task.task_id for task in plan.tasks] == ["2", "3", "4"] + assert plan.tasks[0].is_finished and plan.tasks[1].is_finished # "2" and "3" should be the original finished one + assert plan.current_task_id == "4" + + def test_current_task(self): + plan = Plan() + tasks = [ + Task(task_id="1", dependent_task_ids=["2"], instruction="Second"), + Task(task_id="2", instruction="First") + ] + plan.add_tasks(tasks) + assert plan.current_task.task_id == "2" + + def test_finish_task(self): + plan = Plan() + tasks = [ + Task(task_id="1", instruction="First"), + Task(task_id="2", dependent_task_ids=["1"], instruction="Second") + ] + plan.add_tasks(tasks) + plan.finish_current_task() + assert plan.current_task.task_id == "2" + + def test_finished_tasks(self): + plan = Plan() + tasks = [ + Task(task_id="1", instruction="First"), + Task(task_id="2", dependent_task_ids=["1"], instruction="Second") + ] + plan.add_tasks(tasks) + plan.finish_current_task() + finished_tasks = plan.get_finished_tasks() + assert len(finished_tasks) == 1 + assert finished_tasks[0].task_id == "1"