diff --git a/kaggle_team.py b/kaggle_team.py index 50a8f7288..e9f3e67de 100644 --- a/kaggle_team.py +++ b/kaggle_team.py @@ -1,6 +1,5 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import asyncio import fire @@ -8,6 +7,7 @@ from metagpt.roles.kaggle_manager import KaggleManager from metagpt.roles.ml_engineer import MLEngineer from metagpt.team import Team + async def main( # competition: str, # data_desc: str, @@ -21,7 +21,7 @@ async def main( "Training set is train.csv.\nTest set is test.csv. We also include gender_submission.csv, a set of predictions that assume all and only female passengers survive, as an example of what a submission file should look like.", # "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format", # "generate a random prediction, replace the Survived column of gender_submission.csv, and save the prediction to a new submission file", - "Score as high as possible for the provided dataset, save the test prediction to a csv with two columns PassengerId and Survived" + "Score as high as possible for the provided dataset, save the test prediction to a csv with two columns PassengerId and Survived", ) team = Team() @@ -36,5 +36,6 @@ async def main( team.start_project(requirement) await team.run(n_round=n_round) -if __name__ == '__main__': + +if __name__ == "__main__": fire.Fire(main) diff --git a/metagpt/actions/ask_review.py b/metagpt/actions/ask_review.py index eec5e49aa..85ac33bd8 100644 --- a/metagpt/actions/ask_review.py +++ b/metagpt/actions/ask_review.py @@ -1,8 +1,8 @@ from typing import List from metagpt.actions import Action -from metagpt.schema import Message, Plan from metagpt.logs import logger +from metagpt.schema import Message, Plan class ReviewConst: @@ -23,17 +23,10 @@ class ReviewConst: class AskReview(Action): - async def run( - self, context: List[Message], plan: Plan = None, trigger: str = "task" - ): + async def run(self, context: List[Message], plan: Plan = None, trigger: str = "task"): logger.info("Current overall plan:") logger.info( - "\n".join( - [ - f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}" - for task in plan.tasks - ] - ) + "\n".join([f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}" for task in plan.tasks]) ) logger.info("most recent context:") diff --git a/metagpt/actions/debug_code.py b/metagpt/actions/debug_code.py index 3e1705d8e..be09f3493 100644 --- a/metagpt/actions/debug_code.py +++ b/metagpt/actions/debug_code.py @@ -1,9 +1,9 @@ -from typing import Dict, List, Union, Tuple, Optional, Any +from typing import Any, List, Optional -from metagpt.logs import logger -from metagpt.schema import Message, Plan -from metagpt.utils.common import CodeParser, create_func_config from metagpt.actions.write_analysis_code import BaseWriteAnalysisCode +from metagpt.logs import logger +from metagpt.schema import Message +from metagpt.utils.common import create_func_config DEBUG_REFLECTION_EXAMPLE = ''' Example 1: @@ -113,9 +113,7 @@ class DebugCode(BaseWriteAnalysisCode): # msg = messages_to_str(info) # resp = await self.llm.aask(msg=msg) - resp = await self.llm.aask_code( - messages=info, **create_func_config(CODE_REFLECTION) - ) + resp = await self.llm.aask_code(messages=info, **create_func_config(CODE_REFLECTION)) logger.info(f"reflection is {resp}") return resp diff --git a/metagpt/actions/execute_code.py b/metagpt/actions/execute_code.py index d192ca79a..b2f6067ab 100644 --- a/metagpt/actions/execute_code.py +++ b/metagpt/actions/execute_code.py @@ -4,23 +4,23 @@ @Author : orange-crow @File : code_executor.py """ +import re +import traceback from abc import ABC, abstractmethod from pathlib import Path from typing import Dict, List, Tuple, Union -import traceback -import re import nbformat from nbclient import NotebookClient -from nbclient.exceptions import DeadKernelError, CellTimeoutError +from nbclient.exceptions import CellTimeoutError, DeadKernelError from nbformat import NotebookNode from nbformat.v4 import new_code_cell, new_output from rich.console import Console from rich.syntax import Syntax from metagpt.actions import Action -from metagpt.schema import Message from metagpt.logs import logger +from metagpt.schema import Message class ExecuteCode(ABC): @@ -113,7 +113,9 @@ class ExecutePyCode(ExecuteCode, Action): if "image/png" in output["data"]: self.show_bytes_figure(output["data"]["image/png"], self.interaction) else: - logger.info(f"{i}th output['data'] from nbclient outputs dont have image/png, continue next output ...") + logger.info( + f"{i}th output['data'] from nbclient outputs dont have image/png, continue next output ..." + ) elif output["output_type"] == "execute_result": parsed_output += output["data"]["text/plain"] return parsed_output @@ -148,7 +150,7 @@ class ExecutePyCode(ExecuteCode, Action): return False def _process_code(self, code: Union[str, Dict, Message], language: str = None) -> Tuple: - language = language or 'python' + language = language or "python" if isinstance(code, str) and Path(code).suffix in (".py", ".txt"): code = Path(code).read_text(encoding="utf-8") return code, language @@ -158,11 +160,11 @@ class ExecutePyCode(ExecuteCode, Action): if isinstance(code, dict): assert "code" in code if "language" not in code: - code['language'] = 'python' + code["language"] = "python" code, language = code["code"], code["language"] elif isinstance(code, Message): if isinstance(code.content, dict) and "language" not in code.content: - code.content["language"] = 'python' + code.content["language"] = "python" code, language = code.content["code"], code.content["language"] elif isinstance(code.content, str): code, language = code.content, language @@ -181,7 +183,7 @@ class ExecutePyCode(ExecuteCode, Action): except DeadKernelError: await self.reset() return False, "DeadKernelError" - except Exception as e: + except Exception: return False, f"{traceback.format_exc()}" async def run(self, code: Union[str, Dict, Message], language: str = "python") -> Tuple[str, bool]: @@ -224,6 +226,6 @@ def truncate(result: str, keep_len: int = 2000) -> str: def remove_escape_and_color_codes(input_str): # 使用正则表达式去除转义字符和颜色代码 - pattern = re.compile(r'\x1b\[[0-9;]*[mK]') - result = pattern.sub('', input_str) + pattern = re.compile(r"\x1b\[[0-9;]*[mK]") + result = pattern.sub("", input_str) return result diff --git a/metagpt/actions/ml_da_action.py b/metagpt/actions/ml_da_action.py index 50d1d2420..3ab5e0429 100644 --- a/metagpt/actions/ml_da_action.py +++ b/metagpt/actions/ml_da_action.py @@ -1,14 +1,9 @@ import json -from typing import Dict, List, Union from metagpt.actions import Action -from metagpt.schema import Message, Plan -from metagpt.utils.common import CodeParser, remove_comments, create_func_config -from metagpt.logs import logger -from metagpt.prompts.ml_engineer import ( - UPDATE_DATA_COLUMNS, - PRINT_DATA_COLUMNS -) +from metagpt.prompts.ml_engineer import PRINT_DATA_COLUMNS, UPDATE_DATA_COLUMNS +from metagpt.schema import Plan +from metagpt.utils.common import CodeParser, create_func_config, remove_comments class SummarizeAnalysis(Action): diff --git a/metagpt/actions/write_analysis_code.py b/metagpt/actions/write_analysis_code.py index 21add3159..b0c8dab3b 100644 --- a/metagpt/actions/write_analysis_code.py +++ b/metagpt/actions/write_analysis_code.py @@ -4,25 +4,24 @@ @Author : orange-crow @File : write_code_v2.py """ -from typing import Dict, List, Union, Tuple -from tenacity import retry, stop_after_attempt, wait_fixed -from pathlib import Path import re -import json +from pathlib import Path +from typing import Dict, List, Tuple, Union import yaml +from tenacity import retry, stop_after_attempt, wait_fixed from metagpt.actions import Action from metagpt.llm import LLM from metagpt.logs import logger from metagpt.prompts.ml_engineer import ( - TOOL_RECOMMENDATION_PROMPT, - SELECT_FUNCTION_TOOLS, CODE_GENERATOR_WITH_TOOLS, - TOOL_USAGE_PROMPT, - ML_SPECIFIC_PROMPT, - ML_MODULE_MAP, GENERATE_CODE_PROMPT, + ML_MODULE_MAP, + ML_SPECIFIC_PROMPT, + SELECT_FUNCTION_TOOLS, + TOOL_RECOMMENDATION_PROMPT, + TOOL_USAGE_PROMPT, ) from metagpt.schema import Message, Plan from metagpt.utils.common import create_func_config, remove_comments @@ -52,24 +51,16 @@ class BaseWriteAnalysisCode(Action): messages.append(p.content["code"]) # 添加默认的提示词 - if ( - default_system_msg not in messages[0]["content"] - and messages[0]["role"] != "system" - ): + if default_system_msg not in messages[0]["content"] and messages[0]["role"] != "system": messages.insert(0, {"role": "system", "content": default_system_msg}) - elif ( - default_system_msg not in messages[0]["content"] - and messages[0]["role"] == "system" - ): + elif default_system_msg not in messages[0]["content"] and messages[0]["role"] == "system": messages[0] = { "role": "system", "content": messages[0]["content"] + default_system_msg, } return messages - async def run( - self, context: List[Message], plan: Plan = None, code_steps: str = "" - ) -> str: + async def run(self, context: List[Message], plan: Plan = None, code_steps: str = "") -> str: """Run of a code writing action, used in data analysis or modeling Args: @@ -115,7 +106,7 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): def _load_tools(self, schema_path, schema_module=None): """Load tools from yaml file""" if isinstance(schema_path, dict): - schema_module = schema_module or 'udf' + schema_module = schema_module or "udf" self.available_tools.update({schema_module: schema_path}) else: if isinstance(schema_path, list): @@ -197,9 +188,7 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): available_tools = {k: v["description"] for k, v in available_tools.items()} recommend_tools = await self._tool_recommendation( - plan.current_task.instruction, - code_steps, - available_tools + plan.current_task.instruction, code_steps, available_tools ) tool_catalog = self._parse_recommend_tools(task_type, recommend_tools) logger.info(f"Recommended tools: \n{recommend_tools}") @@ -216,8 +205,7 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): module_name=module_name, tool_catalog=tool_catalog, ) - - + else: prompt = GENERATE_CODE_PROMPT.format( user_requirement=plan.goal, @@ -245,7 +233,7 @@ class MakeTools(WriteCodeByGenerate): 5. Only use the imported packages** """ - def __init__(self, name: str = '', context: list[Message] = None, llm: LLM = None, workspace: str = None): + def __init__(self, name: str = "", context: list[Message] = None, llm: LLM = None, workspace: str = None): """ :param str name: name, defaults to '' :param list[Message] context: context, defaults to None @@ -254,12 +242,12 @@ class MakeTools(WriteCodeByGenerate): """ super().__init__(name, context, llm) self.workspace = workspace or str(Path(__file__).parents[1].joinpath("./tools/functions/libs/udf")) - self.file_suffix: str = '.py' + self.file_suffix: str = ".py" self.context = [] def parse_function_name(self, function_code: str) -> str: # 定义正则表达式模式 - pattern = r'\bdef\s+([a-zA-Z_]\w*)\s*\(' + pattern = r"\bdef\s+([a-zA-Z_]\w*)\s*\(" # 在代码中搜索匹配的模式 match = re.search(pattern, function_code) # 如果找到匹配项,则返回匹配的函数名;否则返回None @@ -272,9 +260,9 @@ class MakeTools(WriteCodeByGenerate): func_name = self.parse_function_name(tool_code) if func_name is None: raise ValueError(f"No function name found in {tool_code}") - saved_path = Path(self.workspace).joinpath(func_name+self.file_suffix) + saved_path = Path(self.workspace).joinpath(func_name + self.file_suffix) logger.info(f"Saved tool_code {func_name} in {str(saved_path)}.") - saved_path.write_text(tool_code, encoding='utf-8') + saved_path.write_text(tool_code, encoding="utf-8") @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) async def run(self, code: Union[str, List[dict]], code_desc: str = None, **kwargs) -> str: @@ -287,27 +275,31 @@ class MakeTools(WriteCodeByGenerate): logger.info(f"\n\nAsk to Make tools:\n{'-'*60}\n {self.context[-1]}") # 更新kwargs - if 'code' in kwargs: - kwargs.pop('code') - if 'code_desc' in kwargs: - kwargs.pop('code_desc') + if "code" in kwargs: + kwargs.pop("code") + if "code_desc" in kwargs: + kwargs.pop("code_desc") max_tries, current_try = 3, 0 while True: tool_code = await self.llm.aask_code(self.context, **kwargs) - func_name = self.parse_function_name(tool_code['code']) + func_name = self.parse_function_name(tool_code["code"]) current_try += 1 # make tools failed, add error message to context. if not func_name: logger.info(f"\n\nTools Respond\n{'-'*60}\n: {tool_code}") logger.error(f"No function name found in code, we will retry make tools.\n{tool_code['code']}\n") - self.context.append({'role': 'user', 'content': 'We need a general function in above code,but not found function.'}) + self.context.append( + {"role": "user", "content": "We need a general function in above code,but not found function."} + ) # end make tools if func_name is not None or current_try >= max_tries: if current_try >= max_tries: - logger.error(f"We have tried the maximum number of attempts {max_tries}\ - and still have not created tools successfully, we will skip it.") + logger.error( + f"We have tried the maximum number of attempts {max_tries}\ + and still have not created tools successfully, we will skip it." + ) break logger.info(f"\n\nTools Respond\n{'-'*60}\n: {tool_code}") - self.save(tool_code['code']) + self.save(tool_code["code"]) return tool_code["code"] diff --git a/metagpt/actions/write_code_steps.py b/metagpt/actions/write_code_steps.py index 79f3e5902..7ba22fde4 100644 --- a/metagpt/actions/write_code_steps.py +++ b/metagpt/actions/write_code_steps.py @@ -1,9 +1,7 @@ - import json -from typing import Dict, List, Union from metagpt.actions import Action -from metagpt.schema import Message, Task, Plan +from metagpt.schema import Plan from metagpt.utils.common import CodeParser # CODE_STEPS_PROMPT_TEMPLATE = """ @@ -79,7 +77,6 @@ STRUCTURAL_CONTEXT = """ class WriteCodeSteps(Action): - async def run(self, plan: Plan) -> str: """Run of a task guide writing action, used in ml engineer @@ -91,9 +88,7 @@ class WriteCodeSteps(Action): """ context = self.get_context(plan) - code_steps_prompt = CODE_STEPS_PROMPT_TEMPLATE.replace( - "{context}", context - ) + code_steps_prompt = CODE_STEPS_PROMPT_TEMPLATE.replace("{context}", context) code_steps = await self._aask(code_steps_prompt) code_steps = CodeParser.parse_code(block=None, text=code_steps) return code_steps @@ -102,19 +97,16 @@ class WriteCodeSteps(Action): user_requirement = plan.goal # select_task_keys = ['task_id', 'instruction', 'is_finished', 'code'] # select_task_keys = ['task_id','instruction'] - + def process_task(task): task_dict = task.dict() # ptask = {k: task_dict[k] for k in task_dict if k in select_task_keys } ptask = f"task_id_{task_dict['task_id']}:{task_dict['instruction']}" return ptask - - - tasks = json.dumps( - [process_task(task) for task in plan.tasks], indent=4, ensure_ascii=False - ) - - code_lists = [task.code for task in plan.tasks if task.is_finished==True] + + tasks = json.dumps([process_task(task) for task in plan.tasks], indent=4, ensure_ascii=False) + + code_lists = [task.code for task in plan.tasks if task.is_finished == True] codes = "\n\n".join(code_lists) current_task = json.dumps(process_task(plan.current_task)) if plan.current_task else {} context = STRUCTURAL_CONTEXT.format( diff --git a/metagpt/actions/write_plan.py b/metagpt/actions/write_plan.py index d90138d46..d2553e609 100644 --- a/metagpt/actions/write_plan.py +++ b/metagpt/actions/write_plan.py @@ -4,16 +4,15 @@ @Author : orange-crow @File : plan.py """ -from typing import List, Dict, Tuple import json from copy import deepcopy -import traceback +from typing import Dict, List, Tuple from metagpt.actions import Action -from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_PROMPT, ASSIGN_TASK_TYPE_CONFIG -from metagpt.schema import Message, Task, Plan -from metagpt.utils.common import CodeParser, create_func_config from metagpt.logs import logger +from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_CONFIG, ASSIGN_TASK_TYPE_PROMPT +from metagpt.schema import Message, Plan, Task +from metagpt.utils.common import CodeParser, create_func_config class WritePlan(Action): @@ -46,9 +45,7 @@ class WritePlan(Action): Returns: List[Dict]: tasks with task type assigned """ - task_list = "\n".join( - [f"Task {task['task_id']}: {task['instruction']}" for task in tasks] - ) + task_list = "\n".join([f"Task {task['task_id']}: {task['instruction']}" for task in tasks]) prompt = ASSIGN_TASK_TYPE_PROMPT.format(task_list=task_list) tool_config = create_func_config(ASSIGN_TASK_TYPE_CONFIG) rsp = await self.llm.aask_code(prompt, **tool_config) @@ -57,9 +54,7 @@ class WritePlan(Action): task["task_type"] = task_type return json.dumps(tasks) - async def run( - self, context: List[Message], max_tasks: int = 5, use_tools: bool = False - ) -> str: + async def run(self, context: List[Message], max_tasks: int = 5, use_tools: bool = False) -> str: prompt = ( self.PROMPT_TEMPLATE.replace("__context__", "\n".join([str(ct) for ct in context])) # .replace("__current_plan__", current_plan) @@ -71,11 +66,13 @@ class WritePlan(Action): rsp = await self.assign_task_type(json.loads(rsp)) return rsp + def rsp_to_tasks(rsp: str) -> List[Task]: rsp = json.loads(rsp) tasks = [Task(**task_config) for task_config in rsp] return tasks + def update_plan_from_rsp(rsp: str, current_plan: Plan): tasks = rsp_to_tasks(rsp) if len(tasks) == 1 or tasks[0].dependent_task_ids: @@ -97,6 +94,7 @@ def update_plan_from_rsp(rsp: str, current_plan: Plan): # add tasks in general current_plan.add_tasks(tasks) + def precheck_update_plan_from_rsp(rsp: str, current_plan: Plan) -> Tuple[bool, str]: temp_plan = deepcopy(current_plan) try: diff --git a/metagpt/plan/__init__.py b/metagpt/plan/__init__.py index 5ad35e100..e69de29bb 100644 --- a/metagpt/plan/__init__.py +++ b/metagpt/plan/__init__.py @@ -1 +0,0 @@ -from metagpt.plan.planner import Planner \ No newline at end of file diff --git a/metagpt/plan/planner.py b/metagpt/plan/planner.py index 86b197256..dadc2e563 100644 --- a/metagpt/plan/planner.py +++ b/metagpt/plan/planner.py @@ -1,11 +1,14 @@ import json +from metagpt.actions.ask_review import AskReview, ReviewConst +from metagpt.actions.write_plan import ( + WritePlan, + precheck_update_plan_from_rsp, + update_plan_from_rsp, +) from metagpt.logs import logger from metagpt.memory import Memory from metagpt.schema import Message, Plan, Task, TaskResult -from metagpt.actions.ask_review import AskReview, ReviewConst -from metagpt.actions.write_plan import WritePlan, update_plan_from_rsp, precheck_update_plan_from_rsp - STRUCTURAL_CONTEXT = """ ## User Requirement @@ -27,16 +30,18 @@ class Planner: # memory for working on each task, discarded each time a task is done self.working_memory = working_memory - + @property def current_task(self): return self.plan.current_task - + @property def current_task_id(self): return self.plan.current_task_id - async def ask_review(self, task_result: TaskResult = None, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER): + async def ask_review( + self, task_result: TaskResult = None, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER + ): """ Ask to review the task result, reviewer needs to provide confirmation or request change. If human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds; @@ -51,27 +56,26 @@ class Planner: return review, confirmed confirmed = task_result.is_success if task_result else True return "", confirmed - + async def confirm_task(self, task: Task, task_result: TaskResult, review: str): self.plan.update_task_result(task=task, task_result=task_result) self.plan.finish_current_task() self.working_memory.clear() - - confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower() - and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)" + + confirmed_and_more = ( + ReviewConst.CONTINUE_WORD[0] in review.lower() and review.lower() not in ReviewConst.CONTINUE_WORD[0] + ) # "confirm, ... (more content, such as changing downstream tasks)" if confirmed_and_more: self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) await self.update_plan(review) - + async def update_plan(self, max_tasks: int = 3, max_retries: int = 3): plan_confirmed = False while not plan_confirmed: context = self.get_useful_memories() rsp = await WritePlan().run(context, max_tasks=max_tasks, use_tools=self.use_tools) - self.working_memory.add( - Message(content=rsp, role="assistant", cause_by=WritePlan) - ) - + self.working_memory.add(Message(content=rsp, role="assistant", cause_by=WritePlan)) + # precheck plan before asking reviews is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan) if not is_plan_valid and max_retries > 0: @@ -80,11 +84,11 @@ class Planner: self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan)) max_retries -= 1 continue - + _, plan_confirmed = await self.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + update_plan_from_rsp(rsp=rsp, current_plan=self.plan) - + self.working_memory.clear() def get_useful_memories(self, task_exclude_field=None) -> list[Message]: @@ -93,7 +97,7 @@ class Planner: if task_exclude_field is None: # Shorten the context as we don't need code steps after we get the codes. # This doesn't affect current_task below, which should hold the code steps - task_exclude_field = {'code_steps'} + task_exclude_field = {"code_steps"} user_requirement = self.plan.goal context = self.plan.context tasks = [task.dict(exclude=task_exclude_field) for task in self.plan.tasks] @@ -103,5 +107,5 @@ class Planner: user_requirement=user_requirement, context=context, tasks=tasks, current_task=current_task ) context_msg = [Message(content=context, role="user")] - + return context_msg + self.working_memory.get() diff --git a/metagpt/prompts/ml_engineer.py b/metagpt/prompts/ml_engineer.py index 8fde85d86..9b873d39f 100644 --- a/metagpt/prompts/ml_engineer.py +++ b/metagpt/prompts/ml_engineer.py @@ -259,7 +259,7 @@ for col in num_cols: - Always copy the DataFrame before processing it and use the copy to process. - The output code should contain all steps implemented correctly in 'Code Steps'. """ -#- If 'Code Steps' contains step done in 'Done Tasks', such as reading data, don't repeat it. +# - If 'Code Steps' contains step done in 'Done Tasks', such as reading data, don't repeat it. DATA_PREPROCESS_PROMPT = """ The current task is about data preprocessing, please note the following: diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py index 85362fca9..747e36480 100644 --- a/metagpt/provider/openai_api.py +++ b/metagpt/provider/openai_api.py @@ -22,7 +22,6 @@ from tenacity import ( retry_if_exception_type, stop_after_attempt, wait_random_exponential, - wait_fixed, ) from metagpt.config import CONFIG, Config, LLMProviderEnum diff --git a/metagpt/roles/code_interpreter.py b/metagpt/roles/code_interpreter.py index 437f15698..25890bc93 100644 --- a/metagpt/roles/code_interpreter.py +++ b/metagpt/roles/code_interpreter.py @@ -1,8 +1,7 @@ -import json from datetime import datetime -from metagpt.actions.execute_code import ExecutePyCode from metagpt.actions.ask_review import ReviewConst +from metagpt.actions.execute_code import ExecutePyCode from metagpt.actions.write_analysis_code import WriteCodeByGenerate from metagpt.logs import logger from metagpt.roles import Role @@ -12,7 +11,12 @@ from metagpt.utils.save_code import save_code_file class CodeInterpreter(Role): def __init__( - self, name="Charlie", profile="CodeInterpreter", goal="", auto_run=False, use_tools=False, + self, + name="Charlie", + profile="CodeInterpreter", + goal="", + auto_run=False, + use_tools=False, ): super().__init__(name=name, profile=profile, goal=goal) self._set_react_mode(react_mode="plan_and_act", auto_run=auto_run, use_tools=use_tools) @@ -21,9 +25,8 @@ class CodeInterpreter(Role): @property def working_memory(self): return self._rc.working_memory - - async def _plan_and_act(self): + async def _plan_and_act(self): rsp = await super()._plan_and_act() # save code using datetime.now or keywords related to the goal of your project (plan.goal). @@ -31,47 +34,40 @@ class CodeInterpreter(Role): save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb") return rsp - + async def _act_on_task(self, current_task: Task) -> TaskResult: code, result, is_success = await self._write_and_exec_code() task_result = TaskResult(code=code, result=result, is_success=is_success) return task_result async def _write_and_exec_code(self, max_retry: int = 3): - counter = 0 success = False - + while not success and counter < max_retry: context = self.planner.get_useful_memories() logger.info("Write code with pure generation") - code = await WriteCodeByGenerate().run( - context=context, plan=self.planner.plan, temperature=0.0 - ) + code = await WriteCodeByGenerate().run(context=context, plan=self.planner.plan, temperature=0.0) cause_by = WriteCodeByGenerate - self.working_memory.add( - Message(content=code, role="assistant", cause_by=cause_by) - ) - + self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by)) + result, success = await self.execute_code.run(code) print(result) - self.working_memory.add( - Message(content=result, role="user", cause_by=ExecutePyCode) - ) - + self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode)) + if "!pip" in code: success = False - + counter += 1 - + if not success and counter >= max_retry: logger.info("coding failed!") review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) if ReviewConst.CHANGE_WORD[0] in review: counter = 0 # redo the task again with help of human suggestions - + return code, result, success diff --git a/metagpt/roles/kaggle_manager.py b/metagpt/roles/kaggle_manager.py index cad12a16a..e12f47051 100644 --- a/metagpt/roles/kaggle_manager.py +++ b/metagpt/roles/kaggle_manager.py @@ -1,25 +1,23 @@ -from typing import Dict, List, Union, Tuple import json -import subprocess import os +import subprocess import fire import pandas as pd +from metagpt.actions import Action, BossRequirement +from metagpt.actions.ml_da_action import SummarizeAnalysis from metagpt.config import CONFIG from metagpt.const import WORKSPACE_ROOT -from metagpt.roles import Role -from metagpt.actions import Action, BossRequirement -from metagpt.actions.ask_review import AskReview -from metagpt.actions.ml_da_action import SummarizeAnalysis -from metagpt.schema import Message, Task, Plan from metagpt.logs import logger +from metagpt.roles import Role +from metagpt.schema import Message from metagpt.utils.common import CodeParser - os.environ["KAGGLE_USERNAME"] = CONFIG.kaggle_username os.environ["KAGGLE_KEY"] = CONFIG.kaggle_key + def run_command(cmd): print(cmd) output = subprocess.run(cmd, shell=True, capture_output=True, text=True) @@ -30,21 +28,21 @@ def run_command(cmd): print(output.stdout) return output.stdout -class DownloadData(Action): +class DownloadData(Action): async def run(self, competition, data_desc="") -> str: data_path = WORKSPACE_ROOT / competition - + output = run_command(f"kaggle competitions list --search {competition}") assert output != "No competitions found", "You must provide the correct competition name" - + run_command(f"kaggle competitions download {competition} --path {WORKSPACE_ROOT}") - + if not os.path.exists(data_path): - # if True: + # if True: # run_command(f"rm -r {data_path / '*'}") run_command(f"unzip -o {WORKSPACE_ROOT / '*.zip'} -d {data_path}") # FIXME: not safe - + file_list = run_command(f"ls {data_path}") rsp = f""" @@ -55,6 +53,7 @@ class DownloadData(Action): """ return rsp + class SubmitResult(Action): PROMPT_TEMPLATE = """ # Summary @@ -85,9 +84,9 @@ class SubmitResult(Action): run_command(f"kaggle competitions submit {competition} -f {submit_file_path} -m '{submit_message}'") run_command(f"kaggle competitions leaderboard --show --csv {competition} > {data_path / 'leaderboard.csv'}") run_command(f"kaggle competitions submissions --csv {competition} > {data_path / 'submission.csv'}") - - leaderboard = pd.read_csv(data_path / 'leaderboard.csv') - submission = pd.read_csv(data_path / 'submission.csv') + + leaderboard = pd.read_csv(data_path / "leaderboard.csv") + submission = pd.read_csv(data_path / "submission.csv") print(submission) # submission.to_json(orient="records") submission_score = submission.loc[0, "publicScore"] @@ -106,9 +105,7 @@ class SubmitResult(Action): class KaggleManager(Role): - def __init__( - self, name="ABC", profile="KaggleManager", goal="", competition="titanic", data_desc="" - ): + def __init__(self, name="ABC", profile="KaggleManager", goal="", competition="titanic", data_desc=""): super().__init__(name=name, profile=profile, goal=goal) self._init_actions([DownloadData, SubmitResult]) self._watch([BossRequirement, SummarizeAnalysis]) @@ -130,13 +127,16 @@ class KaggleManager(Role): rsp = await todo.run(self.competition, self.data_desc) elif isinstance(todo, SubmitResult): - submit_message = self.get_memories()[-1].content # use analysis summary from MLEngineer as submission message + submit_message = self.get_memories()[ + -1 + ].content # use analysis summary from MLEngineer as submission message rsp = await todo.run(competition=self.competition, submit_message=submit_message) msg = Message(content=rsp, role="user", cause_by=type(todo)) return msg + if __name__ == "__main__": competition, data_desc, requirement = ( "titanic", @@ -151,4 +151,4 @@ if __name__ == "__main__": # await role.run(Message(content="", cause_by=BossRequirement)) await role.run(Message(content=summary, cause_by=SummarizeAnalysis)) - fire.Fire(main) \ No newline at end of file + fire.Fire(main) diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index eef6dbd21..a631daa47 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -1,36 +1,46 @@ import json +from metagpt.actions.ask_review import ReviewConst from metagpt.actions.debug_code import DebugCode from metagpt.actions.execute_code import ExecutePyCode -from metagpt.actions.ask_review import ReviewConst -from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools, MakeTools +from metagpt.actions.ml_da_action import Reflect, SummarizeAnalysis, UpdateDataColumns +from metagpt.actions.write_analysis_code import ( + MakeTools, + WriteCodeByGenerate, + WriteCodeWithTools, +) from metagpt.actions.write_code_steps import WriteCodeSteps from metagpt.const import PROJECT_ROOT from metagpt.logs import logger -from metagpt.schema import Message -from metagpt.utils.common import remove_comments -from metagpt.actions.ml_da_action import SummarizeAnalysis, Reflect, UpdateDataColumns from metagpt.roles.code_interpreter import CodeInterpreter from metagpt.roles.kaggle_manager import DownloadData, SubmitResult +from metagpt.schema import Message from metagpt.tools.functions.libs.udf import UDFS_YAML +from metagpt.utils.common import remove_comments class MLEngineer(CodeInterpreter): def __init__( - self, name="Mark", profile="MLEngineer", goal="", auto_run=False, use_tools=False, use_code_steps=False, - make_udfs=False, use_udfs=False + self, + name="Mark", + profile="MLEngineer", + goal="", + auto_run=False, + use_tools=False, + use_code_steps=False, + make_udfs=False, + use_udfs=False, ): super().__init__(name=name, profile=profile, goal=goal, auto_run=auto_run, use_tools=use_tools) self._watch([DownloadData, SubmitResult]) self.use_tools = use_tools self.use_code_steps = use_code_steps - self.make_udfs = make_udfs # user-defined functions + self.make_udfs = make_udfs # user-defined functions self.use_udfs = use_udfs self.data_desc = {} - + async def _plan_and_act(self): - ### Actions in a multi-agent multi-turn setting, a new attempt on the data ### memories = self.get_memories() if memories: @@ -40,64 +50,62 @@ class MLEngineer(CodeInterpreter): elif latest_event == SubmitResult: # self reflect on previous plan outcomes and think about how to improve the plan, add to working memory await self._reflect() - + # get feedback for improvement from human, add to working memory await self.planner.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + ### general plan process ### await super()._plan_and_act() - + ### summarize analysis ### summary = await SummarizeAnalysis().run(self.planner.plan) rsp = Message(content=summary, cause_by=SummarizeAnalysis) self._rc.memory.add(rsp) - + return rsp async def _write_and_exec_code(self, max_retry: int = 3): self.planner.current_task.code_steps = ( - await WriteCodeSteps().run(self.planner.plan) - if self.use_code_steps - else "" + await WriteCodeSteps().run(self.planner.plan) if self.use_code_steps else "" ) - + counter = 0 success = False debug_context = [] - - while not success and counter < max_retry: + while not success and counter < max_retry: context = self.planner.get_useful_memories() if counter > 0 and (self.use_tools or self.use_udfs): - logger.warning('We got a bug code, now start to debug...') + logger.warning("We got a bug code, now start to debug...") code = await DebugCode().run( plan=self.planner.current_task.instruction, code=code, runtime_result=self.working_memory.get(), - context=debug_context + context=debug_context, ) logger.info(f"new code \n{code}") cause_by = DebugCode - + elif (not self.use_tools and not self.use_udfs) or ( - self.planner.current_task.task_type == 'other' and not self.use_udfs): + self.planner.current_task.task_type == "other" and not self.use_udfs + ): logger.info("Write code with pure generation") - code = await WriteCodeByGenerate().run( - context=context, plan=self.planner.plan, temperature=0.0 - ) - debug_context = [self.planner.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]] + code = await WriteCodeByGenerate().run(context=context, plan=self.planner.plan, temperature=0.0) + debug_context = [self.planner.get_useful_memories(task_exclude_field={"result", "code_steps"})[0]] cause_by = WriteCodeByGenerate - + else: logger.info("Write code with tools") if self.use_udfs: # use user-defined function tools. logger.warning("Writing code with user-defined function tools by WriteCodeWithTools.") - logger.info(f"Local user defined function as following:\ - \n{json.dumps(list(UDFS_YAML.keys()), indent=2, ensure_ascii=False)}") + logger.info( + f"Local user defined function as following:\ + \n{json.dumps(list(UDFS_YAML.keys()), indent=2, ensure_ascii=False)}" + ) # set task_type to `udf` - self.planner.current_task.task_type = 'udf' + self.planner.current_task.task_type = "udf" schema_path = UDFS_YAML else: schema_path = PROJECT_ROOT / "metagpt/tools/functions/schemas" @@ -108,26 +116,22 @@ class MLEngineer(CodeInterpreter): ) debug_context = tool_context cause_by = WriteCodeWithTools - - self.working_memory.add( - Message(content=code, role="assistant", cause_by=cause_by) - ) - + + self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by)) + result, success = await self.execute_code.run(code) print(result) # make tools for successful code and long code. - if success and self.make_udfs and len(remove_comments(code).split('\n')) > 4: - logger.info('Execute code successfully. Now start to make tools ...') + if success and self.make_udfs and len(remove_comments(code).split("\n")) > 4: + logger.info("Execute code successfully. Now start to make tools ...") await self.make_tools(code=code) - self.working_memory.add( - Message(content=result, role="user", cause_by=ExecutePyCode) - ) - + self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode)) + if "!pip" in code: success = False - + counter += 1 - + if not success and counter >= max_retry: logger.info("coding failed!") review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) @@ -135,13 +139,15 @@ class MLEngineer(CodeInterpreter): counter = 0 # redo the task again with help of human suggestions if success: - if (self.use_tools and self.planner.current_task.task_type not in ['model_train', 'model_evaluate']) or self.use_udfs: + if ( + self.use_tools and self.planner.current_task.task_type not in ["model_train", "model_evaluate"] + ) or self.use_udfs: update_success, new_code = await self._update_data_columns() if update_success: code = code + "\n\n" + new_code return code, result, success - + async def _update_data_columns(self): logger.info("Check columns in updated data") rsp = await UpdateDataColumns().run(self.planner.plan) @@ -153,11 +159,11 @@ class MLEngineer(CodeInterpreter): print(result) self.data_desc["column_info"] = result return success, code - + async def _reflect(self): context = self.get_memories() context = "\n".join([str(msg) for msg in context]) - + reflection = await Reflect().run(context=context) self.working_memory.add(Message(content=reflection, role="assistant")) self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user")) @@ -168,8 +174,10 @@ class MLEngineer(CodeInterpreter): Args: code (str): pure generation code by class WriteCodeByGenerate. """ - logger.warning(f"Making tools for task_id {self.planner.current_task_id}: \ - `{self.planner.current_task.instruction}` \n code: \n {code}") + logger.warning( + f"Making tools for task_id {self.planner.current_task_id}: \ + `{self.planner.current_task.instruction}` \n code: \n {code}" + ) make_tools = MakeTools() make_tool_retries, make_tool_current_retry = 3, 0 while True: @@ -185,9 +193,11 @@ class MLEngineer(CodeInterpreter): # end make tools if execute_success or make_tool_current_retry >= make_tool_retries: if make_tool_current_retry >= make_tool_retries: - logger.error(f"We have tried the maximum number of attempts {make_tool_retries}\ + logger.error( + f"We have tried the maximum number of attempts {make_tool_retries}\ and still have not created tools for task_id {self.planner.current_task_id} successfully,\ - we will skip it.") + we will skip it." + ) break # save successful tool code in udf if execute_success: diff --git a/metagpt/roles/ml_engineer_simple.py b/metagpt/roles/ml_engineer_simple.py index 7214e37c2..1006a4262 100644 --- a/metagpt/roles/ml_engineer_simple.py +++ b/metagpt/roles/ml_engineer_simple.py @@ -1,18 +1,17 @@ import re -from typing import List -import json from datetime import datetime +from typing import List import fire -from metagpt.roles import Role -from metagpt.schema import Message -from metagpt.memory import Memory -from metagpt.logs import logger -from metagpt.actions.write_analysis_code import WriteCodeByGenerate from metagpt.actions.ask_review import AskReview, ReviewConst from metagpt.actions.execute_code import ExecutePyCode +from metagpt.actions.write_analysis_code import WriteCodeByGenerate +from metagpt.logs import logger +from metagpt.memory import Memory +from metagpt.roles import Role from metagpt.roles.kaggle_manager import DownloadData +from metagpt.schema import Message from metagpt.utils.save_code import save_code_file STRUCTURAL_CONTEXT_SIMPLE = """ @@ -40,9 +39,7 @@ Next Steps: class MLEngineerSimple(Role): - def __init__( - self, name="ABC", profile="MLEngineerSimple", goal="", auto_run: bool = False - ): + def __init__(self, name="ABC", profile="MLEngineerSimple", goal="", auto_run: bool = False): super().__init__(name=name, profile=profile, goal=goal) self._set_react_mode(react_mode="react") self._watch([DownloadData]) @@ -78,19 +75,13 @@ class MLEngineerSimple(Role): context = self.get_useful_memories() print(f"memories数量:{len(context)}") # print("===\n" +str(context) + "\n===") - code = await WriteCodeByGenerate().run( - context=context, temperature=0.0 - ) + code = await WriteCodeByGenerate().run(context=context, temperature=0.0) cause_by = WriteCodeByGenerate - self.working_memory.add( - Message(content=code, role="assistant", cause_by=cause_by) - ) + self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by)) result, success = await self.execute_code.run(code) print(result) - self.working_memory.add( - Message(content=result, role="user", cause_by=ExecutePyCode) - ) + self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode)) if "!pip" in code: success = False @@ -107,12 +98,10 @@ class MLEngineerSimple(Role): self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory prompt = JUDGE_PROMPT_TEMPLATE.format(user_requirement=self.goal, context=completed_plan_memory) rsp = await self._llm.aask(prompt) - self.working_memory.add( - Message(content=rsp, role="system") - ) + self.working_memory.add(Message(content=rsp, role="system")) - matches = re.findall(r'\b(True|False)\b', rsp) - state = False if 'False' in matches else True + matches = re.findall(r"\b(True|False)\b", rsp) + state = False if "False" in matches else True async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER): auto_run = auto_run or self.auto_run @@ -127,9 +116,7 @@ class MLEngineerSimple(Role): def get_useful_memories(self) -> List[Message]: """find useful memories only to reduce context length and improve performance""" user_requirement = self.goal - context = STRUCTURAL_CONTEXT_SIMPLE.format( - user_requirement=user_requirement, data_desc=self.data_desc - ) + context = STRUCTURAL_CONTEXT_SIMPLE.format(user_requirement=user_requirement, data_desc=self.data_desc) context_msg = [Message(content=context, role="user")] return context_msg + self.get_working_memories(6) diff --git a/metagpt/roles/role.py b/metagpt/roles/role.py index cb1d2eef3..0ea6d6ee6 100644 --- a/metagpt/roles/role.py +++ b/metagpt/roles/role.py @@ -35,10 +35,9 @@ from metagpt.const import SERDESER_PATH from metagpt.llm import LLM, HumanProvider from metagpt.logs import logger from metagpt.memory import Memory -from metagpt.provider.base_llm import BaseLLM -from metagpt.schema import Message, MessageQueue, SerializationMixin -from metagpt.schema import Task, TaskResult from metagpt.plan.planner import Planner +from metagpt.provider.base_llm import BaseLLM +from metagpt.schema import Message, MessageQueue, SerializationMixin, Task, TaskResult from metagpt.utils.common import ( any_to_name, any_to_str, @@ -270,7 +269,9 @@ class Role(SerializationMixin, is_polymorphic_base=True): if react_mode == RoleReactMode.REACT: self.rc.max_react_loop = max_react_loop elif react_mode == RoleReactMode.PLAN_AND_ACT: - self.planner = Planner(goal=self._setting.goal, working_memory=self.rc.working_memory, auto_run=auto_run, use_tools=use_tools) + self.planner = Planner( + goal=self._setting.goal, working_memory=self.rc.working_memory, auto_run=auto_run, use_tools=use_tools + ) def _watch(self, actions: Iterable[Type[Action]] | Iterable[Action]): """Watch Actions of interest. Role will select Messages caused by these Actions from its personal message @@ -450,35 +451,34 @@ class Role(SerializationMixin, is_polymorphic_base=True): async def _plan_and_act(self) -> Message: """first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ... Use llm to come up with the plan dynamically.""" - + ### Common Procedure in both single- and multi-agent setting ### # create initial plan and update until confirmation await self.planner.update_plan() - - while self.planner.current_task: + while self.planner.current_task: task = self.planner.current_task logger.info(f"ready to take on task {task}") - + # take on current task task_result = await self._act_on_task(task) - + # ask for acceptance, users can other refuse and change tasks in the plan review, task_result_confirmed = await self.planner.ask_review(task_result) - + if task_result_confirmed: # tick off this task and record progress await self.planner.confirm_task(task, task_result, review) - + elif "redo" in review: # Ask the Role to redo this task with help of review feedback, # useful when the code run is successful but the procedure or result is not what we want continue - + else: # update plan according to user's feedback and to take on changed tasks await self.planner.update_plan(review) - + completed_plan_memory = self.planner.get_useful_memories() # completed plan as a outcome rsp = completed_plan_memory[0] @@ -486,7 +486,7 @@ class Role(SerializationMixin, is_polymorphic_base=True): self.rc.memory.add(rsp) # add to persistent memory return rsp - + async def _act_on_task(self, current_task: Task) -> TaskResult: """Taking specific action to handle one task in plan diff --git a/metagpt/schema.py b/metagpt/schema.py index 402b3e93f..31a83e5dd 100644 --- a/metagpt/schema.py +++ b/metagpt/schema.py @@ -308,12 +308,12 @@ class AIMessage(Message): """ def __init__(self, content: str): - super().__init__(content, 'assistant') + super().__init__(content, "assistant") class Task(BaseModel): task_id: str = "" - dependent_task_ids: list[str] = [] # Tasks prerequisite to this Task + dependent_task_ids: list[str] = [] # Tasks prerequisite to this Task instruction: str = "" task_type: str = "" code_steps: str = "" @@ -325,6 +325,7 @@ class Task(BaseModel): class TaskResult(BaseModel): """Result of taking a task, with result and is_success required to be filled""" + code_steps: str = "" code: str = "" result: str @@ -360,12 +361,12 @@ class Plan(BaseModel): def add_tasks(self, tasks: list[Task]): """ Integrates new tasks into the existing plan, ensuring dependency order is maintained. - + This method performs two primary functions based on the current state of the task list: - 1. If there are no existing tasks, it topologically sorts the provided tasks to ensure + 1. If there are no existing tasks, it topologically sorts the provided tasks to ensure correct execution order based on dependencies, and sets these as the current tasks. - 2. If there are existing tasks, it merges the new tasks with the existing ones. It maintains - any common prefix of tasks (based on task_id and instruction) and appends the remainder + 2. If there are existing tasks, it merges the new tasks with the existing ones. It maintains + any common prefix of tasks (based on task_id and instruction) and appends the remainder of the new tasks. The current task is updated to the first unfinished task in this merged list. Args: @@ -395,13 +396,13 @@ class Plan(BaseModel): # Combine the common prefix with the remainder of the new tasks final_tasks = self.tasks[:prefix_length] + new_tasks[prefix_length:] self.tasks = final_tasks - + # Update current_task_id to the first unfinished task in the merged list self._update_current_task() # Update the task map for quick access to tasks by ID self.task_map = {task.task_id: task for task in self.tasks} - + def reset_task(self, task_id: str): """ Clear code and result of the task based on task_id, and set the task as unfinished. @@ -448,20 +449,21 @@ class Plan(BaseModel): Args: new_task (Task): The new task to be appended to the existing task sequence - + Returns: None """ assert not self.has_task_id(new_task.task_id), "Task already in current plan, use replace_task instead" - assert all([self.has_task_id(dep_id) for dep_id in new_task.dependent_task_ids]), \ - "New task has unknown dependencies" + assert all( + [self.has_task_id(dep_id) for dep_id in new_task.dependent_task_ids] + ), "New task has unknown dependencies" # Existing tasks do not depend on the new task, it's fine to put it to the end of the sorted task sequence self.tasks.append(new_task) self.task_map[new_task.task_id] = new_task self._update_current_task() - + def update_task_result(self, task: Task, task_result: TaskResult): task.code_steps = task_result.code_steps task.code = task_result.code @@ -478,7 +480,7 @@ class Plan(BaseModel): current_task_id = task.task_id break self.current_task_id = current_task_id # all tasks finished - + @property def current_task(self) -> Task: """Find current task to execute @@ -489,8 +491,7 @@ class Plan(BaseModel): return self.task_map.get(self.current_task_id, None) def finish_current_task(self): - """Finish current task, set Task.is_finished=True, set current task to next task - """ + """Finish current task, set Task.is_finished=True, set current task to next task""" if self.current_task_id: self.current_task.is_finished = True self._update_current_task() # set to next task diff --git a/metagpt/tools/functions/libs/data_preprocess.py b/metagpt/tools/functions/libs/data_preprocess.py index f1665b405..5d1cd97d8 100644 --- a/metagpt/tools/functions/libs/data_preprocess.py +++ b/metagpt/tools/functions/libs/data_preprocess.py @@ -3,19 +3,26 @@ import json import numpy as np import pandas as pd from sklearn.impute import SimpleImputer -from sklearn.preprocessing import LabelEncoder -from sklearn.preprocessing import MaxAbsScaler -from sklearn.preprocessing import MinMaxScaler -from sklearn.preprocessing import OneHotEncoder -from sklearn.preprocessing import OrdinalEncoder -from sklearn.preprocessing import RobustScaler -from sklearn.preprocessing import StandardScaler +from sklearn.preprocessing import ( + LabelEncoder, + MaxAbsScaler, + MinMaxScaler, + OneHotEncoder, + OrdinalEncoder, + RobustScaler, + StandardScaler, +) from metagpt.tools.functions.libs.base import MLProcess class FillMissingValue(MLProcess): - def __init__(self, features: list, strategy: str = 'mean', fill_value=None,): + def __init__( + self, + features: list, + strategy: str = "mean", + fill_value=None, + ): self.features = features self.strategy = strategy self.fill_value = fill_value @@ -35,7 +42,10 @@ class FillMissingValue(MLProcess): class MinMaxScale(MLProcess): - def __init__(self, features: list,): + def __init__( + self, + features: list, + ): self.features = features self.mms = None @@ -49,7 +59,10 @@ class MinMaxScale(MLProcess): class StandardScale(MLProcess): - def __init__(self, features: list,): + def __init__( + self, + features: list, + ): self.features = features self.ss = None @@ -63,7 +76,10 @@ class StandardScale(MLProcess): class MaxAbsScale(MLProcess): - def __init__(self, features: list,): + def __init__( + self, + features: list, + ): self.features = features self.mas = None @@ -77,7 +93,10 @@ class MaxAbsScale(MLProcess): class RobustScale(MLProcess): - def __init__(self, features: list,): + def __init__( + self, + features: list, + ): self.features = features self.rs = None @@ -91,7 +110,10 @@ class RobustScale(MLProcess): class OrdinalEncode(MLProcess): - def __init__(self, features: list,): + def __init__( + self, + features: list, + ): self.features = features self.oe = None @@ -105,7 +127,10 @@ class OrdinalEncode(MLProcess): class OneHotEncode(MLProcess): - def __init__(self, features: list,): + def __init__( + self, + features: list, + ): self.features = features self.ohe = None @@ -123,7 +148,10 @@ class OneHotEncode(MLProcess): class LabelEncode(MLProcess): - def __init__(self, features: list,): + def __init__( + self, + features: list, + ): self.features = features self.le_encoders = [] @@ -131,7 +159,7 @@ class LabelEncode(MLProcess): if len(self.features) == 0: return for col in self.features: - le = LabelEncoder().fit(df[col].astype(str).unique().tolist() + ['unknown']) + le = LabelEncoder().fit(df[col].astype(str).unique().tolist() + ["unknown"]) self.le_encoders.append(le) def transform(self, df: pd.DataFrame): @@ -141,7 +169,7 @@ class LabelEncode(MLProcess): data_list = df[self.features[i]].astype(str).tolist() for unique_item in np.unique(df[self.features[i]].astype(str)): if unique_item not in self.le_encoders[i].classes_: - data_list = ['unknown' if x == unique_item else x for x in data_list] + data_list = ["unknown" if x == unique_item else x for x in data_list] df[self.features[i]] = self.le_encoders[i].transform(data_list) return df @@ -165,5 +193,5 @@ def get_column_info(df: pd.DataFrame) -> dict: column_info["Others"].append(col) if len(json.dumps(column_info)) > 2000: - column_info['Numeric'] = column_info['Numeric'][0:5] + ['Too many cols, omission here...'] + column_info["Numeric"] = column_info["Numeric"][0:5] + ["Too many cols, omission here..."] return column_info diff --git a/metagpt/tools/functions/libs/feature_engineering.py b/metagpt/tools/functions/libs/feature_engineering.py index df36752b9..534c5b8e4 100644 --- a/metagpt/tools/functions/libs/feature_engineering.py +++ b/metagpt/tools/functions/libs/feature_engineering.py @@ -13,7 +13,7 @@ from joblib import Parallel, delayed from pandas.core.dtypes.common import is_object_dtype from sklearn.feature_selection import VarianceThreshold from sklearn.model_selection import KFold -from sklearn.preprocessing import PolynomialFeatures, KBinsDiscretizer +from sklearn.preprocessing import KBinsDiscretizer, PolynomialFeatures from metagpt.tools.functions.libs.base import MLProcess @@ -91,9 +91,7 @@ class KFoldTargetMeanEncoder(MLProcess): col_name = f"{self.col}_kf_target_mean" for trn_idx, val_idx in kf.split(tmp, tmp[self.label]): _trn, _val = tmp.iloc[trn_idx], tmp.iloc[val_idx] - tmp.loc[tmp.index[val_idx], col_name] = _val[self.col].map( - _trn.groupby(self.col)[self.label].mean() - ) + tmp.loc[tmp.index[val_idx], col_name] = _val[self.col].map(_trn.groupby(self.col)[self.label].mean()) tmp[col_name].fillna(global_mean, inplace=True) self.encoder_dict = tmp.groupby(self.col)[col_name].mean().to_dict() @@ -111,7 +109,7 @@ class CatCross(MLProcess): @staticmethod def cross_two(comb, df): - new_col = f'{comb[0]}_{comb[1]}' + new_col = f"{comb[0]}_{comb[1]}" new_col_combs = list(itertools.product(df[comb[0]].unique(), df[comb[1]].unique())) ll = list(range(len(new_col_combs))) comb_map = dict(zip(new_col_combs, ll)) @@ -122,13 +120,12 @@ class CatCross(MLProcess): if df[col].nunique() > self.max_cat_num: self.cols.remove(col) self.combs = list(itertools.combinations(self.cols, 2)) - res = Parallel(n_jobs=4, require='sharedmem')( - delayed(self.cross_two)(comb, df) for comb in self.combs) + res = Parallel(n_jobs=4, require="sharedmem")(delayed(self.cross_two)(comb, df) for comb in self.combs) self.combs_map = dict(res) def transform(self, df: pd.DataFrame) -> pd.DataFrame: for comb in self.combs: - new_col = f'{comb[0]}_{comb[1]}' + new_col = f"{comb[0]}_{comb[1]}" _map = self.combs_map[new_col] df[new_col] = pd.Series(zip(df[comb[0]], df[comb[1]])).map(_map) # set the unknown value to a new number @@ -157,13 +154,13 @@ class GroupStat(MLProcess): class SplitBins(MLProcess): - def __init__(self, cols: str, strategy: str = 'quantile'): + def __init__(self, cols: str, strategy: str = "quantile"): self.cols = cols self.strategy = strategy self.encoder = None def fit(self, df: pd.DataFrame): - self.encoder = KBinsDiscretizer(strategy=self.strategy, encode='ordinal') + self.encoder = KBinsDiscretizer(strategy=self.strategy, encode="ordinal") self.encoder.fit(df[self.cols].fillna(0)) def transform(self, df: pd.DataFrame) -> pd.DataFrame: @@ -296,10 +293,7 @@ class GeneralSelection(MLProcess): if df[col].nunique() == 1: feats.remove(col) - if ( - df.loc[df[col] == np.inf].shape[0] != 0 - or df.loc[df[col] == np.inf].shape[0] != 0 - ): + if df.loc[df[col] == np.inf].shape[0] != 0 or df.loc[df[col] == np.inf].shape[0] != 0: feats.remove(col) if is_object_dtype(df[col]) and df[col].nunique() == df.shape[0]: @@ -320,10 +314,10 @@ class TreeBasedSelection(MLProcess): def fit(self, df: pd.DataFrame): params = { - 'boosting_type': 'gbdt', - 'objective': 'binary', - 'learning_rate': 0.1, - 'num_leaves': 31, + "boosting_type": "gbdt", + "objective": "binary", + "learning_rate": 0.1, + "num_leaves": 31, } if self.task_type == "cls": @@ -342,12 +336,11 @@ class TreeBasedSelection(MLProcess): dtrain = lgb.Dataset(df[cols], df[self.label_col]) model = lgb.train(params, dtrain, num_boost_round=100) - df_imp = pd.DataFrame({'feature_name': dtrain.feature_name, - 'importance': model.feature_importance("gain")}) + df_imp = pd.DataFrame({"feature_name": dtrain.feature_name, "importance": model.feature_importance("gain")}) df_imp.sort_values("importance", ascending=False, inplace=True) df_imp = df_imp[df_imp["importance"] > 0] - self.feats = df_imp['feature_name'].tolist() + self.feats = df_imp["feature_name"].tolist() self.feats.append(self.label_col) def transform(self, df: pd.DataFrame) -> pd.DataFrame: diff --git a/metagpt/tools/functions/libs/udf/__init__.py b/metagpt/tools/functions/libs/udf/__init__.py index 5d9c35b27..6644565d7 100644 --- a/metagpt/tools/functions/libs/udf/__init__.py +++ b/metagpt/tools/functions/libs/udf/__init__.py @@ -5,12 +5,12 @@ import yaml import inspect import importlib from pathlib import Path -from typing import Dict, List +from typing import List from metagpt.logs import logger def extract_function_signatures(file_path): - with open(file_path, 'r', encoding='utf-8') as file: + with open(file_path, "r", encoding="utf-8") as file: source_code = file.read() tree = ast.parse(source_code) @@ -19,7 +19,7 @@ def extract_function_signatures(file_path): for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): # 只提取用户自定义函数,排除内置函数 - if not (node.name.startswith('__') and node.name.endswith('__')): + if not (node.name.startswith("__") and node.name.endswith("__")): # 获取函数名 function_name = node.name # 获取参数列表 @@ -27,36 +27,37 @@ def extract_function_signatures(file_path): # 获取函数签名 function_signature = f"{function_name}({', '.join(args)})" # 导入函数 - module_name = Path(file_path).parts[-1][:-len(Path(file_path).suffix)] + module_name = Path(file_path).parts[-1][: -len(Path(file_path).suffix)] module = importlib.import_module(f"metagpt.tools.functions.libs.udf.{module_name}") # 将函数导入到当前命名空间 globals().update({function_name: getattr(module, function_name)}) # 获取函数注释和函数路径 - function_schema = {'udf_name': function_signature, - 'udf_path': f'from metagpt.tools.functions.libs.udf.{module_name} import {function_name}', - 'udf_doc': inspect.getdoc(getattr(module, function_name))} + function_schema = { + "udf_name": function_signature, + "udf_path": f"from metagpt.tools.functions.libs.udf.{module_name} import {function_name}", + "udf_doc": inspect.getdoc(getattr(module, function_name)), + } function_signatures.append(function_schema) # 获取函数返回变量名 source_lines, _ = inspect.getsourcelines(getattr(module, function_name)) for line in source_lines: if line.strip().startswith("return "): - function_returns.append({ - 'udf_name': function_name, - 'udf_returns': [var.strip() for var in line.strip()[len("return "):].split(',')] - }) + function_returns.append( + { + "udf_name": function_name, + "udf_returns": [var.strip() for var in line.strip()[len("return ") :].split(",")], + } + ) break # 没有返回值的函数 - if not function_returns or function_returns[-1]['udf_name'] != function_name: - function_returns.append({ - 'udf_name': function_name, - 'udf_returns': [None] - }) + if not function_returns or function_returns[-1]["udf_name"] != function_name: + function_returns.append({"udf_name": function_name, "udf_returns": [None]}) return function_signatures, function_returns def get_function_signatures_in_folder(folder_path): - python_files = [f for f in os.listdir(folder_path) if f.endswith('.py') and f != '__init__.py'] + python_files = [f for f in os.listdir(folder_path) if f.endswith(".py") and f != "__init__.py"] all_function_signatures = [] all_function_returns = [] @@ -74,31 +75,33 @@ def docstring_to_yaml(docstring: str, return_vars: List[str] = None): if docstring is None: return {} # 匹配简介部分 - description_match = re.search(r'^(.*?)(?:Args:|Returns:|Raises:|$)', docstring, re.DOTALL) + description_match = re.search(r"^(.*?)(?:Args:|Returns:|Raises:|$)", docstring, re.DOTALL) description = description_match.group(1).strip() if description_match else "" # 匹配Args部分 - args_match = re.search(r'Args:\s*(.*?)(?:Returns:|Raises:|$)', docstring, re.DOTALL) + args_match = re.search(r"Args:\s*(.*?)(?:Returns:|Raises:|$)", docstring, re.DOTALL) _args = args_match.group(1).strip() if args_match else "" - variable_pattern = re.compile(r'(\w+)\s*\((.*?)\):\s*(.*)') + variable_pattern = re.compile(r"(\w+)\s*\((.*?)\):\s*(.*)") params = variable_pattern.findall(_args) if not params: params = ((None, None, None),) # 匹配Returns部分 - returns_match = re.search(r'Returns:\s*(.*?)(?:Raises:|$)', docstring, re.DOTALL) + returns_match = re.search(r"Returns:\s*(.*?)(?:Raises:|$)", docstring, re.DOTALL) returns = returns_match.group(1).strip() if returns_match else "" - return_pattern = re.compile(r'^(.*)\s*:\s*(.*)$') + return_pattern = re.compile(r"^(.*)\s*:\s*(.*)$") # 添加返回值变量名 return_vars = return_vars if isinstance(return_vars, list) else [return_vars] returns = [(r, *r_desc) for r_desc, r in zip(return_pattern.findall(returns), return_vars)] # 构建YAML字典 yaml_data = { - 'description': description.strip('.').strip(), - 'parameters': { - 'properties': {param[0]: {'type': param[1], 'description': param[2]} for param in params if param[0] is not None}, - 'required': [param[0] for param in params if param[0] is not None] + "description": description.strip(".").strip(), + "parameters": { + "properties": { + param[0]: {"type": param[1], "description": param[2]} for param in params if param[0] is not None + }, + "required": [param[0] for param in params if param[0] is not None], }, - 'returns': {ret[0]: {'type': ret[1], 'description': ret[2]} for ret in returns} + "returns": {ret[0]: {"type": ret[1], "description": ret[2]} for ret in returns}, } return yaml_data @@ -107,10 +110,10 @@ def extract_function_schema_yaml_in_folder(folder_path: str): function_signatures, function_returns = get_function_signatures_in_folder(folder_path) function_schema_yaml_data = {} for func_docstring, func_returns in zip(function_signatures, function_returns): - if func_docstring['udf_doc']: - fun_yaml_data = docstring_to_yaml(func_docstring['udf_doc'], func_returns['udf_returns']) - fun_yaml_data.update({'type': 'function'}) - function_schema_yaml_data.update({func_returns['udf_name']: fun_yaml_data}) + if func_docstring["udf_doc"]: + fun_yaml_data = docstring_to_yaml(func_docstring["udf_doc"], func_returns["udf_returns"]) + fun_yaml_data.update({"type": "function"}) + function_schema_yaml_data.update({func_returns["udf_name"]: fun_yaml_data}) return yaml.dump(function_schema_yaml_data, default_flow_style=False) diff --git a/metagpt/utils/common.py b/metagpt/utils/common.py index bf112f820..b20b4acd2 100644 --- a/metagpt/utils/common.py +++ b/metagpt/utils/common.py @@ -361,6 +361,7 @@ def create_func_config(func_schema: dict) -> dict: def remove_comments(code_str): """Remove comments from code.""" pattern = r"(\".*?\"|\'.*?\')|(\#.*?$)" + def replace_func(match): if match.group(2) is not None: return "" diff --git a/metagpt/utils/recovery_util.py b/metagpt/utils/recovery_util.py index cef302d6b..3405b9587 100644 --- a/metagpt/utils/recovery_util.py +++ b/metagpt/utils/recovery_util.py @@ -2,15 +2,17 @@ # @Date : 12/20/2023 11:07 AM # @Author : stellahong (stellahong@fuzhi.ai) # @Desc : -import nbformat -from pathlib import Path import json from datetime import datetime +from pathlib import Path + +import nbformat -from metagpt.roles.role import Role from metagpt.const import DATA_PATH +from metagpt.roles.role import Role from metagpt.utils.save_code import save_code_file + def load_history(save_dir: str = ""): """ Load history from the specified save directory. @@ -21,7 +23,7 @@ def load_history(save_dir: str = ""): Returns: Tuple: A tuple containing the loaded plan and notebook. """ - + plan_path = Path(save_dir) / "plan.json" nb_path = Path(save_dir) / "history_nb" / "code.ipynb" plan = json.load(open(plan_path, "r", encoding="utf-8")) @@ -40,16 +42,16 @@ def save_history(role: Role, save_dir: str = ""): Returns: Path: The path to the saved history directory. """ - record_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + record_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") save_path = DATA_PATH / "output" / f"{record_time}" - + # overwrite exist trajectory save_path.mkdir(parents=True, exist_ok=True) - + plan = role.planner.plan.dict() - + with open(save_path / "plan.json", "w", encoding="utf-8") as plan_file: json.dump(plan, plan_file, indent=4, ensure_ascii=False) - + save_code_file(name=Path(record_time) / "history_nb", code_context=role.execute_code.nb, file_format="ipynb") - return save_path \ No newline at end of file + return save_path diff --git a/metagpt/utils/save_code.py b/metagpt/utils/save_code.py index 96c310336..adf136316 100644 --- a/metagpt/utils/save_code.py +++ b/metagpt/utils/save_code.py @@ -2,13 +2,14 @@ # @Date : 12/12/2023 4:14 PM # @Author : stellahong (stellahong@fuzhi.ai) # @Desc : -import os import json +import os import nbformat from metagpt.const import DATA_PATH + def save_code_file(name: str, code_context: str, file_format: str = "py") -> None: """ Save code files to a specified path. @@ -36,10 +37,6 @@ def save_code_file(name: str, code_context: str, file_format: str = "py") -> Non with open(file_path, "w", encoding="utf-8") as fp: json.dump(data, fp, indent=2) elif file_format == "ipynb": - nbformat.write(code_context, file_path) + nbformat.write(code_context, file_path) else: raise ValueError("Unsupported file format. Please choose 'py', 'json', or 'ipynb'.") - - - - diff --git a/tests/metagpt/test_schema.py b/tests/metagpt/test_schema.py index f4dc56bdd..ab2e206a4 100644 --- a/tests/metagpt/test_schema.py +++ b/tests/metagpt/test_schema.py @@ -26,10 +26,11 @@ from metagpt.schema import ( Document, Message, MessageQueue, + Plan, SystemMessage, + Task, UserMessage, ) -from metagpt.schema import Task, Plan from metagpt.utils.common import any_to_str @@ -53,7 +54,7 @@ class TestPlan: tasks = [ Task(task_id="1", dependent_task_ids=["2", "3"], instruction="Third"), Task(task_id="2", instruction="First"), - Task(task_id="3", dependent_task_ids=["2"], instruction="Second") + Task(task_id="3", dependent_task_ids=["2"], instruction="Second"), ] # 2 -> 3 -> 1 plan.add_tasks(tasks) @@ -65,7 +66,7 @@ class TestPlan: tasks = [ Task(task_id="1", dependent_task_ids=["2", "3"], instruction="Third"), Task(task_id="2", instruction="First"), - Task(task_id="3", dependent_task_ids=["2"], instruction="Second", is_finished=True) + Task(task_id="3", dependent_task_ids=["2"], instruction="Second", is_finished=True), ] # 2 -> 3 -> 1 plan.add_tasks(tasks) @@ -81,7 +82,7 @@ class TestPlan: tasks = [ Task(task_id="1", dependent_task_ids=["2", "3"], instruction="Third"), Task(task_id="2", instruction="First"), - Task(task_id="3", dependent_task_ids=["2"], instruction="Second") + Task(task_id="3", dependent_task_ids=["2"], instruction="Second"), ] # 2 -> 3 -> 1 plan.add_tasks(tasks) plan.finish_current_task() # finish 2 @@ -90,19 +91,21 @@ class TestPlan: new_tasks = [ Task(task_id="4", dependent_task_ids=["3"], instruction="Third"), Task(task_id="2", instruction="First"), - Task(task_id="3", dependent_task_ids=["2"], instruction="Second") + Task(task_id="3", dependent_task_ids=["2"], instruction="Second"), ] # 2 -> 3 -> 4, so the common prefix is 2 -> 3, and these two should be obtained from the existing tasks plan.add_tasks(new_tasks) assert [task.task_id for task in plan.tasks] == ["2", "3", "4"] - assert plan.tasks[0].is_finished and plan.tasks[1].is_finished # "2" and "3" should be the original finished one + assert ( + plan.tasks[0].is_finished and plan.tasks[1].is_finished + ) # "2" and "3" should be the original finished one assert plan.current_task_id == "4" def test_current_task(self): plan = Plan(goal="") tasks = [ Task(task_id="1", dependent_task_ids=["2"], instruction="Second"), - Task(task_id="2", instruction="First") + Task(task_id="2", instruction="First"), ] plan.add_tasks(tasks) assert plan.current_task.task_id == "2" @@ -111,7 +114,7 @@ class TestPlan: plan = Plan(goal="") tasks = [ Task(task_id="1", instruction="First"), - Task(task_id="2", dependent_task_ids=["1"], instruction="Second") + Task(task_id="2", dependent_task_ids=["1"], instruction="Second"), ] plan.add_tasks(tasks) plan.finish_current_task() @@ -121,7 +124,7 @@ class TestPlan: plan = Plan(goal="") tasks = [ Task(task_id="1", instruction="First"), - Task(task_id="2", dependent_task_ids=["1"], instruction="Second") + Task(task_id="2", dependent_task_ids=["1"], instruction="Second"), ] plan.add_tasks(tasks) plan.finish_current_task() @@ -149,8 +152,10 @@ class TestPlan: def test_replace_task_with_dependents(self): plan = Plan(goal="") - tasks = [Task(task_id="1", instruction="First Task", finished=True), - Task(task_id="2", instruction="Second Task", dependent_task_ids=["1"], finished=True)] + tasks = [ + Task(task_id="1", instruction="First Task", finished=True), + Task(task_id="2", instruction="Second Task", dependent_task_ids=["1"], finished=True), + ] plan.add_tasks(tasks) new_task = Task(task_id="1", instruction="Updated First Task") plan.replace_task(new_task) @@ -168,7 +173,7 @@ class TestPlan: plan.replace_task(new_task) # Task with ID 2 does not exist in plan assert "1" in plan.task_map assert "2" not in plan.task_map - + def test_append_task_with_valid_dependencies(self): plan = Plan(goal="Test") existing_task = [Task(task_id="1")] @@ -183,7 +188,7 @@ class TestPlan: plan = Plan(goal="Test") with pytest.raises(AssertionError): plan.append_task(new_task) - + def test_append_task_without_dependencies(self): plan = Plan(goal="Test") existing_task = [Task(task_id="1")]