Merge branch 'dev' into kaggle_team

This commit is contained in:
yzlin 2023-12-04 14:43:00 +08:00
commit f7989b0ce0
8 changed files with 286 additions and 41 deletions

View file

@ -17,6 +17,7 @@ from rich.syntax import Syntax
from metagpt.actions import Action
from metagpt.schema import Message
from metagpt.logs import logger
class ExecuteCode(ABC):
@ -90,11 +91,14 @@ class ExecutePyCode(ExecuteCode, Action):
if not outputs:
return parsed_output
for output in outputs:
for i, output in enumerate(outputs):
if output["output_type"] == "stream":
parsed_output += output["text"]
elif output["output_type"] == "display_data":
self.show_bytes_figure(output["data"]["image/png"], self.interaction)
if "image/png" in output["data"]:
self.show_bytes_figure(output["data"]["image/png"], self.interaction)
else:
logger.info(f"{i}th output['data'] from nbclient outputs dont have image/png, continue next output ...")
elif output["output_type"] == "execute_result":
parsed_output += output["data"]["text/plain"]
return parsed_output
@ -136,7 +140,6 @@ class ExecutePyCode(ExecuteCode, Action):
if isinstance(code, str):
return code, language
if isinstance(code, dict):
assert "code" in code
if "language" not in code:

View file

@ -4,10 +4,10 @@
@Author : orange-crow
@File : write_code_v2.py
"""
import json
from typing import Dict, List, Union
from typing import Dict, List, Union, Tuple
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.prompts.ml_engineer import (
TOOL_RECOMMENDATION_PROMPT,
SELECT_FUNCTION_TOOLS,
@ -40,8 +40,8 @@ class BaseWriteAnalysisCode(Action):
class WriteCodeByGenerate(BaseWriteAnalysisCode):
"""Write code fully by generation"""
DEFAULT_SYSTEM_MSG = """You are Code Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.**Notice: Use !pip install in a standalone block to install missing packages.**""" # prompt reference: https://github.com/KillianLucas/open-interpreter/blob/v0.1.4/interpreter/system_message.txt
REUSE_CODE_INSTRUCTION = """ATTENTION: DONT include codes from previous tasks in your current code block, include new codes only, DONT repeat codes!"""
DEFAULT_SYSTEM_MSG = """You are Code Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.**Notice: The code for the next step depends on the code for the previous step. Must reuse variables in the lastest other code directly, dont creat it again, it is very import for you. Use !pip install in a standalone block to install missing packages.**""" # prompt reference: https://github.com/KillianLucas/open-interpreter/blob/v0.1.4/interpreter/system_message.txt
# REUSE_CODE_INSTRUCTION = """ATTENTION: DONT include codes from previous tasks in your current code block, include new codes only, DONT repeat codes!"""
def __init__(self, name: str = "", context=None, llm=None) -> str:
super().__init__(name, context, llm)
@ -89,7 +89,7 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode):
system_msg: str = None,
**kwargs,
) -> str:
context.append(Message(content=self.REUSE_CODE_INSTRUCTION, role="user"))
# context.append(Message(content=self.REUSE_CODE_INSTRUCTION, role="user"))
prompt = self.process_msg(context, system_msg)
code_content = await self.llm.aask_code(prompt, **kwargs)
return code_content["code"]
@ -99,24 +99,31 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
"""Write code with help of local available tools. Choose tools first, then generate code to use the tools"""
@staticmethod
def _parse_recommend_tools(module: str, recommend_tools: list) -> str:
def _parse_recommend_tools(module: str, recommend_tools: list) -> Tuple[Dict, List[Dict]]:
"""
Converts recommended tools to a JSON string and checks tool availability in the registry.
Parses and validates a list of recommended tools, and retrieves their schema from registry.
Args:
module (str): The module name for querying tools in the registry.
recommend_tools (list): A list of lists of recommended tools for each step.
Returns:
str: A JSON string with available tools and their schemas for each step.
Tuple[Dict, List[Dict]]:
- valid_tools: A dict of lists of valid tools for each step.
- tool_catalog: A list of dicts of unique tool schemas.
"""
valid_tools = {}
available_tools = registry.get_all_by_module(module).keys()
for index, tools in enumerate(recommend_tools):
key = f"Step {index + 1}"
tools = [tool for tool in tools if tool in available_tools]
valid_tools[key] = registry.get_schemas(module, tools)
return json.dumps(valid_tools)
valid_tools[key] = tools
unique_tools = set()
for tools in valid_tools.values():
unique_tools.update(tools)
tool_catalog = registry.get_schemas(module, unique_tools)
return valid_tools, tool_catalog
async def _tool_recommendation(
self, task: str, data_desc: str, code_steps: str, available_tools: list
@ -165,7 +172,8 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
recommend_tools = await self._tool_recommendation(
task, task_guide, available_tools
)
recommend_tools = self._parse_recommend_tools(task_type, recommend_tools)
recommend_tools, tool_catalog = self._parse_recommend_tools(task_type, recommend_tools)
logger.info(f"Recommended tools for every steps: {recommend_tools}")
special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "")
module_name = ML_MODULE_MAP[task_type]
@ -190,6 +198,7 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
module_name=module_name,
output_desc=output_desc,
available_tools=recommend_tools,
tool_catalog=tool_catalog,
)
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
rsp = await self.llm.aask_code(prompt, **tool_config)

View file

@ -4,12 +4,14 @@
@Author : orange-crow
@File : plan.py
"""
from typing import List
from typing import List, Dict
import json
from metagpt.actions import Action
from metagpt.prompts.ml_engineer import ASSIGN_TASK_TYPE_PROMPT, ASSIGN_TASK_TYPE
from metagpt.schema import Message, Task
from metagpt.utils.common import CodeParser
from metagpt.utils.common import CodeParser, create_func_config
class WritePlan(Action):
PROMPT_TEMPLATE = """
@ -30,7 +32,30 @@ class WritePlan(Action):
]
```
"""
async def run(self, context: List[Message], max_tasks: int = 5) -> str:
async def assign_task_type(self, tasks: List[Dict]) -> str:
"""Assign task type to each task in tasks
Args:
tasks (List[Dict]): tasks to be assigned task type
Returns:
List[Dict]: tasks with task type assigned
"""
task_list = "\n".join(
[f"Task {task['task_id']}: {task['instruction']}" for task in tasks]
)
prompt = ASSIGN_TASK_TYPE_PROMPT.format(task_list=task_list)
tool_config = create_func_config(ASSIGN_TASK_TYPE)
rsp = await self.llm.aask_code(prompt, **tool_config)
task_type_list = rsp["task_type"]
for task, task_type in zip(tasks, task_type_list):
task["task_type"] = task_type
return json.dumps(tasks)
async def run(
self, context: List[Message], max_tasks: int = 5, use_tools: bool = False
) -> str:
prompt = (
self.PROMPT_TEMPLATE.replace("__context__", "\n".join([str(ct) for ct in context]))
# .replace("__current_plan__", current_plan)
@ -38,6 +63,8 @@ class WritePlan(Action):
)
rsp = await self._aask(prompt)
rsp = CodeParser.parse_code(block=None, text=rsp)
if use_tools:
rsp = await self.assign_task_type(json.loads(rsp))
return rsp
@staticmethod

View file

@ -4,6 +4,35 @@
# @Author : lidanyang
# @File : ml_engineer
# @Desc :
ASSIGN_TASK_TYPE_PROMPT = """
## All Task Type:
- **data_preprocess**: Only involve cleaning and preparing data through techniques like imputation, scaling, and encoding, not containing reading data, feature engineering, model training, etc.
- **feature_engineering**: Involves enhancing data features through techniques like encoding, aggregation, time component analysis, and creating polynomial and interaction features, etc.
- **other**: Any tasks that do not fit into the previous categories, such as visualization, summarizing findings, build model, etc.
Please assign a task type to each task in the list below from the given categories:
{task_list}
"""
ASSIGN_TASK_TYPE = {
"name": "assign_task_type",
"description": "assign task type to each task by order",
"parameters": {
"type": "object",
"properties": {
"task_type": {
"type": "array",
"description": "List of task type.",
"items": {
"type": "string",
},
},
},
"required": ["task_type"],
},
}
TOOL_RECOMMENDATION_PROMPT = """
## Comprehensive Task Description:
{task}
@ -95,9 +124,13 @@ from metagpt.tools.functions.libs.feature_engineering import fill_missing_value
```
## Available Functions for Each Step:
Each function is described in JSON format, including the function name and parameters. {output_desc}
Here's a list of all available functions for each step. You can find more details about each function in [## Function Catalog]
{available_tools}
## Function Catalog:
Each function is described in JSON format, including the function name and parameters. {output_desc}
{function_catalog}
## Your Output Format:
Generate the complete code for every step, listing any used function tools at the beginning of the step:
```python
@ -133,11 +166,12 @@ When performing feature engineering, please adhere to the following principles:
- Importantly, provide detailed comments explaining the purpose of each feature and how it might enhance model performance, especially when the features are generated based on semantic understanding without clear user directives.
"""
CLASSIFICATION_MODEL_PROMPT = """
MODEL_TRAIN_PROMPT = """
When selecting and training a model, please follow these guidelines to ensure optimal performance:
- Keep in mind that your user prioritizes results and is highly focused on model performance. So, when needed, feel free to use models of any complexity to improve effectiveness, such as lightGBM, XGBoost, CatBoost, etc.
If user specifies a model, use that model. Otherwise, use the model you believe will best solve the problem.
"""
REGRESSION_MODEL_PROMPT = """
"""
DATA_PREPROCESS_OUTPUT_DESC = "Please note that all functions uniformly output a processed pandas.DataFrame, facilitating seamless integration into the broader workflow."
@ -151,8 +185,8 @@ REGRESSION_MODEL_OUTPUT_DESC = ""
ML_SPECIFIC_PROMPT = {
"data_preprocess": DATA_PREPROCESS_PROMPT,
"feature_engineering": FEATURE_ENGINEERING_PROMPT,
"classification_model": CLASSIFICATION_MODEL_PROMPT,
"regression_model": REGRESSION_MODEL_PROMPT,
"classification_model": MODEL_TRAIN_PROMPT,
"regression_model": MODEL_TRAIN_PROMPT,
}
TOOL_OUTPUT_DESC = {

View file

@ -3,6 +3,7 @@ import json
import subprocess
import fire
import re
from metagpt.roles import Role
from metagpt.actions import Action
@ -11,7 +12,7 @@ from metagpt.memory import Memory
from metagpt.logs import logger
from metagpt.actions.write_plan import WritePlan
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools
from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis, Reflect, ReviewConst, truncate
from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis, Reflect, ReviewConst
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.roles.kaggle_manager import DownloadData, SubmitResult
from metagpt.prompts.ml_engineer import STRUCTURAL_CONTEXT
@ -105,10 +106,10 @@ class MLEngineer(Role):
# print("*" * 10)
# breakpoint()
if not self.use_tools or self.plan.current_task.task_type == "":
if not self.use_tools or self.plan.current_task.task_type == "other":
# code = "print('abc')"
code = await WriteCodeByGenerate().run(
context=context, plan=self.plan, task_guide=task_guide
context=context, plan=self.plan, task_guide=task_guide, temperature=0.0
)
cause_by = WriteCodeByGenerate
else:
@ -122,9 +123,7 @@ class MLEngineer(Role):
)
result, success = await self.execute_code.run(code)
# truncated the result
print(truncate(result))
# print(result)
print(result)
self.working_memory.add(
Message(content=result, role="user", cause_by=ExecutePyCode)
)
@ -156,7 +155,9 @@ class MLEngineer(Role):
plan_confirmed = False
while not plan_confirmed:
context = self.get_useful_memories()
rsp = await WritePlan().run(context, max_tasks=max_tasks)
rsp = await WritePlan().run(
context, max_tasks=max_tasks, use_tools=self.use_tools
)
self.working_memory.add(
Message(content=rsp, role="assistant", cause_by=WritePlan)
)

View file

@ -50,4 +50,5 @@ nbclient==0.9.0
nbformat==5.9.2
ipython==8.17.2
ipykernel==6.27.0
scikit_learn==1.3.2
scikit_learn==1.3.2
typing-extensions==4.8.0

View file

@ -1,6 +1,6 @@
import pytest
from metagpt.actions import ExecutePyCode
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.schema import Message
@ -8,12 +8,12 @@ from metagpt.schema import Message
async def test_code_running():
pi = ExecutePyCode()
output = await pi.run("print('hello world!')")
assert output.state == "done"
assert output[1] is True
output = await pi.run({"code": "print('hello world!')", "language": "python"})
assert output.state == "done"
assert output[1] is True
code_msg = Message("print('hello world!')")
output = await pi.run(code_msg)
assert output.state == "done"
assert output[1] is True
@pytest.mark.asyncio
@ -22,14 +22,14 @@ async def test_split_code_running():
output = await pi.run("x=1\ny=2")
output = await pi.run("z=x+y")
output = await pi.run("assert z==3")
assert output.state == "done"
assert output[1] is True
@pytest.mark.asyncio
async def test_execute_error():
pi = ExecutePyCode()
output = await pi.run("z=1/0")
assert output.state == "error"
assert output[1] is False
@pytest.mark.asyncio
@ -54,4 +54,30 @@ async def test_plotting_code():
plt.show()
"""
output = await pi.run(code)
assert output.state == "done"
assert output[1] is True
@pytest.mark.asyncio
async def test_plotting_bug():
code = """
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.datasets import load_iris
# Load the Iris dataset
iris_data = load_iris()
# Convert the loaded Iris dataset into a DataFrame for easier manipulation
iris_df = pd.DataFrame(iris_data['data'], columns=iris_data['feature_names'])
# Add a column for the target
iris_df['species'] = pd.Categorical.from_codes(iris_data['target'], iris_data['target_names'])
# Set the style of seaborn
sns.set(style='whitegrid')
# Create a pairplot of the iris dataset
plt.figure(figsize=(10, 8))
pairplot = sns.pairplot(iris_df, hue='species')
# Show the plot
plt.show()
"""
pi = ExecutePyCode()
output = await pi.run(code)
assert output[1] is True

View file

@ -1,11 +1,12 @@
import asyncio
import pytest
from metagpt.actions.write_analysis_code import WriteCodeByGenerate
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.schema import Message
from metagpt.schema import Message, Plan, Task
from metagpt.logs import logger
@pytest.mark.asyncio
async def test_write_code_by_list_plan():
write_code = WriteCodeByGenerate()
@ -22,6 +23,77 @@ async def test_write_code_by_list_plan():
print(f"\n[Output]: 任务{task}的执行结果是: \n{output}\n")
messages.append(output[0])
@pytest.mark.asyncio
async def test_tool_recommendation():
task = "对已经读取的数据集进行数据清洗"
code_steps = """
step 1: 对数据集进行去重
step 2: 对数据集进行缺失值处理
"""
available_tools = [
{
"name": "fill_missing_value",
"description": "Completing missing values with simple strategies",
},
{
"name": "split_bins",
"description": "Bin continuous data into intervals and return the bin identifier encoded as an integer value",
},
]
write_code = WriteCodeWithTools()
tools = await write_code._tool_recommendation(task, code_steps, available_tools)
assert len(tools) == 2
assert tools[0] == []
assert tools[1] == ["fill_missing_value"]
@pytest.mark.asyncio
async def test_write_code_with_tools():
write_code = WriteCodeWithTools()
messages = []
task_map = {
"1": Task(
task_id="1",
instruction="随机生成一个pandas DataFrame数据集",
task_type="unknown",
dependent_task_ids=[],
code="""
import pandas as pd
df = pd.DataFrame({
'a': [1, 2, 3, 4, 5],
'b': [1.1, 2.2, 3.3, 4.4, np.nan],
'c': ['aa', 'bb', 'cc', 'dd', 'ee'],
'd': [1, 2, 3, 4, 5]
})
""",
is_finished=True,
),
"2": Task(
task_id="2",
instruction="对数据集进行数据清洗",
task_type="data_preprocess",
dependent_task_ids=["1"],
),
}
plan = Plan(
goal="构造数据集并进行数据清洗",
tasks=list(task_map.values()),
task_map=task_map,
current_task_id="2",
)
task_guide = """
step 1: 对数据集进行去重
step 2: 对数据集进行缺失值处理
"""
data_desc = "None"
code = await write_code.run(messages, plan, task_guide, data_desc)
assert len(code) > 0
print(code)
@pytest.mark.asyncio
async def test_write_code_to_correct_error():
@ -159,7 +231,7 @@ async def test_write_code_reuse_code_long():
Message(content=structural_context, role="user"),
]
trials_num = 5
trials = [WriteCodeByGenerate().run(context=context) for _ in range(trials_num)]
trials = [WriteCodeByGenerate().run(context=context, temperature=0.0) for _ in range(trials_num)]
trial_results = await asyncio.gather(*trials)
print(*trial_results, sep="\n\n***\n\n")
success = ["load_iris" not in result and "iris_data" in result \
@ -167,3 +239,75 @@ async def test_write_code_reuse_code_long():
success_rate = sum(success) / trials_num
logger.info(f"success rate: {success_rate :.2f}")
assert success_rate >= 0.8
@pytest.mark.asyncio
async def test_write_code_reuse_code_long_for_wine():
"""test code reuse for long context"""
structural_context = """
## User Requirement
Run data analysis on sklearn Wisconsin Breast Cancer dataset, include a plot, train a model to predict targets (20% as validation), and show validation accuracy
## Current Plan
[
{
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Load the sklearn Wine recognition dataset and perform exploratory data analysis."
"task_type": "",
"code": "from sklearn.datasets import load_wine\n# Load the Wine recognition dataset\nwine_data = load_wine()\n# Perform exploratory data analysis\nwine_data.keys()",
"result": "Truncated to show only the last 1000 characters\ndict_keys(['data', 'target', 'frame', 'target_names', 'DESCR', 'feature_names'])",
"is_finished": true
},
{
"task_id": "2",
"dependent_task_ids": ["1"],
"instruction": "Create a plot to visualize some aspect of the wine dataset."
"task_type": "",
"code": "",
"result": "",
"is_finished": false
},
{
"task_id": "3",
"dependent_task_ids": ["1"],
"instruction": "Split the dataset into training and validation sets with a 20% validation size.",
"task_type": "",
"code": "",
"result": "",
"is_finished": false
},
{
"task_id": "4",
"dependent_task_ids": ["3"],
"instruction": "Train a model on the training set to predict wine class.",
"task_type": "",
"code": "",
"result": "",
"is_finished": false
},
{
"task_id": "5",
"dependent_task_ids": ["4"],
"instruction": "Evaluate the model on the validation set and report the accuracy.",
"task_type": "",
"code": "",
"result": "",
"is_finished": false
}
]
## Current Task
{"task_id": "2", "dependent_task_ids": ["1"], "instruction": "Create a plot to visualize some aspect of the Wine dataset.", "task_type": "", "code": "", "result": "", "is_finished": false}
"""
context = [
Message(content=structural_context, role="user"),
]
trials_num = 5
trials = [WriteCodeByGenerate().run(context=context, temperature=0.0) for _ in range(trials_num)]
trial_results = await asyncio.gather(*trials)
print(*trial_results, sep="\n\n***\n\n")
success = ["load_wine" not in result and "wine_data" in result\
for result in trial_results] # should reuse iris_data from previous tasks
success_rate = sum(success) / trials_num
logger.info(f"success rate: {success_rate :.2f}")
assert success_rate >= 0.8