write code with class tool

This commit is contained in:
lidanyang 2023-12-13 14:32:25 +08:00
parent ab7af7768c
commit 537d51c26e
2 changed files with 131 additions and 169 deletions

View file

@ -4,7 +4,9 @@
@Author : orange-crow
@File : write_code_v2.py
"""
from typing import Dict, List, Union, Tuple, Optional, Any
from typing import Dict, List, Union, Tuple
import yaml
from metagpt.actions import Action
from metagpt.logs import logger
@ -15,11 +17,9 @@ from metagpt.prompts.ml_engineer import (
TOOL_USAGE_PROMPT,
ML_SPECIFIC_PROMPT,
ML_MODULE_MAP,
TOOL_OUTPUT_DESC, DATA_PROCESS_PROMPT,
GENERATE_CODE_PROMPT
GENERATE_CODE_PROMPT,
)
from metagpt.schema import Message, Plan
from metagpt.tools.functions import registry
from metagpt.utils.common import create_func_config, remove_comments
@ -100,40 +100,55 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode):
class WriteCodeWithTools(BaseWriteAnalysisCode):
"""Write code with help of local available tools. Choose tools first, then generate code to use the tools"""
@staticmethod
def _parse_recommend_tools(module: str, recommend_tools: list) -> List[Dict]:
def __init__(self, name: str = "", context=None, llm=None, schema_path=None):
    """Initialise the action and optionally load tool schemas.

    Args:
        name: Passed through to the parent initialiser.
        context: Passed through to the parent initialiser.
        llm: Passed through to the parent initialiser.
        schema_path: Location handed to ``_load_tools``. When ``None`` no
            tools are loaded and ``available_tools`` stays empty.
    """
    super().__init__(name, context, llm)
    self.schema_path = schema_path
    self.available_tools = {}
    # Guard clause: nothing to load without a schema location.
    if self.schema_path is None:
        return
    self._load_tools(schema_path)
def _load_tools(self, schema_path):
    """Load tool schemas from the ``*.yml`` files found in a directory.

    For every matching file, the file stem is used as the key in
    ``self.available_tools`` and the parsed YAML document as its value.

    Args:
        schema_path: Directory containing the schema files, given either as a
            ``str`` or as a ``pathlib.Path``.
    """
    from pathlib import Path  # local import: only this method needs it

    # Coerce to Path so plain strings work too; sort for a deterministic load order.
    for yml_file in sorted(Path(schema_path).glob("*.yml")):
        with open(yml_file, "r", encoding="utf-8") as f:
            # safe_load returns None for an empty document; store an empty dict
            # instead so later ``.keys()`` / ``.items()`` calls on the entry work.
            self.available_tools[yml_file.stem] = yaml.safe_load(f) or {}
def _parse_recommend_tools(self, module: str, recommend_tools: list) -> dict:
"""
Parses and validates a list of recommended tools, and retrieves their schema from registry.
Args:
module (str): The module name for querying tools in the registry.
recommend_tools (list): A list of lists of recommended tools for each step.
recommend_tools (list): A list of recommended tools.
Returns:
List[Dict]: A list of dicts of valid tool schemas.
dict: A dict of valid tool schemas.
"""
valid_tools = []
available_tools = registry.get_all_by_module(module).keys()
available_tools = self.available_tools[module].keys()
for tool in recommend_tools:
if tool in available_tools:
valid_tools.append(tool)
tool_catalog = registry.get_schemas(module, valid_tools)
tool_catalog = {tool: self.available_tools[module][tool] for tool in valid_tools}
return tool_catalog
async def _tool_recommendation(
self,
task: str,
code_steps: str,
available_tools: list
self,
task: str,
code_steps: str,
available_tools: dict,
) -> list:
"""
Recommend tools for the specified task.
Args:
context (List[Message]): Action output history, source action denoted by Message.cause_by
task (str): the task to recommend tools for
code_steps (str): the code steps to generate the full code for the task
available_tools (list): the available tools for the task
available_tools (dict): the available tools description
Returns:
list: recommended tools for the specified task
@ -149,27 +164,23 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
return recommend_tools
async def run(
self,
context: List[Message],
plan: Plan = None,
code_steps: str = "",
column_info: str = "",
**kwargs,
) -> str:
self,
context: List[Message],
plan: Plan = None,
code_steps: str = "",
column_info: str = "",
**kwargs,
) -> Tuple[List[Message], str]:
task_type = plan.current_task.task_type
available_tools = registry.get_all_schema_by_module(task_type)
available_tools = self.available_tools.get(task_type, {})
special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "")
column_names = kwargs.get("column_names", {})
finished_tasks = plan.get_finished_tasks()
code_context = [remove_comments(task.code) for task in finished_tasks]
code_context = "\n\n".join(code_context)
if len(available_tools) > 0:
available_tools = [
{k: tool[k] for k in ["name", "description"] if k in tool}
for tool in available_tools
]
available_tools = {k: v["description"] for k, v in available_tools.items()}
recommend_tools = await self._tool_recommendation(
plan.current_task.instruction,
@ -180,46 +191,27 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
logger.info(f"Recommended tools: \n{recommend_tools}")
module_name = ML_MODULE_MAP[task_type]
output_desc = TOOL_OUTPUT_DESC.get(task_type, "")
new_code = ""
for idx, tool in enumerate(recommend_tools):
hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n "
prompt = TOOL_USAGE_PROMPT.format(
goal=plan.current_task.instruction,
context=hist_info,
code_steps=code_steps,
column_names=column_names,
special_prompt=special_prompt,
module_name=module_name,
output_desc=output_desc,
function_catalog=tool_catalog[idx],
)
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
rsp = await self.llm.aask_code(prompt, **tool_config)
logger.info(f"rsp is: {rsp}")
# final_code = final_code + "\n\n" + rsp["code"]
# final_code[key] = rsp["code"]
new_code = new_code + "\n\n" + rsp["code"]
code_context = code_context + "\n\n" + rsp["code"]
return new_code
else:
hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n "
prompt = GENERATE_CODE_PROMPT.format(
goal=plan.current_task.instruction,
context=hist_info,
code_steps=code_steps,
prompt = TOOL_USAGE_PROMPT.format(
user_requirement=plan.goal,
history_code=code_context,
current_task=plan.current_task.instruction,
column_info=column_info,
special_prompt=special_prompt,
# column_names=column_names
code_steps=code_steps,
module_name=module_name,
tool_catalog=tool_catalog,
)
else:
prompt = GENERATE_CODE_PROMPT.format(
user_requirement=plan.goal,
history_code=code_context,
current_task=plan.current_task.instruction,
column_info=column_info,
special_prompt=special_prompt,
code_steps=code_steps,
)
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
logger.info(f"prompt is: {prompt}")
rsp = await self.llm.aask_code(prompt, **tool_config)
logger.info(f"rsp is: {rsp}")
return rsp["code"]
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
rsp = await self.llm.aask_code(prompt, **tool_config)
context = [Message(content=prompt, role="user")]
return context, rsp["code"]

View file

@ -1,5 +1,6 @@
import json
import re
from datetime import datetime
from typing import List
import fire
@ -10,12 +11,16 @@ from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools
from metagpt.actions.write_code_steps import WriteCodeSteps
from metagpt.actions.write_plan import WritePlan
from metagpt.const import DATA_PATH
from metagpt.const import DATA_PATH, PROJECT_ROOT
from metagpt.logs import logger
from metagpt.prompts.ml_engineer import GEN_DATA_DESC_PROMPT
from metagpt.prompts.ml_engineer import (
GEN_DATA_DESC_PROMPT,
UPDATE_DATA_COLUMNS,
PRINT_DATA_COLUMNS
)
from metagpt.roles import Role
from metagpt.schema import Message, Plan
from metagpt.utils.common import CodeParser
from metagpt.utils.common import CodeParser, remove_comments, create_func_config
from metagpt.actions.debug_code import DebugCode
STRUCTURAL_CONTEXT = """
@ -57,34 +62,6 @@ def remove_escape_and_color_codes(input_str):
return result
def read_data(file: str) -> pd.DataFrame:
    """Read a CSV file into a DataFrame, guessing the delimiter if needed.

    The file is first parsed with ``,``. If that yields a single column, the
    separators ``;``, tab, ``:``, space and ``|`` are tried in that order and
    the first parse producing more than one column is returned. When no
    separator splits the data, the comma-parsed frame is returned.

    Args:
        file: Path to the file (``str`` or path-like). Its name must end with
            ``.csv``; the check is case-insensitive.

    Returns:
        The parsed DataFrame.

    Raises:
        ValueError: If ``file`` does not have a ``.csv`` extension.
    """
    path = str(file)  # lets pathlib.Path inputs through as well
    if not path.lower().endswith(".csv"):
        raise ValueError(f"Unsupported file type: {file}")

    df = pd.read_csv(path, sep=",")
    if df.shape[1] > 1:
        return df

    for sep in (";", "\t", ":", " ", "|"):
        try:
            candidate = pd.read_csv(path, sep=sep)
        except pd.errors.ParserError:
            # Rows split into inconsistent field counts with this separator,
            # so it is not the delimiter; try the next one instead of crashing.
            continue
        if candidate.shape[1] > 1:
            return candidate
    # Genuinely single-column data: keep the original comma parse.
    return df
def get_column_info(df: pd.DataFrame) -> str:
    """Render a per-column summary of ``df`` as a plain-text table.

    Each row of the table holds the column name, its dtype, the percentage of
    NaN entries (formatted to two significant digits) and the value reported
    by ``nunique()`` for that column.

    Args:
        df: The DataFrame to summarise.

    Returns:
        The summary table as a string, without the index column.
    """

    def _describe(col_name):
        # One table row: [name, dtype, NaN percentage, unique count].
        series = df[col_name]
        nan_pct = float("%.2g" % (series.isna().mean() * 100))
        unique_count = series.nunique()
        return [col_name, series.dtype, nan_pct, unique_count]

    summary = pd.DataFrame(
        [_describe(col_name) for col_name in df.columns],
        columns=["Column_name", "Data_type", "NaN_Frequency(%)", "N_unique"],
    )
    return summary.to_string(index=False)
class AskReview(Action):
async def run(self, context: List[Message], plan: Plan = None):
logger.info("Current overall plan:")
@ -108,26 +85,20 @@ class AskReview(Action):
return rsp, confirmed
class GenerateDataDesc(Action):
    """Action that asks the LLM to describe a dataset from its first rows."""

    async def run(self, file: str) -> dict:
        """Build a description dict for the dataset stored at ``file``.

        The head of the frame returned by ``read_data`` is serialised to JSON
        and substituted into ``GEN_DATA_DESC_PROMPT``; the reply is passed
        through ``CodeParser.parse_code`` and decoded as JSON.

        Args:
            file: Path of the data file to describe.

        Returns:
            A dict with the keys ``path``, ``data_desc``, ``column_desc`` and
            ``column_info``.
        """
        df = read_data(file)
        head_json = json.dumps(df.head().to_dict(orient="list"), indent=4, ensure_ascii=False)
        reply = await self._aask(GEN_DATA_DESC_PROMPT.replace("{data_head}", head_json))
        parsed = json.loads(CodeParser.parse_code(block=None, text=reply))
        return {
            "path": file,
            "data_desc": parsed["data_desc"],
            "column_desc": parsed["column_desc"],
            "column_info": get_column_info(df),
        }
class UpdateDataColumns(Action):
    """Action that queries the LLM with the code of all finished tasks."""

    async def run(self, plan: Plan = None) -> dict:
        """Send the comment-stripped code of finished tasks to the LLM.

        Args:
            plan: Plan whose finished tasks supply the code history.

        Returns:
            Whatever ``self.llm.aask_code`` returns for the
            ``UPDATE_DATA_COLUMNS`` prompt.
        """
        history_code = "\n\n".join(
            remove_comments(done.code) for done in plan.get_finished_tasks()
        )
        prompt = UPDATE_DATA_COLUMNS.format(history_code=history_code)
        func_config = create_func_config(PRINT_DATA_COLUMNS)
        return await self.llm.aask_code(prompt, **func_config)
class MLEngineer(Role):
def __init__(
self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, data_path: str = None
self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False,
):
super().__init__(name=name, profile=profile, goal=goal)
self._set_react_mode(react_mode="plan_and_act")
@ -136,13 +107,9 @@ class MLEngineer(Role):
self.use_code_steps = True
self.execute_code = ExecutePyCode()
self.auto_run = auto_run
self.data_path = data_path
self.data_desc = {}
async def _plan_and_act(self):
if self.data_path:
self.data_desc = await self._generate_data_desc()
# create initial plan and update until confirmation
await self._update_plan()
@ -163,25 +130,27 @@ class MLEngineer(Role):
task.code_steps = code_steps
self.plan.finish_current_task()
self.working_memory.clear()
if "print(df_processed.info())" in code:
self.data_desc["column_info"] = result
success, new_code = await self._update_data_columns()
if success:
task.code = task.code + "\n\n" + new_code
else:
# update plan according to user's feedback and to take on changed tasks
await self._update_plan()
finished_tasks = self.plan.get_finished_tasks()
if len(finished_tasks) == len(self.plan.tasks):
code_context = [task.code for task in finished_tasks]
code_context = "\n\n".join(code_context)
result, success = await self.execute_code.run(code_context)
# truncated the result
print(truncate(result))
async def _generate_data_desc(self):
data_desc = await GenerateDataDesc().run(self.data_path)
return data_desc
time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
self.execute_code.save_notebook(f"{DATA_PATH}/notebooks/ml_{time}.ipynb")
async def _update_data_columns(self):
    """Run ``UpdateDataColumns`` and, if it asks for it, execute its code.

    When the response's ``is_update`` flag is truthy, the returned code is
    executed; on success the execution result is stored in
    ``self.data_desc["column_info"]``.

    Returns:
        A ``(success, code)`` tuple. ``success`` is ``False`` when no update
        was requested, otherwise the success flag of the execution.
    """
    rsp = await UpdateDataColumns().run(self.plan)
    is_update = rsp["is_update"]
    code = rsp["code"]
    if not is_update:
        # Nothing to execute; report failure alongside the untouched code.
        return False, code

    result, success = await self.execute_code.run(code)
    if success:
        self.data_desc["column_info"] = result
    return success, code
async def _write_and_exec_code(self, max_retry: int = 3):
code_steps = (
await WriteCodeSteps().run(self.plan)
@ -192,6 +161,7 @@ class MLEngineer(Role):
counter = 0
improve_code = ""
success = False
debug_context = []
finished_tasks = self.plan.get_finished_tasks()
code_context = [task.code for task in finished_tasks]
@ -200,37 +170,38 @@ class MLEngineer(Role):
code_result = "\n\n".join(code_result)
while not success and counter < max_retry:
if counter == 0:
context = self.get_useful_memories()
else:
# context = self.get_useful_memories()
# logger.info(f"context {context}")
context = self.get_useful_memories()
if counter > 0:
improve_code = await DebugCode().run(plan=self.plan.current_task.instruction,
finished_code=code_context,
finished_code_result=code_result,
# finished_code=code_context,
# finished_code_result=code_result,
code=code,
runtime_result=self.working_memory.get())
if not self.use_tools or self.plan.current_task.task_type == "other":
runtime_result=self.working_memory.get(),
context=debug_context)
if improve_code != "":
code = improve_code
logger.info(f"new code \n{improve_code}")
cause_by = DebugCode
elif not self.use_tools or self.plan.current_task.task_type == "other":
logger.info("Write code with pure generation")
code = await WriteCodeByGenerate().run(
context=context, plan=self.plan, code_steps=code_steps, temperature=0.0
)
debug_context = [self.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]]
cause_by = WriteCodeByGenerate
else:
logger.info("Write code with tools")
if improve_code != "":
code = improve_code
logger.info(f"new code {code}")
cause_by = DebugCode
else:
code = await WriteCodeWithTools().run(
context=context, plan=self.plan, code_steps=code_steps, **{"column_names": {}}
)
cause_by = WriteCodeWithTools
schema_path = PROJECT_ROOT / "metagpt/tools/functions/schemas"
tool_context, code = await WriteCodeWithTools(schema_path=schema_path).run(
context=context,
plan=self.plan,
code_steps=code_steps,
column_info=self.data_desc.get("column_info", ""),
)
debug_context = tool_context
cause_by = WriteCodeWithTools
self.working_memory.add(
Message(content=code, role="assistant", cause_by=cause_by)
@ -238,9 +209,7 @@ class MLEngineer(Role):
# debug on code, run on runcode with finished code and new_df
# runcode = code_context + "\n\n" + code
runcode = code
result, success = await self.execute_code.run(runcode)
result, success = await self.execute_code.run(code)
# truncated the result
print(truncate(result))
@ -289,12 +258,12 @@ class MLEngineer(Role):
self.plan.add_tasks(tasks)
self.working_memory.clear()
def get_useful_memories(self) -> List[Message]:
def get_useful_memories(self, task_exclude_field: set = None) -> List[Message]:
"""find useful memories only to reduce context length and improve performance"""
# TODO dataset description , code steps
user_requirement = self.plan.goal
tasks = json.dumps(
[task.dict() for task in self.plan.tasks], indent=4, ensure_ascii=False
[task.dict(exclude=task_exclude_field) for task in self.plan.tasks], indent=4, ensure_ascii=False
)
current_task = self.plan.current_task.json() if self.plan.current_task else {}
context = STRUCTURAL_CONTEXT.format(
@ -321,12 +290,13 @@ if __name__ == "__main__":
# requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy."
data_path = f"{DATA_PATH}/titanic"
requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."
async def main(requirement: str = requirement, auto_run: bool = True, data_path: str = ""):
role = MLEngineer(goal=requirement, auto_run=auto_run, data_path=data_path)
# data_path = f"{DATA_PATH}/titanic"
# requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."
# requirement = f"Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy"
data_path = f"{DATA_PATH}/icr-identify-age-related-conditions"
requirement = f"This is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions.The target column is Class. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report f1 score on the eval data. Train data path: {data_path}/split_train.csv, eval data path: {data_path}/split_eval.csv."
async def main(requirement: str = requirement, auto_run: bool = True):
role = MLEngineer(goal=requirement, auto_run=auto_run)
await role.run(requirement)