format using precommit

This commit is contained in:
yzlin 2024-01-10 14:15:30 +08:00
parent 853086924a
commit 767c99388f
25 changed files with 376 additions and 380 deletions

View file

@ -1,6 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import fire
@ -8,6 +7,7 @@ from metagpt.roles.kaggle_manager import KaggleManager
from metagpt.roles.ml_engineer import MLEngineer
from metagpt.team import Team
async def main(
# competition: str,
# data_desc: str,
@ -21,7 +21,7 @@ async def main(
"Training set is train.csv.\nTest set is test.csv. We also include gender_submission.csv, a set of predictions that assume all and only female passengers survive, as an example of what a submission file should look like.",
# "Run EDA on the train dataset, train a model to predict survival (20% as validation) and save it, predict the test set using saved model, save the test result according to format",
# "generate a random prediction, replace the Survived column of gender_submission.csv, and save the prediction to a new submission file",
"Score as high as possible for the provided dataset, save the test prediction to a csv with two columns PassengerId and Survived"
"Score as high as possible for the provided dataset, save the test prediction to a csv with two columns PassengerId and Survived",
)
team = Team()
@ -36,5 +36,6 @@ async def main(
team.start_project(requirement)
await team.run(n_round=n_round)
if __name__ == '__main__':
if __name__ == "__main__":
fire.Fire(main)

View file

@ -1,8 +1,8 @@
from typing import List
from metagpt.actions import Action
from metagpt.schema import Message, Plan
from metagpt.logs import logger
from metagpt.schema import Message, Plan
class ReviewConst:
@ -23,17 +23,10 @@ class ReviewConst:
class AskReview(Action):
async def run(
self, context: List[Message], plan: Plan = None, trigger: str = "task"
):
async def run(self, context: List[Message], plan: Plan = None, trigger: str = "task"):
logger.info("Current overall plan:")
logger.info(
"\n".join(
[
f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}"
for task in plan.tasks
]
)
"\n".join([f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}" for task in plan.tasks])
)
logger.info("most recent context:")

View file

@ -1,9 +1,9 @@
from typing import Dict, List, Union, Tuple, Optional, Any
from typing import Any, List, Optional
from metagpt.logs import logger
from metagpt.schema import Message, Plan
from metagpt.utils.common import CodeParser, create_func_config
from metagpt.actions.write_analysis_code import BaseWriteAnalysisCode
from metagpt.logs import logger
from metagpt.schema import Message
from metagpt.utils.common import create_func_config
DEBUG_REFLECTION_EXAMPLE = '''
Example 1:
@ -113,9 +113,7 @@ class DebugCode(BaseWriteAnalysisCode):
# msg = messages_to_str(info)
# resp = await self.llm.aask(msg=msg)
resp = await self.llm.aask_code(
messages=info, **create_func_config(CODE_REFLECTION)
)
resp = await self.llm.aask_code(messages=info, **create_func_config(CODE_REFLECTION))
logger.info(f"reflection is {resp}")
return resp

View file

@ -4,23 +4,23 @@
@Author : orange-crow
@File : code_executor.py
"""
import re
import traceback
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Tuple, Union
import traceback
import re
import nbformat
from nbclient import NotebookClient
from nbclient.exceptions import DeadKernelError, CellTimeoutError
from nbclient.exceptions import CellTimeoutError, DeadKernelError
from nbformat import NotebookNode
from nbformat.v4 import new_code_cell, new_output
from rich.console import Console
from rich.syntax import Syntax
from metagpt.actions import Action
from metagpt.schema import Message
from metagpt.logs import logger
from metagpt.schema import Message
class ExecuteCode(ABC):
@ -113,7 +113,9 @@ class ExecutePyCode(ExecuteCode, Action):
if "image/png" in output["data"]:
self.show_bytes_figure(output["data"]["image/png"], self.interaction)
else:
logger.info(f"{i}th output['data'] from nbclient outputs dont have image/png, continue next output ...")
logger.info(
f"{i}th output['data'] from nbclient outputs dont have image/png, continue next output ..."
)
elif output["output_type"] == "execute_result":
parsed_output += output["data"]["text/plain"]
return parsed_output
@ -148,7 +150,7 @@ class ExecutePyCode(ExecuteCode, Action):
return False
def _process_code(self, code: Union[str, Dict, Message], language: str = None) -> Tuple:
language = language or 'python'
language = language or "python"
if isinstance(code, str) and Path(code).suffix in (".py", ".txt"):
code = Path(code).read_text(encoding="utf-8")
return code, language
@ -158,11 +160,11 @@ class ExecutePyCode(ExecuteCode, Action):
if isinstance(code, dict):
assert "code" in code
if "language" not in code:
code['language'] = 'python'
code["language"] = "python"
code, language = code["code"], code["language"]
elif isinstance(code, Message):
if isinstance(code.content, dict) and "language" not in code.content:
code.content["language"] = 'python'
code.content["language"] = "python"
code, language = code.content["code"], code.content["language"]
elif isinstance(code.content, str):
code, language = code.content, language
@ -181,7 +183,7 @@ class ExecutePyCode(ExecuteCode, Action):
except DeadKernelError:
await self.reset()
return False, "DeadKernelError"
except Exception as e:
except Exception:
return False, f"{traceback.format_exc()}"
async def run(self, code: Union[str, Dict, Message], language: str = "python") -> Tuple[str, bool]:
@ -224,6 +226,6 @@ def truncate(result: str, keep_len: int = 2000) -> str:
def remove_escape_and_color_codes(input_str):
# 使用正则表达式去除转义字符和颜色代码
pattern = re.compile(r'\x1b\[[0-9;]*[mK]')
result = pattern.sub('', input_str)
pattern = re.compile(r"\x1b\[[0-9;]*[mK]")
result = pattern.sub("", input_str)
return result

View file

@ -1,14 +1,9 @@
import json
from typing import Dict, List, Union
from metagpt.actions import Action
from metagpt.schema import Message, Plan
from metagpt.utils.common import CodeParser, remove_comments, create_func_config
from metagpt.logs import logger
from metagpt.prompts.ml_engineer import (
UPDATE_DATA_COLUMNS,
PRINT_DATA_COLUMNS
)
from metagpt.prompts.ml_engineer import PRINT_DATA_COLUMNS, UPDATE_DATA_COLUMNS
from metagpt.schema import Plan
from metagpt.utils.common import CodeParser, create_func_config, remove_comments
class SummarizeAnalysis(Action):

View file

@ -4,25 +4,24 @@
@Author : orange-crow
@File : write_code_v2.py
"""
from typing import Dict, List, Union, Tuple
from tenacity import retry, stop_after_attempt, wait_fixed
from pathlib import Path
import re
import json
from pathlib import Path
from typing import Dict, List, Tuple, Union
import yaml
from tenacity import retry, stop_after_attempt, wait_fixed
from metagpt.actions import Action
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.prompts.ml_engineer import (
TOOL_RECOMMENDATION_PROMPT,
SELECT_FUNCTION_TOOLS,
CODE_GENERATOR_WITH_TOOLS,
TOOL_USAGE_PROMPT,
ML_SPECIFIC_PROMPT,
ML_MODULE_MAP,
GENERATE_CODE_PROMPT,
ML_MODULE_MAP,
ML_SPECIFIC_PROMPT,
SELECT_FUNCTION_TOOLS,
TOOL_RECOMMENDATION_PROMPT,
TOOL_USAGE_PROMPT,
)
from metagpt.schema import Message, Plan
from metagpt.utils.common import create_func_config, remove_comments
@ -52,24 +51,16 @@ class BaseWriteAnalysisCode(Action):
messages.append(p.content["code"])
# 添加默认的提示词
if (
default_system_msg not in messages[0]["content"]
and messages[0]["role"] != "system"
):
if default_system_msg not in messages[0]["content"] and messages[0]["role"] != "system":
messages.insert(0, {"role": "system", "content": default_system_msg})
elif (
default_system_msg not in messages[0]["content"]
and messages[0]["role"] == "system"
):
elif default_system_msg not in messages[0]["content"] and messages[0]["role"] == "system":
messages[0] = {
"role": "system",
"content": messages[0]["content"] + default_system_msg,
}
return messages
async def run(
self, context: List[Message], plan: Plan = None, code_steps: str = ""
) -> str:
async def run(self, context: List[Message], plan: Plan = None, code_steps: str = "") -> str:
"""Run of a code writing action, used in data analysis or modeling
Args:
@ -115,7 +106,7 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
def _load_tools(self, schema_path, schema_module=None):
"""Load tools from yaml file"""
if isinstance(schema_path, dict):
schema_module = schema_module or 'udf'
schema_module = schema_module or "udf"
self.available_tools.update({schema_module: schema_path})
else:
if isinstance(schema_path, list):
@ -197,9 +188,7 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
available_tools = {k: v["description"] for k, v in available_tools.items()}
recommend_tools = await self._tool_recommendation(
plan.current_task.instruction,
code_steps,
available_tools
plan.current_task.instruction, code_steps, available_tools
)
tool_catalog = self._parse_recommend_tools(task_type, recommend_tools)
logger.info(f"Recommended tools: \n{recommend_tools}")
@ -216,8 +205,7 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
module_name=module_name,
tool_catalog=tool_catalog,
)
else:
prompt = GENERATE_CODE_PROMPT.format(
user_requirement=plan.goal,
@ -245,7 +233,7 @@ class MakeTools(WriteCodeByGenerate):
5. Only use the imported packages**
"""
def __init__(self, name: str = '', context: list[Message] = None, llm: LLM = None, workspace: str = None):
def __init__(self, name: str = "", context: list[Message] = None, llm: LLM = None, workspace: str = None):
"""
:param str name: name, defaults to ''
:param list[Message] context: context, defaults to None
@ -254,12 +242,12 @@ class MakeTools(WriteCodeByGenerate):
"""
super().__init__(name, context, llm)
self.workspace = workspace or str(Path(__file__).parents[1].joinpath("./tools/functions/libs/udf"))
self.file_suffix: str = '.py'
self.file_suffix: str = ".py"
self.context = []
def parse_function_name(self, function_code: str) -> str:
# 定义正则表达式模式
pattern = r'\bdef\s+([a-zA-Z_]\w*)\s*\('
pattern = r"\bdef\s+([a-zA-Z_]\w*)\s*\("
# 在代码中搜索匹配的模式
match = re.search(pattern, function_code)
# 如果找到匹配项则返回匹配的函数名否则返回None
@ -272,9 +260,9 @@ class MakeTools(WriteCodeByGenerate):
func_name = self.parse_function_name(tool_code)
if func_name is None:
raise ValueError(f"No function name found in {tool_code}")
saved_path = Path(self.workspace).joinpath(func_name+self.file_suffix)
saved_path = Path(self.workspace).joinpath(func_name + self.file_suffix)
logger.info(f"Saved tool_code {func_name} in {str(saved_path)}.")
saved_path.write_text(tool_code, encoding='utf-8')
saved_path.write_text(tool_code, encoding="utf-8")
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def run(self, code: Union[str, List[dict]], code_desc: str = None, **kwargs) -> str:
@ -287,27 +275,31 @@ class MakeTools(WriteCodeByGenerate):
logger.info(f"\n\nAsk to Make tools:\n{'-'*60}\n {self.context[-1]}")
# 更新kwargs
if 'code' in kwargs:
kwargs.pop('code')
if 'code_desc' in kwargs:
kwargs.pop('code_desc')
if "code" in kwargs:
kwargs.pop("code")
if "code_desc" in kwargs:
kwargs.pop("code_desc")
max_tries, current_try = 3, 0
while True:
tool_code = await self.llm.aask_code(self.context, **kwargs)
func_name = self.parse_function_name(tool_code['code'])
func_name = self.parse_function_name(tool_code["code"])
current_try += 1
# make tools failed, add error message to context.
if not func_name:
logger.info(f"\n\nTools Respond\n{'-'*60}\n: {tool_code}")
logger.error(f"No function name found in code, we will retry make tools.\n{tool_code['code']}\n")
self.context.append({'role': 'user', 'content': 'We need a general function in above code,but not found function.'})
self.context.append(
{"role": "user", "content": "We need a general function in above code,but not found function."}
)
# end make tools
if func_name is not None or current_try >= max_tries:
if current_try >= max_tries:
logger.error(f"We have tried the maximum number of attempts {max_tries}\
and still have not created tools successfully, we will skip it.")
logger.error(
f"We have tried the maximum number of attempts {max_tries}\
and still have not created tools successfully, we will skip it."
)
break
logger.info(f"\n\nTools Respond\n{'-'*60}\n: {tool_code}")
self.save(tool_code['code'])
self.save(tool_code["code"])
return tool_code["code"]

View file

@ -1,9 +1,7 @@
import json
from typing import Dict, List, Union
from metagpt.actions import Action
from metagpt.schema import Message, Task, Plan
from metagpt.schema import Plan
from metagpt.utils.common import CodeParser
# CODE_STEPS_PROMPT_TEMPLATE = """
@ -79,7 +77,6 @@ STRUCTURAL_CONTEXT = """
class WriteCodeSteps(Action):
async def run(self, plan: Plan) -> str:
"""Run of a task guide writing action, used in ml engineer
@ -91,9 +88,7 @@ class WriteCodeSteps(Action):
"""
context = self.get_context(plan)
code_steps_prompt = CODE_STEPS_PROMPT_TEMPLATE.replace(
"{context}", context
)
code_steps_prompt = CODE_STEPS_PROMPT_TEMPLATE.replace("{context}", context)
code_steps = await self._aask(code_steps_prompt)
code_steps = CodeParser.parse_code(block=None, text=code_steps)
return code_steps
@ -102,19 +97,16 @@ class WriteCodeSteps(Action):
user_requirement = plan.goal
# select_task_keys = ['task_id', 'instruction', 'is_finished', 'code']
# select_task_keys = ['task_id','instruction']
def process_task(task):
task_dict = task.dict()
# ptask = {k: task_dict[k] for k in task_dict if k in select_task_keys }
ptask = f"task_id_{task_dict['task_id']}:{task_dict['instruction']}"
return ptask
tasks = json.dumps(
[process_task(task) for task in plan.tasks], indent=4, ensure_ascii=False
)
code_lists = [task.code for task in plan.tasks if task.is_finished==True]
tasks = json.dumps([process_task(task) for task in plan.tasks], indent=4, ensure_ascii=False)
code_lists = [task.code for task in plan.tasks if task.is_finished == True]
codes = "\n\n".join(code_lists)
current_task = json.dumps(process_task(plan.current_task)) if plan.current_task else {}
context = STRUCTURAL_CONTEXT.format(

View file

@ -4,16 +4,15 @@
@Author : orange-crow
@File : plan.py
"""
from typing import List, Dict, Tuple
import json
from copy import deepcopy
import traceback
from typing import Dict, List, Tuple
from metagpt.actions import Action
from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_PROMPT, ASSIGN_TASK_TYPE_CONFIG
from metagpt.schema import Message, Task, Plan
from metagpt.utils.common import CodeParser, create_func_config
from metagpt.logs import logger
from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_CONFIG, ASSIGN_TASK_TYPE_PROMPT
from metagpt.schema import Message, Plan, Task
from metagpt.utils.common import CodeParser, create_func_config
class WritePlan(Action):
@ -46,9 +45,7 @@ class WritePlan(Action):
Returns:
List[Dict]: tasks with task type assigned
"""
task_list = "\n".join(
[f"Task {task['task_id']}: {task['instruction']}" for task in tasks]
)
task_list = "\n".join([f"Task {task['task_id']}: {task['instruction']}" for task in tasks])
prompt = ASSIGN_TASK_TYPE_PROMPT.format(task_list=task_list)
tool_config = create_func_config(ASSIGN_TASK_TYPE_CONFIG)
rsp = await self.llm.aask_code(prompt, **tool_config)
@ -57,9 +54,7 @@ class WritePlan(Action):
task["task_type"] = task_type
return json.dumps(tasks)
async def run(
self, context: List[Message], max_tasks: int = 5, use_tools: bool = False
) -> str:
async def run(self, context: List[Message], max_tasks: int = 5, use_tools: bool = False) -> str:
prompt = (
self.PROMPT_TEMPLATE.replace("__context__", "\n".join([str(ct) for ct in context]))
# .replace("__current_plan__", current_plan)
@ -71,11 +66,13 @@ class WritePlan(Action):
rsp = await self.assign_task_type(json.loads(rsp))
return rsp
def rsp_to_tasks(rsp: str) -> List[Task]:
rsp = json.loads(rsp)
tasks = [Task(**task_config) for task_config in rsp]
return tasks
def update_plan_from_rsp(rsp: str, current_plan: Plan):
tasks = rsp_to_tasks(rsp)
if len(tasks) == 1 or tasks[0].dependent_task_ids:
@ -97,6 +94,7 @@ def update_plan_from_rsp(rsp: str, current_plan: Plan):
# add tasks in general
current_plan.add_tasks(tasks)
def precheck_update_plan_from_rsp(rsp: str, current_plan: Plan) -> Tuple[bool, str]:
temp_plan = deepcopy(current_plan)
try:

View file

@ -1 +0,0 @@
from metagpt.plan.planner import Planner

View file

@ -1,11 +1,14 @@
import json
from metagpt.actions.ask_review import AskReview, ReviewConst
from metagpt.actions.write_plan import (
WritePlan,
precheck_update_plan_from_rsp,
update_plan_from_rsp,
)
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.schema import Message, Plan, Task, TaskResult
from metagpt.actions.ask_review import AskReview, ReviewConst
from metagpt.actions.write_plan import WritePlan, update_plan_from_rsp, precheck_update_plan_from_rsp
STRUCTURAL_CONTEXT = """
## User Requirement
@ -27,16 +30,18 @@ class Planner:
# memory for working on each task, discarded each time a task is done
self.working_memory = working_memory
@property
def current_task(self):
return self.plan.current_task
@property
def current_task_id(self):
return self.plan.current_task_id
async def ask_review(self, task_result: TaskResult = None, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER):
async def ask_review(
self, task_result: TaskResult = None, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER
):
"""
Ask to review the task result, reviewer needs to provide confirmation or request change.
If human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds;
@ -51,27 +56,26 @@ class Planner:
return review, confirmed
confirmed = task_result.is_success if task_result else True
return "", confirmed
async def confirm_task(self, task: Task, task_result: TaskResult, review: str):
self.plan.update_task_result(task=task, task_result=task_result)
self.plan.finish_current_task()
self.working_memory.clear()
confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower()
and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)"
confirmed_and_more = (
ReviewConst.CONTINUE_WORD[0] in review.lower() and review.lower() not in ReviewConst.CONTINUE_WORD[0]
) # "confirm, ... (more content, such as changing downstream tasks)"
if confirmed_and_more:
self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
await self.update_plan(review)
async def update_plan(self, max_tasks: int = 3, max_retries: int = 3):
plan_confirmed = False
while not plan_confirmed:
context = self.get_useful_memories()
rsp = await WritePlan().run(context, max_tasks=max_tasks, use_tools=self.use_tools)
self.working_memory.add(
Message(content=rsp, role="assistant", cause_by=WritePlan)
)
self.working_memory.add(Message(content=rsp, role="assistant", cause_by=WritePlan))
# precheck plan before asking reviews
is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan)
if not is_plan_valid and max_retries > 0:
@ -80,11 +84,11 @@ class Planner:
self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan))
max_retries -= 1
continue
_, plan_confirmed = await self.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
update_plan_from_rsp(rsp=rsp, current_plan=self.plan)
self.working_memory.clear()
def get_useful_memories(self, task_exclude_field=None) -> list[Message]:
@ -93,7 +97,7 @@ class Planner:
if task_exclude_field is None:
# Shorten the context as we don't need code steps after we get the codes.
# This doesn't affect current_task below, which should hold the code steps
task_exclude_field = {'code_steps'}
task_exclude_field = {"code_steps"}
user_requirement = self.plan.goal
context = self.plan.context
tasks = [task.dict(exclude=task_exclude_field) for task in self.plan.tasks]
@ -103,5 +107,5 @@ class Planner:
user_requirement=user_requirement, context=context, tasks=tasks, current_task=current_task
)
context_msg = [Message(content=context, role="user")]
return context_msg + self.working_memory.get()

View file

@ -259,7 +259,7 @@ for col in num_cols:
- Always copy the DataFrame before processing it and use the copy to process.
- The output code should contain all steps implemented correctly in 'Code Steps'.
"""
#- If 'Code Steps' contains step done in 'Done Tasks', such as reading data, don't repeat it.
# - If 'Code Steps' contains step done in 'Done Tasks', such as reading data, don't repeat it.
DATA_PREPROCESS_PROMPT = """
The current task is about data preprocessing, please note the following:

View file

@ -22,7 +22,6 @@ from tenacity import (
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
wait_fixed,
)
from metagpt.config import CONFIG, Config, LLMProviderEnum

View file

@ -1,8 +1,7 @@
import json
from datetime import datetime
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.ask_review import ReviewConst
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.write_analysis_code import WriteCodeByGenerate
from metagpt.logs import logger
from metagpt.roles import Role
@ -12,7 +11,12 @@ from metagpt.utils.save_code import save_code_file
class CodeInterpreter(Role):
def __init__(
self, name="Charlie", profile="CodeInterpreter", goal="", auto_run=False, use_tools=False,
self,
name="Charlie",
profile="CodeInterpreter",
goal="",
auto_run=False,
use_tools=False,
):
super().__init__(name=name, profile=profile, goal=goal)
self._set_react_mode(react_mode="plan_and_act", auto_run=auto_run, use_tools=use_tools)
@ -21,9 +25,8 @@ class CodeInterpreter(Role):
@property
def working_memory(self):
return self._rc.working_memory
async def _plan_and_act(self):
async def _plan_and_act(self):
rsp = await super()._plan_and_act()
# save code using datetime.now or keywords related to the goal of your project (plan.goal).
@ -31,47 +34,40 @@ class CodeInterpreter(Role):
save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb")
return rsp
async def _act_on_task(self, current_task: Task) -> TaskResult:
code, result, is_success = await self._write_and_exec_code()
task_result = TaskResult(code=code, result=result, is_success=is_success)
return task_result
async def _write_and_exec_code(self, max_retry: int = 3):
counter = 0
success = False
while not success and counter < max_retry:
context = self.planner.get_useful_memories()
logger.info("Write code with pure generation")
code = await WriteCodeByGenerate().run(
context=context, plan=self.planner.plan, temperature=0.0
)
code = await WriteCodeByGenerate().run(context=context, plan=self.planner.plan, temperature=0.0)
cause_by = WriteCodeByGenerate
self.working_memory.add(
Message(content=code, role="assistant", cause_by=cause_by)
)
self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by))
result, success = await self.execute_code.run(code)
print(result)
self.working_memory.add(
Message(content=result, role="user", cause_by=ExecutePyCode)
)
self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode))
if "!pip" in code:
success = False
counter += 1
if not success and counter >= max_retry:
logger.info("coding failed!")
review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
if ReviewConst.CHANGE_WORD[0] in review:
counter = 0 # redo the task again with help of human suggestions
return code, result, success

View file

@ -1,25 +1,23 @@
from typing import Dict, List, Union, Tuple
import json
import subprocess
import os
import subprocess
import fire
import pandas as pd
from metagpt.actions import Action, BossRequirement
from metagpt.actions.ml_da_action import SummarizeAnalysis
from metagpt.config import CONFIG
from metagpt.const import WORKSPACE_ROOT
from metagpt.roles import Role
from metagpt.actions import Action, BossRequirement
from metagpt.actions.ask_review import AskReview
from metagpt.actions.ml_da_action import SummarizeAnalysis
from metagpt.schema import Message, Task, Plan
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.utils.common import CodeParser
os.environ["KAGGLE_USERNAME"] = CONFIG.kaggle_username
os.environ["KAGGLE_KEY"] = CONFIG.kaggle_key
def run_command(cmd):
print(cmd)
output = subprocess.run(cmd, shell=True, capture_output=True, text=True)
@ -30,21 +28,21 @@ def run_command(cmd):
print(output.stdout)
return output.stdout
class DownloadData(Action):
class DownloadData(Action):
async def run(self, competition, data_desc="") -> str:
data_path = WORKSPACE_ROOT / competition
output = run_command(f"kaggle competitions list --search {competition}")
assert output != "No competitions found", "You must provide the correct competition name"
run_command(f"kaggle competitions download {competition} --path {WORKSPACE_ROOT}")
if not os.path.exists(data_path):
# if True:
# if True:
# run_command(f"rm -r {data_path / '*'}")
run_command(f"unzip -o {WORKSPACE_ROOT / '*.zip'} -d {data_path}") # FIXME: not safe
file_list = run_command(f"ls {data_path}")
rsp = f"""
@ -55,6 +53,7 @@ class DownloadData(Action):
"""
return rsp
class SubmitResult(Action):
PROMPT_TEMPLATE = """
# Summary
@ -85,9 +84,9 @@ class SubmitResult(Action):
run_command(f"kaggle competitions submit {competition} -f {submit_file_path} -m '{submit_message}'")
run_command(f"kaggle competitions leaderboard --show --csv {competition} > {data_path / 'leaderboard.csv'}")
run_command(f"kaggle competitions submissions --csv {competition} > {data_path / 'submission.csv'}")
leaderboard = pd.read_csv(data_path / 'leaderboard.csv')
submission = pd.read_csv(data_path / 'submission.csv')
leaderboard = pd.read_csv(data_path / "leaderboard.csv")
submission = pd.read_csv(data_path / "submission.csv")
print(submission) # submission.to_json(orient="records")
submission_score = submission.loc[0, "publicScore"]
@ -106,9 +105,7 @@ class SubmitResult(Action):
class KaggleManager(Role):
def __init__(
self, name="ABC", profile="KaggleManager", goal="", competition="titanic", data_desc=""
):
def __init__(self, name="ABC", profile="KaggleManager", goal="", competition="titanic", data_desc=""):
super().__init__(name=name, profile=profile, goal=goal)
self._init_actions([DownloadData, SubmitResult])
self._watch([BossRequirement, SummarizeAnalysis])
@ -130,13 +127,16 @@ class KaggleManager(Role):
rsp = await todo.run(self.competition, self.data_desc)
elif isinstance(todo, SubmitResult):
submit_message = self.get_memories()[-1].content # use analysis summary from MLEngineer as submission message
submit_message = self.get_memories()[
-1
].content # use analysis summary from MLEngineer as submission message
rsp = await todo.run(competition=self.competition, submit_message=submit_message)
msg = Message(content=rsp, role="user", cause_by=type(todo))
return msg
if __name__ == "__main__":
competition, data_desc, requirement = (
"titanic",
@ -151,4 +151,4 @@ if __name__ == "__main__":
# await role.run(Message(content="", cause_by=BossRequirement))
await role.run(Message(content=summary, cause_by=SummarizeAnalysis))
fire.Fire(main)
fire.Fire(main)

View file

@ -1,36 +1,46 @@
import json
from metagpt.actions.ask_review import ReviewConst
from metagpt.actions.debug_code import DebugCode
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.ask_review import ReviewConst
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools, MakeTools
from metagpt.actions.ml_da_action import Reflect, SummarizeAnalysis, UpdateDataColumns
from metagpt.actions.write_analysis_code import (
MakeTools,
WriteCodeByGenerate,
WriteCodeWithTools,
)
from metagpt.actions.write_code_steps import WriteCodeSteps
from metagpt.const import PROJECT_ROOT
from metagpt.logs import logger
from metagpt.schema import Message
from metagpt.utils.common import remove_comments
from metagpt.actions.ml_da_action import SummarizeAnalysis, Reflect, UpdateDataColumns
from metagpt.roles.code_interpreter import CodeInterpreter
from metagpt.roles.kaggle_manager import DownloadData, SubmitResult
from metagpt.schema import Message
from metagpt.tools.functions.libs.udf import UDFS_YAML
from metagpt.utils.common import remove_comments
class MLEngineer(CodeInterpreter):
def __init__(
self, name="Mark", profile="MLEngineer", goal="", auto_run=False, use_tools=False, use_code_steps=False,
make_udfs=False, use_udfs=False
self,
name="Mark",
profile="MLEngineer",
goal="",
auto_run=False,
use_tools=False,
use_code_steps=False,
make_udfs=False,
use_udfs=False,
):
super().__init__(name=name, profile=profile, goal=goal, auto_run=auto_run, use_tools=use_tools)
self._watch([DownloadData, SubmitResult])
self.use_tools = use_tools
self.use_code_steps = use_code_steps
self.make_udfs = make_udfs # user-defined functions
self.make_udfs = make_udfs # user-defined functions
self.use_udfs = use_udfs
self.data_desc = {}
async def _plan_and_act(self):
### Actions in a multi-agent multi-turn setting, a new attempt on the data ###
memories = self.get_memories()
if memories:
@ -40,64 +50,62 @@ class MLEngineer(CodeInterpreter):
elif latest_event == SubmitResult:
# self reflect on previous plan outcomes and think about how to improve the plan, add to working memory
await self._reflect()
# get feedback for improvement from human, add to working memory
await self.planner.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
### general plan process ###
await super()._plan_and_act()
### summarize analysis ###
summary = await SummarizeAnalysis().run(self.planner.plan)
rsp = Message(content=summary, cause_by=SummarizeAnalysis)
self._rc.memory.add(rsp)
return rsp
async def _write_and_exec_code(self, max_retry: int = 3):
self.planner.current_task.code_steps = (
await WriteCodeSteps().run(self.planner.plan)
if self.use_code_steps
else ""
await WriteCodeSteps().run(self.planner.plan) if self.use_code_steps else ""
)
counter = 0
success = False
debug_context = []
while not success and counter < max_retry:
while not success and counter < max_retry:
context = self.planner.get_useful_memories()
if counter > 0 and (self.use_tools or self.use_udfs):
logger.warning('We got a bug code, now start to debug...')
logger.warning("We got a bug code, now start to debug...")
code = await DebugCode().run(
plan=self.planner.current_task.instruction,
code=code,
runtime_result=self.working_memory.get(),
context=debug_context
context=debug_context,
)
logger.info(f"new code \n{code}")
cause_by = DebugCode
elif (not self.use_tools and not self.use_udfs) or (
self.planner.current_task.task_type == 'other' and not self.use_udfs):
self.planner.current_task.task_type == "other" and not self.use_udfs
):
logger.info("Write code with pure generation")
code = await WriteCodeByGenerate().run(
context=context, plan=self.planner.plan, temperature=0.0
)
debug_context = [self.planner.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]]
code = await WriteCodeByGenerate().run(context=context, plan=self.planner.plan, temperature=0.0)
debug_context = [self.planner.get_useful_memories(task_exclude_field={"result", "code_steps"})[0]]
cause_by = WriteCodeByGenerate
else:
logger.info("Write code with tools")
if self.use_udfs:
# use user-defined function tools.
logger.warning("Writing code with user-defined function tools by WriteCodeWithTools.")
logger.info(f"Local user defined function as following:\
\n{json.dumps(list(UDFS_YAML.keys()), indent=2, ensure_ascii=False)}")
logger.info(
f"Local user defined function as following:\
\n{json.dumps(list(UDFS_YAML.keys()), indent=2, ensure_ascii=False)}"
)
# set task_type to `udf`
self.planner.current_task.task_type = 'udf'
self.planner.current_task.task_type = "udf"
schema_path = UDFS_YAML
else:
schema_path = PROJECT_ROOT / "metagpt/tools/functions/schemas"
@ -108,26 +116,22 @@ class MLEngineer(CodeInterpreter):
)
debug_context = tool_context
cause_by = WriteCodeWithTools
self.working_memory.add(
Message(content=code, role="assistant", cause_by=cause_by)
)
self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by))
result, success = await self.execute_code.run(code)
print(result)
# make tools for successful code and long code.
if success and self.make_udfs and len(remove_comments(code).split('\n')) > 4:
logger.info('Execute code successfully. Now start to make tools ...')
if success and self.make_udfs and len(remove_comments(code).split("\n")) > 4:
logger.info("Execute code successfully. Now start to make tools ...")
await self.make_tools(code=code)
self.working_memory.add(
Message(content=result, role="user", cause_by=ExecutePyCode)
)
self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode))
if "!pip" in code:
success = False
counter += 1
if not success and counter >= max_retry:
logger.info("coding failed!")
review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
@ -135,13 +139,15 @@ class MLEngineer(CodeInterpreter):
counter = 0 # redo the task again with help of human suggestions
if success:
if (self.use_tools and self.planner.current_task.task_type not in ['model_train', 'model_evaluate']) or self.use_udfs:
if (
self.use_tools and self.planner.current_task.task_type not in ["model_train", "model_evaluate"]
) or self.use_udfs:
update_success, new_code = await self._update_data_columns()
if update_success:
code = code + "\n\n" + new_code
return code, result, success
async def _update_data_columns(self):
logger.info("Check columns in updated data")
rsp = await UpdateDataColumns().run(self.planner.plan)
@ -153,11 +159,11 @@ class MLEngineer(CodeInterpreter):
print(result)
self.data_desc["column_info"] = result
return success, code
async def _reflect(self):
context = self.get_memories()
context = "\n".join([str(msg) for msg in context])
reflection = await Reflect().run(context=context)
self.working_memory.add(Message(content=reflection, role="assistant"))
self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user"))
@ -168,8 +174,10 @@ class MLEngineer(CodeInterpreter):
Args:
code (str): pure generation code by class WriteCodeByGenerate.
"""
logger.warning(f"Making tools for task_id {self.planner.current_task_id}: \
`{self.planner.current_task.instruction}` \n code: \n {code}")
logger.warning(
f"Making tools for task_id {self.planner.current_task_id}: \
`{self.planner.current_task.instruction}` \n code: \n {code}"
)
make_tools = MakeTools()
make_tool_retries, make_tool_current_retry = 3, 0
while True:
@ -185,9 +193,11 @@ class MLEngineer(CodeInterpreter):
# end make tools
if execute_success or make_tool_current_retry >= make_tool_retries:
if make_tool_current_retry >= make_tool_retries:
logger.error(f"We have tried the maximum number of attempts {make_tool_retries}\
logger.error(
f"We have tried the maximum number of attempts {make_tool_retries}\
and still have not created tools for task_id {self.planner.current_task_id} successfully,\
we will skip it.")
we will skip it."
)
break
# save successful tool code in udf
if execute_success:

View file

@ -1,18 +1,17 @@
import re
from typing import List
import json
from datetime import datetime
from typing import List
import fire
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.memory import Memory
from metagpt.logs import logger
from metagpt.actions.write_analysis_code import WriteCodeByGenerate
from metagpt.actions.ask_review import AskReview, ReviewConst
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.write_analysis_code import WriteCodeByGenerate
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.roles import Role
from metagpt.roles.kaggle_manager import DownloadData
from metagpt.schema import Message
from metagpt.utils.save_code import save_code_file
STRUCTURAL_CONTEXT_SIMPLE = """
@ -40,9 +39,7 @@ Next Steps:
class MLEngineerSimple(Role):
def __init__(
self, name="ABC", profile="MLEngineerSimple", goal="", auto_run: bool = False
):
def __init__(self, name="ABC", profile="MLEngineerSimple", goal="", auto_run: bool = False):
super().__init__(name=name, profile=profile, goal=goal)
self._set_react_mode(react_mode="react")
self._watch([DownloadData])
@ -78,19 +75,13 @@ class MLEngineerSimple(Role):
context = self.get_useful_memories()
print(f"memories数量{len(context)}")
# print("===\n" +str(context) + "\n===")
code = await WriteCodeByGenerate().run(
context=context, temperature=0.0
)
code = await WriteCodeByGenerate().run(context=context, temperature=0.0)
cause_by = WriteCodeByGenerate
self.working_memory.add(
Message(content=code, role="assistant", cause_by=cause_by)
)
self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by))
result, success = await self.execute_code.run(code)
print(result)
self.working_memory.add(
Message(content=result, role="user", cause_by=ExecutePyCode)
)
self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode))
if "!pip" in code:
success = False
@ -107,12 +98,10 @@ class MLEngineerSimple(Role):
self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory
prompt = JUDGE_PROMPT_TEMPLATE.format(user_requirement=self.goal, context=completed_plan_memory)
rsp = await self._llm.aask(prompt)
self.working_memory.add(
Message(content=rsp, role="system")
)
self.working_memory.add(Message(content=rsp, role="system"))
matches = re.findall(r'\b(True|False)\b', rsp)
state = False if 'False' in matches else True
matches = re.findall(r"\b(True|False)\b", rsp)
state = False if "False" in matches else True
async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER):
auto_run = auto_run or self.auto_run
@ -127,9 +116,7 @@ class MLEngineerSimple(Role):
def get_useful_memories(self) -> List[Message]:
"""find useful memories only to reduce context length and improve performance"""
user_requirement = self.goal
context = STRUCTURAL_CONTEXT_SIMPLE.format(
user_requirement=user_requirement, data_desc=self.data_desc
)
context = STRUCTURAL_CONTEXT_SIMPLE.format(user_requirement=user_requirement, data_desc=self.data_desc)
context_msg = [Message(content=context, role="user")]
return context_msg + self.get_working_memories(6)

View file

@ -35,10 +35,9 @@ from metagpt.const import SERDESER_PATH
from metagpt.llm import LLM, HumanProvider
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.provider.base_llm import BaseLLM
from metagpt.schema import Message, MessageQueue, SerializationMixin
from metagpt.schema import Task, TaskResult
from metagpt.plan.planner import Planner
from metagpt.provider.base_llm import BaseLLM
from metagpt.schema import Message, MessageQueue, SerializationMixin, Task, TaskResult
from metagpt.utils.common import (
any_to_name,
any_to_str,
@ -270,7 +269,9 @@ class Role(SerializationMixin, is_polymorphic_base=True):
if react_mode == RoleReactMode.REACT:
self.rc.max_react_loop = max_react_loop
elif react_mode == RoleReactMode.PLAN_AND_ACT:
self.planner = Planner(goal=self._setting.goal, working_memory=self.rc.working_memory, auto_run=auto_run, use_tools=use_tools)
self.planner = Planner(
goal=self._setting.goal, working_memory=self.rc.working_memory, auto_run=auto_run, use_tools=use_tools
)
def _watch(self, actions: Iterable[Type[Action]] | Iterable[Action]):
"""Watch Actions of interest. Role will select Messages caused by these Actions from its personal message
@ -450,35 +451,34 @@ class Role(SerializationMixin, is_polymorphic_base=True):
async def _plan_and_act(self) -> Message:
"""first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ... Use llm to come up with the plan dynamically."""
### Common Procedure in both single- and multi-agent setting ###
# create initial plan and update until confirmation
await self.planner.update_plan()
while self.planner.current_task:
while self.planner.current_task:
task = self.planner.current_task
logger.info(f"ready to take on task {task}")
# take on current task
task_result = await self._act_on_task(task)
# ask for acceptance, users can other refuse and change tasks in the plan
review, task_result_confirmed = await self.planner.ask_review(task_result)
if task_result_confirmed:
# tick off this task and record progress
await self.planner.confirm_task(task, task_result, review)
elif "redo" in review:
# Ask the Role to redo this task with help of review feedback,
# useful when the code run is successful but the procedure or result is not what we want
continue
else:
# update plan according to user's feedback and to take on changed tasks
await self.planner.update_plan(review)
completed_plan_memory = self.planner.get_useful_memories() # completed plan as a outcome
rsp = completed_plan_memory[0]
@ -486,7 +486,7 @@ class Role(SerializationMixin, is_polymorphic_base=True):
self.rc.memory.add(rsp) # add to persistent memory
return rsp
async def _act_on_task(self, current_task: Task) -> TaskResult:
"""Taking specific action to handle one task in plan

View file

@ -308,12 +308,12 @@ class AIMessage(Message):
"""
def __init__(self, content: str):
super().__init__(content, 'assistant')
super().__init__(content, "assistant")
class Task(BaseModel):
task_id: str = ""
dependent_task_ids: list[str] = [] # Tasks prerequisite to this Task
dependent_task_ids: list[str] = [] # Tasks prerequisite to this Task
instruction: str = ""
task_type: str = ""
code_steps: str = ""
@ -325,6 +325,7 @@ class Task(BaseModel):
class TaskResult(BaseModel):
"""Result of taking a task, with result and is_success required to be filled"""
code_steps: str = ""
code: str = ""
result: str
@ -360,12 +361,12 @@ class Plan(BaseModel):
def add_tasks(self, tasks: list[Task]):
"""
Integrates new tasks into the existing plan, ensuring dependency order is maintained.
This method performs two primary functions based on the current state of the task list:
1. If there are no existing tasks, it topologically sorts the provided tasks to ensure
1. If there are no existing tasks, it topologically sorts the provided tasks to ensure
correct execution order based on dependencies, and sets these as the current tasks.
2. If there are existing tasks, it merges the new tasks with the existing ones. It maintains
any common prefix of tasks (based on task_id and instruction) and appends the remainder
2. If there are existing tasks, it merges the new tasks with the existing ones. It maintains
any common prefix of tasks (based on task_id and instruction) and appends the remainder
of the new tasks. The current task is updated to the first unfinished task in this merged list.
Args:
@ -395,13 +396,13 @@ class Plan(BaseModel):
# Combine the common prefix with the remainder of the new tasks
final_tasks = self.tasks[:prefix_length] + new_tasks[prefix_length:]
self.tasks = final_tasks
# Update current_task_id to the first unfinished task in the merged list
self._update_current_task()
# Update the task map for quick access to tasks by ID
self.task_map = {task.task_id: task for task in self.tasks}
def reset_task(self, task_id: str):
"""
Clear code and result of the task based on task_id, and set the task as unfinished.
@ -448,20 +449,21 @@ class Plan(BaseModel):
Args:
new_task (Task): The new task to be appended to the existing task sequence
Returns:
None
"""
assert not self.has_task_id(new_task.task_id), "Task already in current plan, use replace_task instead"
assert all([self.has_task_id(dep_id) for dep_id in new_task.dependent_task_ids]), \
"New task has unknown dependencies"
assert all(
[self.has_task_id(dep_id) for dep_id in new_task.dependent_task_ids]
), "New task has unknown dependencies"
# Existing tasks do not depend on the new task, it's fine to put it to the end of the sorted task sequence
self.tasks.append(new_task)
self.task_map[new_task.task_id] = new_task
self._update_current_task()
def update_task_result(self, task: Task, task_result: TaskResult):
task.code_steps = task_result.code_steps
task.code = task_result.code
@ -478,7 +480,7 @@ class Plan(BaseModel):
current_task_id = task.task_id
break
self.current_task_id = current_task_id # all tasks finished
@property
def current_task(self) -> Task:
"""Find current task to execute
@ -489,8 +491,7 @@ class Plan(BaseModel):
return self.task_map.get(self.current_task_id, None)
def finish_current_task(self):
"""Finish current task, set Task.is_finished=True, set current task to next task
"""
"""Finish current task, set Task.is_finished=True, set current task to next task"""
if self.current_task_id:
self.current_task.is_finished = True
self._update_current_task() # set to next task

View file

@ -3,19 +3,26 @@ import json
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MaxAbsScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import OrdinalEncoder
from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import (
LabelEncoder,
MaxAbsScaler,
MinMaxScaler,
OneHotEncoder,
OrdinalEncoder,
RobustScaler,
StandardScaler,
)
from metagpt.tools.functions.libs.base import MLProcess
class FillMissingValue(MLProcess):
def __init__(self, features: list, strategy: str = 'mean', fill_value=None,):
def __init__(
self,
features: list,
strategy: str = "mean",
fill_value=None,
):
self.features = features
self.strategy = strategy
self.fill_value = fill_value
@ -35,7 +42,10 @@ class FillMissingValue(MLProcess):
class MinMaxScale(MLProcess):
def __init__(self, features: list,):
def __init__(
self,
features: list,
):
self.features = features
self.mms = None
@ -49,7 +59,10 @@ class MinMaxScale(MLProcess):
class StandardScale(MLProcess):
def __init__(self, features: list,):
def __init__(
self,
features: list,
):
self.features = features
self.ss = None
@ -63,7 +76,10 @@ class StandardScale(MLProcess):
class MaxAbsScale(MLProcess):
def __init__(self, features: list,):
def __init__(
self,
features: list,
):
self.features = features
self.mas = None
@ -77,7 +93,10 @@ class MaxAbsScale(MLProcess):
class RobustScale(MLProcess):
def __init__(self, features: list,):
def __init__(
self,
features: list,
):
self.features = features
self.rs = None
@ -91,7 +110,10 @@ class RobustScale(MLProcess):
class OrdinalEncode(MLProcess):
def __init__(self, features: list,):
def __init__(
self,
features: list,
):
self.features = features
self.oe = None
@ -105,7 +127,10 @@ class OrdinalEncode(MLProcess):
class OneHotEncode(MLProcess):
def __init__(self, features: list,):
def __init__(
self,
features: list,
):
self.features = features
self.ohe = None
@ -123,7 +148,10 @@ class OneHotEncode(MLProcess):
class LabelEncode(MLProcess):
def __init__(self, features: list,):
def __init__(
self,
features: list,
):
self.features = features
self.le_encoders = []
@ -131,7 +159,7 @@ class LabelEncode(MLProcess):
if len(self.features) == 0:
return
for col in self.features:
le = LabelEncoder().fit(df[col].astype(str).unique().tolist() + ['unknown'])
le = LabelEncoder().fit(df[col].astype(str).unique().tolist() + ["unknown"])
self.le_encoders.append(le)
def transform(self, df: pd.DataFrame):
@ -141,7 +169,7 @@ class LabelEncode(MLProcess):
data_list = df[self.features[i]].astype(str).tolist()
for unique_item in np.unique(df[self.features[i]].astype(str)):
if unique_item not in self.le_encoders[i].classes_:
data_list = ['unknown' if x == unique_item else x for x in data_list]
data_list = ["unknown" if x == unique_item else x for x in data_list]
df[self.features[i]] = self.le_encoders[i].transform(data_list)
return df
@ -165,5 +193,5 @@ def get_column_info(df: pd.DataFrame) -> dict:
column_info["Others"].append(col)
if len(json.dumps(column_info)) > 2000:
column_info['Numeric'] = column_info['Numeric'][0:5] + ['Too many cols, omission here...']
column_info["Numeric"] = column_info["Numeric"][0:5] + ["Too many cols, omission here..."]
return column_info

View file

@ -13,7 +13,7 @@ from joblib import Parallel, delayed
from pandas.core.dtypes.common import is_object_dtype
from sklearn.feature_selection import VarianceThreshold
from sklearn.model_selection import KFold
from sklearn.preprocessing import PolynomialFeatures, KBinsDiscretizer
from sklearn.preprocessing import KBinsDiscretizer, PolynomialFeatures
from metagpt.tools.functions.libs.base import MLProcess
@ -91,9 +91,7 @@ class KFoldTargetMeanEncoder(MLProcess):
col_name = f"{self.col}_kf_target_mean"
for trn_idx, val_idx in kf.split(tmp, tmp[self.label]):
_trn, _val = tmp.iloc[trn_idx], tmp.iloc[val_idx]
tmp.loc[tmp.index[val_idx], col_name] = _val[self.col].map(
_trn.groupby(self.col)[self.label].mean()
)
tmp.loc[tmp.index[val_idx], col_name] = _val[self.col].map(_trn.groupby(self.col)[self.label].mean())
tmp[col_name].fillna(global_mean, inplace=True)
self.encoder_dict = tmp.groupby(self.col)[col_name].mean().to_dict()
@ -111,7 +109,7 @@ class CatCross(MLProcess):
@staticmethod
def cross_two(comb, df):
new_col = f'{comb[0]}_{comb[1]}'
new_col = f"{comb[0]}_{comb[1]}"
new_col_combs = list(itertools.product(df[comb[0]].unique(), df[comb[1]].unique()))
ll = list(range(len(new_col_combs)))
comb_map = dict(zip(new_col_combs, ll))
@ -122,13 +120,12 @@ class CatCross(MLProcess):
if df[col].nunique() > self.max_cat_num:
self.cols.remove(col)
self.combs = list(itertools.combinations(self.cols, 2))
res = Parallel(n_jobs=4, require='sharedmem')(
delayed(self.cross_two)(comb, df) for comb in self.combs)
res = Parallel(n_jobs=4, require="sharedmem")(delayed(self.cross_two)(comb, df) for comb in self.combs)
self.combs_map = dict(res)
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
for comb in self.combs:
new_col = f'{comb[0]}_{comb[1]}'
new_col = f"{comb[0]}_{comb[1]}"
_map = self.combs_map[new_col]
df[new_col] = pd.Series(zip(df[comb[0]], df[comb[1]])).map(_map)
# set the unknown value to a new number
@ -157,13 +154,13 @@ class GroupStat(MLProcess):
class SplitBins(MLProcess):
def __init__(self, cols: str, strategy: str = 'quantile'):
def __init__(self, cols: str, strategy: str = "quantile"):
self.cols = cols
self.strategy = strategy
self.encoder = None
def fit(self, df: pd.DataFrame):
self.encoder = KBinsDiscretizer(strategy=self.strategy, encode='ordinal')
self.encoder = KBinsDiscretizer(strategy=self.strategy, encode="ordinal")
self.encoder.fit(df[self.cols].fillna(0))
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
@ -296,10 +293,7 @@ class GeneralSelection(MLProcess):
if df[col].nunique() == 1:
feats.remove(col)
if (
df.loc[df[col] == np.inf].shape[0] != 0
or df.loc[df[col] == np.inf].shape[0] != 0
):
if df.loc[df[col] == np.inf].shape[0] != 0 or df.loc[df[col] == np.inf].shape[0] != 0:
feats.remove(col)
if is_object_dtype(df[col]) and df[col].nunique() == df.shape[0]:
@ -320,10 +314,10 @@ class TreeBasedSelection(MLProcess):
def fit(self, df: pd.DataFrame):
params = {
'boosting_type': 'gbdt',
'objective': 'binary',
'learning_rate': 0.1,
'num_leaves': 31,
"boosting_type": "gbdt",
"objective": "binary",
"learning_rate": 0.1,
"num_leaves": 31,
}
if self.task_type == "cls":
@ -342,12 +336,11 @@ class TreeBasedSelection(MLProcess):
dtrain = lgb.Dataset(df[cols], df[self.label_col])
model = lgb.train(params, dtrain, num_boost_round=100)
df_imp = pd.DataFrame({'feature_name': dtrain.feature_name,
'importance': model.feature_importance("gain")})
df_imp = pd.DataFrame({"feature_name": dtrain.feature_name, "importance": model.feature_importance("gain")})
df_imp.sort_values("importance", ascending=False, inplace=True)
df_imp = df_imp[df_imp["importance"] > 0]
self.feats = df_imp['feature_name'].tolist()
self.feats = df_imp["feature_name"].tolist()
self.feats.append(self.label_col)
def transform(self, df: pd.DataFrame) -> pd.DataFrame:

View file

@ -5,12 +5,12 @@ import yaml
import inspect
import importlib
from pathlib import Path
from typing import Dict, List
from typing import List
from metagpt.logs import logger
def extract_function_signatures(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
with open(file_path, "r", encoding="utf-8") as file:
source_code = file.read()
tree = ast.parse(source_code)
@ -19,7 +19,7 @@ def extract_function_signatures(file_path):
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
# 只提取用户自定义函数,排除内置函数
if not (node.name.startswith('__') and node.name.endswith('__')):
if not (node.name.startswith("__") and node.name.endswith("__")):
# 获取函数名
function_name = node.name
# 获取参数列表
@ -27,36 +27,37 @@ def extract_function_signatures(file_path):
# 获取函数签名
function_signature = f"{function_name}({', '.join(args)})"
# 导入函数
module_name = Path(file_path).parts[-1][:-len(Path(file_path).suffix)]
module_name = Path(file_path).parts[-1][: -len(Path(file_path).suffix)]
module = importlib.import_module(f"metagpt.tools.functions.libs.udf.{module_name}")
# 将函数导入到当前命名空间
globals().update({function_name: getattr(module, function_name)})
# 获取函数注释和函数路径
function_schema = {'udf_name': function_signature,
'udf_path': f'from metagpt.tools.functions.libs.udf.{module_name} import {function_name}',
'udf_doc': inspect.getdoc(getattr(module, function_name))}
function_schema = {
"udf_name": function_signature,
"udf_path": f"from metagpt.tools.functions.libs.udf.{module_name} import {function_name}",
"udf_doc": inspect.getdoc(getattr(module, function_name)),
}
function_signatures.append(function_schema)
# 获取函数返回变量名
source_lines, _ = inspect.getsourcelines(getattr(module, function_name))
for line in source_lines:
if line.strip().startswith("return "):
function_returns.append({
'udf_name': function_name,
'udf_returns': [var.strip() for var in line.strip()[len("return "):].split(',')]
})
function_returns.append(
{
"udf_name": function_name,
"udf_returns": [var.strip() for var in line.strip()[len("return ") :].split(",")],
}
)
break
# 没有返回值的函数
if not function_returns or function_returns[-1]['udf_name'] != function_name:
function_returns.append({
'udf_name': function_name,
'udf_returns': [None]
})
if not function_returns or function_returns[-1]["udf_name"] != function_name:
function_returns.append({"udf_name": function_name, "udf_returns": [None]})
return function_signatures, function_returns
def get_function_signatures_in_folder(folder_path):
python_files = [f for f in os.listdir(folder_path) if f.endswith('.py') and f != '__init__.py']
python_files = [f for f in os.listdir(folder_path) if f.endswith(".py") and f != "__init__.py"]
all_function_signatures = []
all_function_returns = []
@ -74,31 +75,33 @@ def docstring_to_yaml(docstring: str, return_vars: List[str] = None):
if docstring is None:
return {}
# 匹配简介部分
description_match = re.search(r'^(.*?)(?:Args:|Returns:|Raises:|$)', docstring, re.DOTALL)
description_match = re.search(r"^(.*?)(?:Args:|Returns:|Raises:|$)", docstring, re.DOTALL)
description = description_match.group(1).strip() if description_match else ""
# 匹配Args部分
args_match = re.search(r'Args:\s*(.*?)(?:Returns:|Raises:|$)', docstring, re.DOTALL)
args_match = re.search(r"Args:\s*(.*?)(?:Returns:|Raises:|$)", docstring, re.DOTALL)
_args = args_match.group(1).strip() if args_match else ""
variable_pattern = re.compile(r'(\w+)\s*\((.*?)\):\s*(.*)')
variable_pattern = re.compile(r"(\w+)\s*\((.*?)\):\s*(.*)")
params = variable_pattern.findall(_args)
if not params:
params = ((None, None, None),)
# 匹配Returns部分
returns_match = re.search(r'Returns:\s*(.*?)(?:Raises:|$)', docstring, re.DOTALL)
returns_match = re.search(r"Returns:\s*(.*?)(?:Raises:|$)", docstring, re.DOTALL)
returns = returns_match.group(1).strip() if returns_match else ""
return_pattern = re.compile(r'^(.*)\s*:\s*(.*)$')
return_pattern = re.compile(r"^(.*)\s*:\s*(.*)$")
# 添加返回值变量名
return_vars = return_vars if isinstance(return_vars, list) else [return_vars]
returns = [(r, *r_desc) for r_desc, r in zip(return_pattern.findall(returns), return_vars)]
# 构建YAML字典
yaml_data = {
'description': description.strip('.').strip(),
'parameters': {
'properties': {param[0]: {'type': param[1], 'description': param[2]} for param in params if param[0] is not None},
'required': [param[0] for param in params if param[0] is not None]
"description": description.strip(".").strip(),
"parameters": {
"properties": {
param[0]: {"type": param[1], "description": param[2]} for param in params if param[0] is not None
},
"required": [param[0] for param in params if param[0] is not None],
},
'returns': {ret[0]: {'type': ret[1], 'description': ret[2]} for ret in returns}
"returns": {ret[0]: {"type": ret[1], "description": ret[2]} for ret in returns},
}
return yaml_data
@ -107,10 +110,10 @@ def extract_function_schema_yaml_in_folder(folder_path: str):
function_signatures, function_returns = get_function_signatures_in_folder(folder_path)
function_schema_yaml_data = {}
for func_docstring, func_returns in zip(function_signatures, function_returns):
if func_docstring['udf_doc']:
fun_yaml_data = docstring_to_yaml(func_docstring['udf_doc'], func_returns['udf_returns'])
fun_yaml_data.update({'type': 'function'})
function_schema_yaml_data.update({func_returns['udf_name']: fun_yaml_data})
if func_docstring["udf_doc"]:
fun_yaml_data = docstring_to_yaml(func_docstring["udf_doc"], func_returns["udf_returns"])
fun_yaml_data.update({"type": "function"})
function_schema_yaml_data.update({func_returns["udf_name"]: fun_yaml_data})
return yaml.dump(function_schema_yaml_data, default_flow_style=False)

View file

@ -361,6 +361,7 @@ def create_func_config(func_schema: dict) -> dict:
def remove_comments(code_str):
"""Remove comments from code."""
pattern = r"(\".*?\"|\'.*?\')|(\#.*?$)"
def replace_func(match):
if match.group(2) is not None:
return ""

View file

@ -2,15 +2,17 @@
# @Date : 12/20/2023 11:07 AM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import nbformat
from pathlib import Path
import json
from datetime import datetime
from pathlib import Path
import nbformat
from metagpt.roles.role import Role
from metagpt.const import DATA_PATH
from metagpt.roles.role import Role
from metagpt.utils.save_code import save_code_file
def load_history(save_dir: str = ""):
"""
Load history from the specified save directory.
@ -21,7 +23,7 @@ def load_history(save_dir: str = ""):
Returns:
Tuple: A tuple containing the loaded plan and notebook.
"""
plan_path = Path(save_dir) / "plan.json"
nb_path = Path(save_dir) / "history_nb" / "code.ipynb"
plan = json.load(open(plan_path, "r", encoding="utf-8"))
@ -40,16 +42,16 @@ def save_history(role: Role, save_dir: str = ""):
Returns:
Path: The path to the saved history directory.
"""
record_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
record_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
save_path = DATA_PATH / "output" / f"{record_time}"
# overwrite exist trajectory
save_path.mkdir(parents=True, exist_ok=True)
plan = role.planner.plan.dict()
with open(save_path / "plan.json", "w", encoding="utf-8") as plan_file:
json.dump(plan, plan_file, indent=4, ensure_ascii=False)
save_code_file(name=Path(record_time) / "history_nb", code_context=role.execute_code.nb, file_format="ipynb")
return save_path
return save_path

View file

@ -2,13 +2,14 @@
# @Date : 12/12/2023 4:14 PM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import os
import json
import os
import nbformat
from metagpt.const import DATA_PATH
def save_code_file(name: str, code_context: str, file_format: str = "py") -> None:
"""
Save code files to a specified path.
@ -36,10 +37,6 @@ def save_code_file(name: str, code_context: str, file_format: str = "py") -> Non
with open(file_path, "w", encoding="utf-8") as fp:
json.dump(data, fp, indent=2)
elif file_format == "ipynb":
nbformat.write(code_context, file_path)
nbformat.write(code_context, file_path)
else:
raise ValueError("Unsupported file format. Please choose 'py', 'json', or 'ipynb'.")

View file

@ -26,10 +26,11 @@ from metagpt.schema import (
Document,
Message,
MessageQueue,
Plan,
SystemMessage,
Task,
UserMessage,
)
from metagpt.schema import Task, Plan
from metagpt.utils.common import any_to_str
@ -53,7 +54,7 @@ class TestPlan:
tasks = [
Task(task_id="1", dependent_task_ids=["2", "3"], instruction="Third"),
Task(task_id="2", instruction="First"),
Task(task_id="3", dependent_task_ids=["2"], instruction="Second")
Task(task_id="3", dependent_task_ids=["2"], instruction="Second"),
] # 2 -> 3 -> 1
plan.add_tasks(tasks)
@ -65,7 +66,7 @@ class TestPlan:
tasks = [
Task(task_id="1", dependent_task_ids=["2", "3"], instruction="Third"),
Task(task_id="2", instruction="First"),
Task(task_id="3", dependent_task_ids=["2"], instruction="Second", is_finished=True)
Task(task_id="3", dependent_task_ids=["2"], instruction="Second", is_finished=True),
] # 2 -> 3 -> 1
plan.add_tasks(tasks)
@ -81,7 +82,7 @@ class TestPlan:
tasks = [
Task(task_id="1", dependent_task_ids=["2", "3"], instruction="Third"),
Task(task_id="2", instruction="First"),
Task(task_id="3", dependent_task_ids=["2"], instruction="Second")
Task(task_id="3", dependent_task_ids=["2"], instruction="Second"),
] # 2 -> 3 -> 1
plan.add_tasks(tasks)
plan.finish_current_task() # finish 2
@ -90,19 +91,21 @@ class TestPlan:
new_tasks = [
Task(task_id="4", dependent_task_ids=["3"], instruction="Third"),
Task(task_id="2", instruction="First"),
Task(task_id="3", dependent_task_ids=["2"], instruction="Second")
Task(task_id="3", dependent_task_ids=["2"], instruction="Second"),
] # 2 -> 3 -> 4, so the common prefix is 2 -> 3, and these two should be obtained from the existing tasks
plan.add_tasks(new_tasks)
assert [task.task_id for task in plan.tasks] == ["2", "3", "4"]
assert plan.tasks[0].is_finished and plan.tasks[1].is_finished # "2" and "3" should be the original finished one
assert (
plan.tasks[0].is_finished and plan.tasks[1].is_finished
) # "2" and "3" should be the original finished one
assert plan.current_task_id == "4"
def test_current_task(self):
plan = Plan(goal="")
tasks = [
Task(task_id="1", dependent_task_ids=["2"], instruction="Second"),
Task(task_id="2", instruction="First")
Task(task_id="2", instruction="First"),
]
plan.add_tasks(tasks)
assert plan.current_task.task_id == "2"
@ -111,7 +114,7 @@ class TestPlan:
plan = Plan(goal="")
tasks = [
Task(task_id="1", instruction="First"),
Task(task_id="2", dependent_task_ids=["1"], instruction="Second")
Task(task_id="2", dependent_task_ids=["1"], instruction="Second"),
]
plan.add_tasks(tasks)
plan.finish_current_task()
@ -121,7 +124,7 @@ class TestPlan:
plan = Plan(goal="")
tasks = [
Task(task_id="1", instruction="First"),
Task(task_id="2", dependent_task_ids=["1"], instruction="Second")
Task(task_id="2", dependent_task_ids=["1"], instruction="Second"),
]
plan.add_tasks(tasks)
plan.finish_current_task()
@ -149,8 +152,10 @@ class TestPlan:
def test_replace_task_with_dependents(self):
plan = Plan(goal="")
tasks = [Task(task_id="1", instruction="First Task", finished=True),
Task(task_id="2", instruction="Second Task", dependent_task_ids=["1"], finished=True)]
tasks = [
Task(task_id="1", instruction="First Task", finished=True),
Task(task_id="2", instruction="Second Task", dependent_task_ids=["1"], finished=True),
]
plan.add_tasks(tasks)
new_task = Task(task_id="1", instruction="Updated First Task")
plan.replace_task(new_task)
@ -168,7 +173,7 @@ class TestPlan:
plan.replace_task(new_task) # Task with ID 2 does not exist in plan
assert "1" in plan.task_map
assert "2" not in plan.task_map
def test_append_task_with_valid_dependencies(self):
plan = Plan(goal="Test")
existing_task = [Task(task_id="1")]
@ -183,7 +188,7 @@ class TestPlan:
plan = Plan(goal="Test")
with pytest.raises(AssertionError):
plan.append_task(new_task)
def test_append_task_without_dependencies(self):
plan = Plan(goal="Test")
existing_task = [Task(task_id="1")]