Merge branch 'dev' into dev_make_tools

This commit is contained in:
刘棒棒 2023-12-18 22:22:38 +08:00
commit ea7e11665d
24 changed files with 1801 additions and 1226 deletions

View file

@ -87,7 +87,7 @@ SD_T2I_API: "/sdapi/v1/txt2img"
MODEL_FOR_RESEARCHER_SUMMARY: gpt-3.5-turbo
MODEL_FOR_RESEARCHER_REPORT: gpt-3.5-turbo-16k
### choose the engine for mermaid conversion,
### choose the engine for mermaid conversion,
# default is nodejs, you can change it to playwright,pyppeteer or ink
# MERMAID_ENGINE: nodejs

View file

@ -0,0 +1,160 @@
from typing import Dict, List, Union, Tuple, Optional, Any
from metagpt.logs import logger
from metagpt.schema import Message, Plan
from metagpt.utils.common import CodeParser, create_func_config
from metagpt.actions.write_analysis_code import BaseWriteAnalysisCode
DEBUG_REFLECTION_EXAMPLE = '''
Example 1:
[previous impl]:
```python
def add(a: int, b: int) -> int:
"""
Given integers a and b, return the total value of a and b.
"""
return a - b
```
[runtime Error]:
Tested passed:
Tests failed:
assert add(1, 2) == 3 # output: -1
assert add(1, 2) == 4 # output: -1
[reflection on previous impl]:
The implementation failed the test cases where the input integers are 1 and 2. The issue arises because the code does not add the two integers together, but instead subtracts the second integer from the first. To fix this issue, we should change the operator from `-` to `+` in the return statement. This will ensure that the function returns the correct output for the given input.
[improved impl]:
```python
def add(a: int, b: int) -> int:
"""
Given integers a and b, return the total value of a and b.
"""
return a + b
```
'''
REFLECTION_PROMPT = """
Here is an example for you.
{debug_example}
[context]
{context}
[previous impl]
{code}
[runtime Error]
{runtime_result}
Analysis the error step by step, provide me improve method and code. Remember to follow [context] rerquirement. Don't forget write code for steps behind the error step.
[reflection on previous impl]:
xxx
"""
CODE_REFLECTION = {
"name": "execute_reflection_code",
"description": "Execute reflection code.",
"parameters": {
"type": "object",
"properties": {
"reflection": {
"type": "string",
"description": "Reflection on previous impl.",
},
"improved_impl": {
"type": "string",
"description": "Refined code after reflection.",
},
},
"required": ["reflection", "improved_impl"],
},
}
def message_to_str(message: Message) -> str:
return f"{message.role}: {message.content}"
def messages_to_str(messages: List[Message]) -> str:
return "\n".join([message_to_str(message) for message in messages])
class DebugCode(BaseWriteAnalysisCode):
name: str = "debugcode"
context: Optional[str] = None
llm: None
def __init__(self, **kwargs: Any):
super().__init__(**kwargs)
async def run_reflection(
self,
# goal,
# finished_code,
# finished_code_result,
context: List[Message],
code,
runtime_result,
) -> dict:
info = []
# finished_code_and_result = finished_code + "\n [finished results]\n\n" + finished_code_result
reflection_prompt = REFLECTION_PROMPT.format(
debug_example=DEBUG_REFLECTION_EXAMPLE,
context=context,
# goal=goal,
# finished_code=finished_code_and_result,
code=code,
runtime_result=runtime_result,
)
system_prompt = "You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation "
info.append(Message(role="system", content=system_prompt))
info.append(Message(role="user", content=reflection_prompt))
# msg = messages_to_str(info)
# resp = await self.llm.aask(msg=msg)
resp = await self.llm.aask_code(
messages=info, **create_func_config(CODE_REFLECTION)
)
logger.info(f"reflection is {resp}")
return resp
# async def rewrite_code(self, reflection: str = "", context: List[Message] = None) -> str:
# """
# 根据reflection重写代码
# """
# info = context
# # info.append(Message(role="assistant", content=f"[code context]:{code_context}"
# # f"finished code are executable, and you should based on the code to continue your current code debug and improvement"
# # f"[reflection]: \n {reflection}"))
# info.append(Message(role="assistant", content=f"[reflection]: \n {reflection}"))
# info.append(Message(role="user", content=f"[improved impl]:\n Return in Python block"))
# msg = messages_to_str(info)
# resp = await self.llm.aask(msg=msg)
# improv_code = CodeParser.parse_code(block=None, text=resp)
# return improv_code
async def run(
self,
context: List[Message] = None,
plan: str = "",
# finished_code: str = "",
# finished_code_result: str = "",
code: str = "",
runtime_result: str = "",
) -> str:
"""
根据当前运行代码和报错信息进行reflection和纠错
"""
reflection = await self.run_reflection(
# plan,
# finished_code=finished_code,
# finished_code_result=finished_code_result,
code=code,
context=context,
runtime_result=runtime_result,
)
# 根据reflection结果重写代码
# improv_code = await self.rewrite_code(reflection, context=context)
improv_code = reflection["improved_impl"]
return improv_code

View file

@ -10,6 +10,8 @@ from pathlib import Path
import re
import json
import yaml
from metagpt.actions import Action
from metagpt.llm import LLM
from metagpt.logs import logger
@ -17,40 +19,19 @@ from metagpt.prompts.ml_engineer import (
TOOL_RECOMMENDATION_PROMPT,
SELECT_FUNCTION_TOOLS,
CODE_GENERATOR_WITH_TOOLS,
TOO_ORGANIZATION_PROMPT,
TOOL_USAGE_PROMPT,
ML_SPECIFIC_PROMPT,
ML_MODULE_MAP,
TOOL_OUTPUT_DESC,
GENERATE_CODE_PROMPT,
)
from metagpt.schema import Message, Plan
from metagpt.tools.functions import registry
from metagpt.utils.common import create_func_config, CodeParser
from metagpt.utils.common import create_func_config, remove_comments
class BaseWriteAnalysisCode(Action):
async def run(
self, context: List[Message], plan: Plan = None, task_guide: str = ""
) -> str:
"""Run of a code writing action, used in data analysis or modeling
Args:
context (List[Message]): Action output history, source action denoted by Message.cause_by
plan (Plan, optional): Overall plan. Defaults to None.
task_guide (str, optional): suggested step breakdown for the current task. Defaults to "".
Returns:
str: The code string.
"""
class WriteCodeByGenerate(BaseWriteAnalysisCode):
"""Write code fully by generation"""
DEFAULT_SYSTEM_MSG = """You are Code Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.**Notice: The code for the next step depends on the code for the previous step. Must reuse variables in the lastest other code directly, dont creat it again, it is very import for you. Use !pip install in a standalone block to install missing packages.**""" # prompt reference: https://github.com/KillianLucas/open-interpreter/blob/v0.1.4/interpreter/system_message.txt
DEFAULT_SYSTEM_MSG = """You are Code Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.**Notice: The code for the next step depends on the code for the previous step. Must reuse variables in the lastest other code directly, dont creat it again, it is very import for you. Use !pip install in a standalone block to install missing packages.**""" # prompt reference: https://github.com/KillianLucas/open-interpreter/blob/v0.1.4/interpreter/system_message.txt
# REUSE_CODE_INSTRUCTION = """ATTENTION: DONT include codes from previous tasks in your current code block, include new codes only, DONT repeat codes!"""
def __init__(self, name: str = "", context=None, llm=None) -> str:
super().__init__(name, context, llm)
def process_msg(self, prompt: Union[str, List[Dict], Message, List[Message]], system_msg: str = None):
default_system_msg = system_msg or self.DEFAULT_SYSTEM_MSG
# 全部转成list
@ -86,6 +67,27 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode):
}
return messages
async def run(
self, context: List[Message], plan: Plan = None, code_steps: str = ""
) -> str:
"""Run of a code writing action, used in data analysis or modeling
Args:
context (List[Message]): Action output history, source action denoted by Message.cause_by
plan (Plan, optional): Overall plan. Defaults to None.
code_steps (str, optional): suggested step breakdown for the current task. Defaults to "".
Returns:
str: The code string.
"""
class WriteCodeByGenerate(BaseWriteAnalysisCode):
"""Write code fully by generation"""
def __init__(self, name: str = "", context=None, llm=None) -> str:
super().__init__(name, context, llm)
async def run(
self,
context: [List[Message]],
@ -102,51 +104,61 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode):
class WriteCodeWithTools(BaseWriteAnalysisCode):
"""Write code with help of local available tools. Choose tools first, then generate code to use the tools"""
@staticmethod
def _parse_recommend_tools(module: str, recommend_tools: list) -> Tuple[Dict, List[Dict]]:
def __init__(self, name: str = "", context=None, llm=None, schema_path=None):
super().__init__(name, context, llm)
self.schema_path = schema_path
self.available_tools = {}
if self.schema_path is not None:
self._load_tools(schema_path)
def _load_tools(self, schema_path):
"""Load tools from yaml file"""
yml_files = schema_path.glob("*.yml")
for yml_file in yml_files:
module = yml_file.stem
with open(yml_file, "r", encoding="utf-8") as f:
self.available_tools[module] = yaml.safe_load(f)
def _parse_recommend_tools(self, module: str, recommend_tools: list) -> dict:
"""
Parses and validates a list of recommended tools, and retrieves their schema from registry.
Args:
module (str): The module name for querying tools in the registry.
recommend_tools (list): A list of lists of recommended tools for each step.
recommend_tools (list): A list of recommended tools.
Returns:
Tuple[Dict, List[Dict]]:
- valid_tools: A dict of lists of valid tools for each step.
- tool_catalog: A list of dicts of unique tool schemas.
dict: A dict of valid tool schemas.
"""
valid_tools = {}
available_tools = registry.get_all_by_module(module).keys()
for index, tools in enumerate(recommend_tools):
key = f"Step {index + 1}"
tools = [tool for tool in tools if tool in available_tools]
valid_tools[key] = tools
valid_tools = []
available_tools = self.available_tools[module].keys()
for tool in recommend_tools:
if tool in available_tools:
valid_tools.append(tool)
unique_tools = set()
for tools in valid_tools.values():
unique_tools.update(tools)
tool_catalog = registry.get_schemas(module, unique_tools)
return valid_tools, tool_catalog
tool_catalog = {tool: self.available_tools[module][tool] for tool in valid_tools}
return tool_catalog
async def _tool_recommendation(
self, task: str, data_desc: str, code_steps: str, available_tools: list
self,
task: str,
code_steps: str,
available_tools: dict,
) -> list:
"""
Recommend tools for each step of the specified task
Recommend tools for the specified task.
Args:
task (str): the task description
data_desc (str): the description of the dataset for the task
task (str): the task to recommend tools for
code_steps (str): the code steps to generate the full code for the task
available_tools (list): the available tools for the task
available_tools (dict): the available tools description
Returns:
list: recommended tools for each step of the specified task
list: recommended tools for the specified task
"""
prompt = TOOL_RECOMMENDATION_PROMPT.format(
task=task,
data_desc=data_desc,
current_task=task,
code_steps=code_steps,
available_tools=available_tools,
)
@ -159,160 +171,52 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
self,
context: List[Message],
plan: Plan = None,
code_steps: str = "",
data_desc: str = "",
) -> str:
column_info: str = "",
**kwargs,
) -> Tuple[List[Message], str]:
task_type = plan.current_task.task_type
task = plan.current_task.instruction
available_tools = registry.get_all_schema_by_module(task_type)
available_tools = [
{k: tool[k] for k in ["name", "description"] if k in tool}
for tool in available_tools
]
code_steps = "\n".join(
[f"Step {step.strip()}" for step in code_steps.split("\n")]
)
recommend_tools = await self._tool_recommendation(
task, code_steps, available_tools
)
recommend_tools, tool_catalog = self._parse_recommend_tools(task_type, recommend_tools)
logger.info(f"Recommended tools for every steps: {recommend_tools}")
available_tools = self.available_tools.get(task_type, {})
special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "")
module_name = ML_MODULE_MAP[task_type]
output_desc = TOOL_OUTPUT_DESC.get(task_type, "")
all_tasks = ""
completed_code = ""
code_steps = plan.current_task.code_steps
for i, task in enumerate(plan.tasks):
stats = "DONE" if task.is_finished else "TODO"
all_tasks += f"Subtask {task.task_id}: {task.instruction}({stats})\n"
finished_tasks = plan.get_finished_tasks()
code_context = [remove_comments(task.code) for task in finished_tasks]
code_context = "\n\n".join(code_context)
for task in plan.tasks:
if task.code:
completed_code += task.code + "\n"
if len(available_tools) > 0:
available_tools = {k: v["description"] for k, v in available_tools.items()}
recommend_tools = await self._tool_recommendation(
plan.current_task.instruction,
code_steps,
available_tools
)
tool_catalog = self._parse_recommend_tools(task_type, recommend_tools)
logger.info(f"Recommended tools: \n{recommend_tools}")
module_name = ML_MODULE_MAP[task_type]
prompt = TOOL_USAGE_PROMPT.format(
user_requirement=plan.goal,
history_code=code_context,
current_task=plan.current_task.instruction,
column_info=column_info,
special_prompt=special_prompt,
code_steps=code_steps,
module_name=module_name,
tool_catalog=tool_catalog,
)
else:
prompt = GENERATE_CODE_PROMPT.format(
user_requirement=plan.goal,
history_code=code_context,
current_task=plan.current_task.instruction,
column_info=column_info,
special_prompt=special_prompt,
code_steps=code_steps,
)
prompt = TOO_ORGANIZATION_PROMPT.format(
all_tasks=all_tasks,
completed_code=completed_code,
data_desc=data_desc,
special_prompt=special_prompt,
code_steps=code_steps,
module_name=module_name,
output_desc=output_desc,
available_tools=recommend_tools,
tool_catalog=tool_catalog,
)
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
rsp = await self.llm.aask_code(prompt, **tool_config)
return rsp["code"]
class MakeTools(WriteCodeByGenerate):
DEFAULT_SYSTEM_MSG = """Please Create a very General Function Code startswith `def` from any codes you got.\n
**Notice:
1. Your code must contain a general function start with `def`.
2. Refactor your code to get the most efficient implementation for large input data in the shortest amount of time.
3. Use Google style for function annotations.
4. Write example code after `if __name__ == '__main__':`by using old varibales in old code,
and make sure it could be execute in the user's machine.
5. Do not have missing package references.**
"""
def __init__(self, name: str = '', context: list[Message] = None, llm: LLM = None, workspace: str = None):
"""
:param str name: name, defaults to ''
:param list[Message] context: context, defaults to None
:param LLM llm: llm, defaults to None
:param str workspace: tools code saved file path dir, defaults to None
"""
super().__init__(name, context, llm)
self.workspace = workspace or str(Path(__file__).parents[1].joinpath("./tools/functions/libs/udf"))
self.file_suffix: str = '.py'
def parse_function_name(self, function_code: str) -> str:
# 定义正则表达式模式
pattern = r'\bdef\s+([a-zA-Z_]\w*)\s*\('
# 在代码中搜索匹配的模式
match = re.search(pattern, function_code)
# 如果找到匹配项则返回匹配的函数名否则返回None
if match:
return match.group(1)
else:
return None
def save(self, tool_code: str) -> None:
func_name = self.parse_function_name(tool_code)
if func_name is None:
raise ValueError(f"No function name found in {tool_code}")
saved_path = Path(self.workspace).joinpath(func_name+self.file_suffix)
logger.info(f"Saved tool_code {func_name} in {str(saved_path)}.")
saved_path.write_text(tool_code, encoding='utf-8')
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def run(self, code_message: List[Message | Dict], **kwargs) -> str:
msgs = self.process_msg(code_message)
logger.info(f"\n\nAsk to Make tools:\n{'-'*60}\n {msgs[-1]}")
tool_code = await self.llm.aask_code(msgs, **kwargs)
max_tries, current_try = 3, 1
func_name = self.parse_function_name(tool_code['code'])
while current_try < max_tries and func_name is None:
logger.info(f"\n\nTools Respond\n{'-'*60}\n: {tool_code}")
logger.warning(f"No function name found in code, we will retry make tools. \n\n{tool_code['code']}\n")
msgs.append({'role': 'assistant', 'content': 'We need a general function in above code,but not found function.'})
tool_code = await self.llm.aask_code(msgs, **kwargs)
current_try += 1
func_name = self.parse_function_name(tool_code['code'])
if func_name is not None:
break
self.save(tool_code['code'])
return tool_code["code"]
class WriteCodeWithUDFs(WriteCodeByGenerate):
"""Write code with user defined function."""
from metagpt.tools.functions.libs.udf import UDFS
UDFS_DEFAULT_SYSTEM_MSG = f"""Please remember these functions, you will use these functions to write code:\n
{UDFS}, **Notice: 1. if no udf meets user requirement, please send `No udf found`. 2.Only use function code provied to you.
3. Dont generate code from scratch.**
"""
async def aask_code_and_text(self, context: List[Dict], **kwargs) -> Tuple[str]:
rsp = await self.llm.acompletion(context, **kwargs)
rsp_content = self.llm.get_choice_text(rsp)
code = CodeParser.parse_code(None, rsp_content)
if 'No udf found' in code or 'No udf found' in rsp_content:
rsp_content = 'No udf found'
code = 'No udf found'
return code, rsp_content
async def run(self, context: List[Message], plan: Plan = None, **kwargs) -> str:
from metagpt.tools.functions.libs.udf import UDFS
if len(UDFS) > 0:
# Write code from user defined function.
prompt = self.process_msg(context, self.UDFS_DEFAULT_SYSTEM_MSG)
logger.info(prompt[-1])
try:
logger.info("Local user defined function as following:")
logger.info(json.dumps(UDFS, indent=4, ensure_ascii=False))
except Exception:
from pprint import pprint
pprint(UDFS)
logger.info('Writing code from user defined function by LLM...')
code, _ = await self.aask_code_and_text(prompt, **kwargs)
logger.info(f"Writing code from user defined function: \n{'-'*50}\n {code}")
if code != 'No udf found':
return code
logger.warning("No udf found, we will write code from scratch by LLM.")
# Writing code from scratch.
logger.warning("Writing code from scratch by LLM.")
code = await super().run(context, plan, self.DEFAULT_SYSTEM_MSG, **kwargs)
logger.info(f"Code Writing code from scratch by LLM is :\n{'-'*60}\n {code}")
# Make tools for above code.
logger.info("Make tools for above code.")
make_tools = MakeTools()
tool_code = await make_tools.run(code)
make_tools.save(tool_code)
return code
context = [Message(content=prompt, role="user")]
return context, rsp["code"]

View file

@ -4,18 +4,37 @@ from typing import Dict, List, Union
from metagpt.actions import Action
from metagpt.schema import Message, Task, Plan
from metagpt.utils.common import CodeParser
# CODE_STEPS_PROMPT_TEMPLATE = """
# # Context
# {context}
#
# -----
# Tasks are all code development tasks.
# You are a professional engineer, the main goal is to plan out concise solution steps for Current Task before coding.
# A planning process can reduce the difficulty and improve the quality of coding.
# You may be given some code plans for the tasks ahead, but you don't have to follow the existing plan when planning the current task.
# The output plan should following the subsequent principles:
# 1.The plan is a rough checklist of steps outlining the entire program's structure.Try to keep the number of steps fewer than 5.
# 2.The steps should be written concisely and at a high level, avoiding overly detailed implementation specifics.
# 3.The execution of the plan happens sequentially, but the plan can incorporate conditional (if) and looping(loop) keywords for more complex structures.
#
# Output the code steps in a JSON format, as shown in this example:
# ```json
# {
# "Step 1": "",
# "Step 2": "",
# "Step 3": "",
# ...
# }
# ```
# """
CODE_STEPS_PROMPT_TEMPLATE = """
# Context
{context}
## Format example
1.
2.
3.
...
-----
Tasks are all code development tasks.
You are a professional engineer, the main goal is to plan out concise solution steps for Current Task before coding.
@ -25,14 +44,35 @@ The output plan should following the subsequent principles:
1.The plan is a rough checklist of steps outlining the entire program's structure.Try to keep the number of steps fewer than 5.
2.The steps should be written concisely and at a high level, avoiding overly detailed implementation specifics.
3.The execution of the plan happens sequentially, but the plan can incorporate conditional (if) and looping(loop) keywords for more complex structures.
4.Output carefully referenced "Format example" in format.
4.Design and provide code steps by following the code logic. Analyze the provided code step by step and reuse the imported library.
Output the code steps in a JSON format, as shown in this example:
```json
{
"Step 1": "",
"Step 2": "",
"Step 3": "",
...
}
```
"""
# STRUCTURAL_CONTEXT = """
# ## User Requirement
# {user_requirement}
# ## Current Plan
# {tasks}
# ## Current Task
# {current_task}
# """
STRUCTURAL_CONTEXT = """
## User Requirement
{user_requirement}
## Current Plan
## Plan
{tasks}
## Codes
{codes}
## Current Task
{current_task}
"""
@ -51,27 +91,34 @@ class WriteCodeSteps(Action):
"""
context = self.get_context(plan)
code_steps_prompt = CODE_STEPS_PROMPT_TEMPLATE.format(
context=context,
code_steps_prompt = CODE_STEPS_PROMPT_TEMPLATE.replace(
"{context}", context
)
code_steps = await self._aask(code_steps_prompt)
code_steps = CodeParser.parse_code(block=None, text=code_steps)
return code_steps
def get_context(self, plan: Plan):
user_requirement = plan.goal
select_task_keys = ['task_id', 'instruction', 'is_finished', 'code_steps']
# select_task_keys = ['task_id', 'instruction', 'is_finished', 'code']
# select_task_keys = ['task_id','instruction']
def process_task(task):
task_dict = task.dict()
ptask = {k: task_dict[k] for k in task_dict if k in select_task_keys}
# ptask = {k: task_dict[k] for k in task_dict if k in select_task_keys }
ptask = f"task_id_{task_dict['task_id']}:{task_dict['instruction']}\n"
return ptask
tasks = json.dumps(
[process_task(task) for task in plan.tasks], indent=4, ensure_ascii=False
)
code_lists = [task.code for task in plan.tasks if task.is_finished==True]
codes = "\n\n".join(code_lists)
current_task = json.dumps(process_task(plan.current_task)) if plan.current_task else {}
context = STRUCTURAL_CONTEXT.format(
user_requirement=user_requirement, tasks=tasks, current_task=current_task
user_requirement=user_requirement, tasks=tasks, codes=codes, current_task=current_task
)
# print(context)
return context

View file

@ -4,25 +4,74 @@
# @Author : lidanyang
# @File : ml_engineer
# @Desc :
ASSIGN_TASK_TYPE_PROMPT = """
## All Task Type:
- **data_preprocess**: Only involve cleaning and preparing data through techniques like imputation, scaling, and encoding, not containing reading data, feature engineering, model training, etc.
- **feature_engineering**: Involves enhancing data features through techniques like encoding, aggregation, time component analysis, and creating polynomial and interaction features, etc.
- **other**: Any tasks that do not fit into the previous categories, such as visualization, summarizing findings, build model, etc.
UPDATE_DATA_COLUMNS = """
# Background
Keep dataset column information updated to reflect changes in training or testing datasets, aiding in informed decision-making during data analysis.
## Done Tasks
```python
{history_code}
```end
# Task
Update and print the dataset's column information only if the train or test data has changed. Use the following code:
```python
from metagpt.tools.functions.libs.data_preprocess import get_column_info
column_info = get_column_info(df)
print("df_column_info")
print(column_info)
```end
# Constraints:
- Use the DataFrame variable from 'Done Tasks' in place of df.
- Import `get_column_info` only if it's not already imported.
- Skip update if no changes in training/testing data, except for initial data load.
- No need to update info if only model evaluation is performed.
"""
GEN_DATA_DESC_PROMPT = """
Here is the head 5 rows of the dataset:
{data_head}
Please provide a brief one-sentence background of the dataset, and concise meaning for each column. Keep descriptions short.
Output the information in a JSON format, as shown in this example:
```json
{
"data_desc": "Brief dataset background.",
"column_desc": {
"column_name1": "Abstract meaning of the first column.",
"column_name2": "Abstract meaning of the second column.",
...
}
}
```
# Constraints:
- Don't contain specific values or examples found in the data column.
"""
ASSIGN_TASK_TYPE_PROMPT = """
Please assign a task type to each task in the list below from the given categories:
{task_list}
## All Task Type:
- **feature_engineering**: Only for creating new columns for input data.
- **data_preprocess**: Only for changing value inplace.
- **model_train**: Only for training model.
- **model_evaluate**: Only for evaluating model.
- **other**: Any tasks that do not fit into the previous categories, such as visualization, summarizing findings, etc.
"""
ASSIGN_TASK_TYPE = {
"name": "assign_task_type",
"description": "assign task type to each task by order",
"description": "Assign task type to each task by order.",
"parameters": {
"type": "object",
"properties": {
"task_type": {
"type": "array",
"description": "List of task type.",
"description": "List of task type. The length should as long as task list",
"items": {
"type": "string",
},
@ -32,45 +81,36 @@ ASSIGN_TASK_TYPE = {
},
}
TOOL_RECOMMENDATION_PROMPT = """
## Comprehensive Task Description:
{task}
## User Requirement:
{current_task}
## Dataset Description:
Details about the dataset for the project:
{data_desc}
This task is divided into several steps, and you need to select the most suitable tools for each step. A tool means a function that can be used to help you solve the task.
## Detailed Code Steps for the Task:
## Task
Recommend up to five tools from 'Available Tools' that can help solve the 'User Requirement'.
This is a detailed code steps for current task. You can refer to it when recommending tools.
{code_steps}
## List of Available Tools:
## Available Tools:
{available_tools}
## Tool Selection and Instructions:
- For each code step listed above, choose up to five tools that are most likely to be useful in solving the task.
- If you believe that no tools are suitable for a step, indicate with an empty list.
- Select tools most relevant to completing the 'User Requirement'.
- If you believe that no tools are suitable, indicate with an empty list.
- Only list the names of the tools, not the full schema of each tool.
- The result should only contain tool names that are in the list of available tools.
- The result list should be in the same order as the code steps.
- Ensure selected tools are listed in 'Available Tools'.
"""
SELECT_FUNCTION_TOOLS = {
"name": "select_function_tools",
"description": "Given code steps to generate full code for a task, select suitable tools for each step by order.",
"description": "For current task, select suitable tools for it.",
"parameters": {
"type": "object",
"properties": {
"recommend_tools": {
"type": "array",
"description": "List of tool names for each code step. Empty list if no tool is suitable.",
"description": "List of tool names. Empty list if no tool is suitable.",
"items": {
"type": "array",
"items": {
"type": "string",
},
"type": "string",
},
},
},
@ -78,129 +118,189 @@ SELECT_FUNCTION_TOOLS = {
},
}
CODE_GENERATOR_WITH_TOOLS = {
"name": "add_subtask_code",
"description": "Add new code of current subtask to the end of an active Jupyter notebook.",
"description": "Add new code cell of current task to the end of an active Jupyter notebook.",
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "The code to be added.",
"description": "The code to be added to a new cell in jupyter.",
},
},
"required": ["code"],
},
}
TOO_ORGANIZATION_PROMPT = """
As a senior data scientist, your role involves developing code for a specific sub-task within a larger project. This project is divided into several sub-tasks, which may either be new challenges or extensions of previous work.
## Sub-tasks Overview
Here's a list of all the sub-tasks, indicating their current status (DONE or TODO). Your responsibility is the first TODO task on this list.
{all_tasks}
PRINT_DATA_COLUMNS = {
"name": "print_column_info",
"description": "Print the latest column information after 'Done Tasks' code if first read or data changed.",
"parameters": {
"type": "object",
"properties": {
"is_update": {
"type": "boolean",
"description": "Whether need to update the column info.",
},
"code": {
"type": "string",
"description": "The code to be added to a new cell in jupyter.",
},
},
"required": ["is_update", "code"],
},
}
## Historical Code (Previously Done Sub-tasks):
This code, already executed in the Jupyter notebook, is critical for understanding the background and foundation for your current task.
GENERATE_CODE_PROMPT = """
# Background
As a data scientist, you need to help user to achieve their goal [{user_requirement}] step-by-step in an continuous Jupyter notebook.
## Done Tasks
```python
{completed_code}
```
## Dataset Description:
Details about the dataset for the project:
{data_desc}
## Current Task Notion:
{special_prompt}
## Code Steps for Your Sub-task:
Follow these steps to complete your current TODO task. You may use external Python functions or write custom code as needed. Ensure your code is self-contained.
{code_steps}
When you call a function, you should import the function from `{module_name}` first, e.g.:
```python
from metagpt.tools.functions.libs.feature_engineering import fill_missing_value
```
## Available Functions for Each Step:
Here's a list of all available functions for each step. You can find more details about each function in [## Function Catalog]
{available_tools}
## Function Catalog:
Each function is described in JSON format, including the function name and parameters. {output_desc}
{function_catalog}
## Your Output Format:
Generate the complete code for every step, listing any used function tools at the beginning of the step:
```python
# Step 1
# Tools used: [function names or 'none']
<your code for this step, without any comments>
# Step 2
# Tools used: [function names or 'none']
<your code for this step, without any comments>
# Continue with additional steps, following the same format...
{history_code}
```end
*** Important Rules ***
- Use only the tools designated for each code step.
- Your output should only include code for the current sub-task. Don't repeat historical code.
- Only mention functions in comments if used in the code.
- Ensure the output new code is executable in the current Jupyter notebook environment, with all historical code executed.
## Current Task
{current_task}
# Latest Data Info
Latest data info after previous tasks:
{column_info}
# Task
Write complete code for 'Current Task'. And avoid duplicating code from 'Done Tasks', such as repeated import of packages, reading data, etc.
Specifically, {special_prompt}
# Code Steps:
Strictly follow steps below when you writing code if it's convenient.
{code_steps}
# Output Example:
when current task is "train a lightgbm model on training data", and their are two steps in 'Code Steps', the code be like:
```python
# Step 1: check data type and convert to numeric
ojb_cols = train.select_dtypes(include='object').columns.tolist()
for col in obj_cols:
encoder = LabelEncoder()
train[col] = encoder.fit_transform(train[col])
test[col] = test[col].apply(lambda x: x if x in encoder.classes_ else 'unknown')
test[col] = encoder.transform(test[col])
# Step 2: train lightgbm model
model = LGBMClassifier()
model.fit(train, y_train)
```end
# Constraints:
- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed.
- The output code should contain all steps implemented in 'Code Steps'.
"""
TOOL_USAGE_PROMPT = """
# Background
As a data scientist, you need to help user to achieve their goal [{user_requirement}] step-by-step in an continuous Jupyter notebook.
## Done Tasks
```python
{history_code}
```end
## Current Task
{current_task}
# Latest Data Info
Latest data info after previous tasks:
{column_info}
# Task
Write complete code for 'Current Task'. And avoid duplicating code from 'Done Tasks', such as repeated import of packages, reading data, etc.
Specifically, {special_prompt}
# Code Steps:
Strictly follow steps below when you writing code if it's convenient.
{code_steps}
# Capabilities
- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python Class.
- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc..
# Available Tools:
Each Class tool is described in JSON format. When you call a tool, import the tool from `{module_name}` first.
{tool_catalog}
# Output Example:
when current task is "do data preprocess, like fill missing value, handle outliers, etc.", and their are two steps in 'Code Steps', the code be like:
```python
# Step 1: fill missing value
# Tools used: ['FillMissingValue']
from metagpt.tools.functions.libs.data_preprocess import FillMissingValue
train_processed = train.copy()
test_processed = test.copy()
num_cols = train_processed.select_dtypes(include='number').columns.tolist()
fill_missing_value = FillMissingValue(features=num_cols, strategy='mean')
fill_missing_value.fit(train_processed)
train_processed = fill_missing_value.transform(train_processed)
test_processed = fill_missing_value.transform(test_processed)
# Step 2: handle outliers
for col in num_cols:
low, high = train_processed[col].quantile([0.01, 0.99])
train_processed[col] = train_processed[col].clip(low, high)
test_processed[col] = test_processed[col].clip(low, high)
```end
# Constraints:
- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed.
- Always prioritize using pre-defined tools for the same functionality.
- Always copy the DataFrame before processing it and use the copy to process.
- The output code should contain all steps implemented correctly in 'Code Steps'.
"""
#- If 'Code Steps' contains step done in 'Done Tasks', such as reading data, don't repeat it.
DATA_PREPROCESS_PROMPT = """
In data preprocessing, closely monitor each column's data type. Apply suitable methods for various types (numerical, categorical, datetime, textual, etc.) to ensure the pandas.DataFrame is correctly formatted.
Additionally, ensure that the columns being processed must be the ones that actually exist in the dataset.
The current task is about data preprocessing, please note the following:
- Monitor data types per column, applying appropriate methods.
- Ensure operations are on existing dataset columns.
- Avoid writing processed data to files.
- Prefer alternatives to one-hot encoding for categorical data.
- Only encode necessary categorical columns to allow for potential feature-specific engineering tasks later.
"""
FEATURE_ENGINEERING_PROMPT = """
When performing feature engineering, please adhere to the following principles:
- For specific user requests (such as removing a feature, creating a new feature based on existing data), directly generate the corresponding code.
- In cases of unclear user requirements, write feature engineering code that you believe will most improve model performance. This may include feature transformation, combination, aggregation, etc., with a limit of five features at a time.
- Ensure that the feature you're working with is indeed present in the dataset and consider the data type (numerical, categorical, etc.) and application scenario (classification, regression tasks, etc.).
- Importantly, provide detailed comments explaining the purpose of each feature and how it might enhance model performance, especially when the features are generated based on semantic understanding without clear user directives.
The current task is about feature engineering. when performing it, please adhere to the following principles:
- Ensure operations are on existing dataset columns and consider the data type (numerical, categorical, etc.) and application scenario (classification, regression tasks, etc.).
- Create impactful features based on real-world knowledge and column info.
- Generate as diverse features as possible to improve the model's performance.
- If potential impactful features are not included in 'Code Steps', add new steps to generate them.
"""
MODEL_TRAIN_PROMPT = """
When selecting and training a model, please follow these guidelines to ensure optimal performance:
The current task is about training a model, please ensure high performance:
- Keep in mind that your user prioritizes results and is highly focused on model performance. So, when needed, feel free to use models of any complexity to improve effectiveness, such as lightGBM, XGBoost, CatBoost, etc.
If user specifies a model, use that model. Otherwise, use the model you believe will best solve the problem.
- Before training, first check not is_numeric_dtype columns and use label encoding to convert them to numeric columns.
- Use the data from previous task result directly, do not mock or reload data yourself.
"""
DATA_PREPROCESS_OUTPUT_DESC = "Please note that all functions uniformly output a processed pandas.DataFrame, facilitating seamless integration into the broader workflow."
FEATURE_ENGINEERING_OUTPUT_DESC = "Please note that all functions uniformly output updated pandas.DataFrame with feature engineering applied."
CLASSIFICATION_MODEL_OUTPUT_DESC = ""
REGRESSION_MODEL_OUTPUT_DESC = ""
MODEL_EVALUATE_PROMPT = """
The current task is about evaluating a model, please note the following:
- Ensure that the evaluated data is same processed as the training data. If not, remember use object in 'Done Tasks' to transform the data.
- Use trained model from previous task result directly, do not mock or reload model yourself.
"""
ML_SPECIFIC_PROMPT = {
"data_preprocess": DATA_PREPROCESS_PROMPT,
"feature_engineering": FEATURE_ENGINEERING_PROMPT,
"classification_model": MODEL_TRAIN_PROMPT,
"regression_model": MODEL_TRAIN_PROMPT,
}
TOOL_OUTPUT_DESC = {
"data_preprocess": DATA_PREPROCESS_OUTPUT_DESC,
"feature_engineering": FEATURE_ENGINEERING_OUTPUT_DESC,
"classification_model": CLASSIFICATION_MODEL_OUTPUT_DESC,
"regression_model": REGRESSION_MODEL_OUTPUT_DESC,
"model_train": MODEL_TRAIN_PROMPT,
"model_evaluate": MODEL_EVALUATE_PROMPT,
}
ML_MODULE_MAP = {
"data_preprocess": "metagpt.tools.functions.libs.machine_learning.data_preprocess",
"feature_engineering": "metagpt.tools.functions.libs.machine_learning.feature_engineering",
"classification_model": "metagpt.tools.functions.libs.machine_learning.ml_model",
"regression_model": "metagpt.tools.functions.libs.machine_learning.ml_model",
"data_preprocess": "metagpt.tools.functions.libs.data_preprocess",
"feature_engineering": "metagpt.tools.functions.libs.feature_engineering",
}
STRUCTURAL_CONTEXT = """

View file

@ -15,6 +15,7 @@ from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
wait_fixed,
)
@ -259,7 +260,8 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
rsp = self.llm.ChatCompletion.create(**self._func_configs(messages, **kwargs))
self._update_costs(rsp.get("usage"))
return rsp
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
async def _achat_completion_function(self, messages: list[dict], **chat_configs) -> dict:
rsp = await self.llm.ChatCompletion.acreate(**self._func_configs(messages, **chat_configs))
self._update_costs(rsp.get("usage"))

View file

@ -4,19 +4,40 @@ from datetime import datetime
import fire
from metagpt.roles import Role
from metagpt.schema import Message, Plan
from metagpt.memory import Memory
from metagpt.logs import logger
from metagpt.actions.write_plan import WritePlan, update_plan_from_rsp, precheck_update_plan_from_rsp
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools, MakeTools
from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis, Reflect, ReviewConst
from metagpt.actions import Action
from metagpt.actions.debug_code import DebugCode
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.roles.kaggle_manager import DownloadData, SubmitResult
from metagpt.prompts.ml_engineer import STRUCTURAL_CONTEXT
from metagpt.actions.ml_da_action import AskReview, SummarizeAnalysis, Reflect, ReviewConst
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools
from metagpt.actions.write_code_steps import WriteCodeSteps
from metagpt.actions.write_plan import WritePlan
from metagpt.actions.write_plan import update_plan_from_rsp, precheck_update_plan_from_rsp
from metagpt.const import DATA_PATH, PROJECT_ROOT
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.prompts.ml_engineer import STRUCTURAL_CONTEXT
from metagpt.prompts.ml_engineer import (
UPDATE_DATA_COLUMNS,
PRINT_DATA_COLUMNS
)
from metagpt.roles import Role
from metagpt.roles.kaggle_manager import DownloadData, SubmitResult
from metagpt.schema import Message, Plan
from metagpt.utils.common import remove_comments, create_func_config
from metagpt.utils.save_code import save_code_file
class UpdateDataColumns(Action):
async def run(self, plan: Plan = None) -> dict:
finished_tasks = plan.get_finished_tasks()
code_context = [remove_comments(task.code) for task in finished_tasks]
code_context = "\n\n".join(code_context)
prompt = UPDATE_DATA_COLUMNS.format(history_code=code_context)
tool_config = create_func_config(PRINT_DATA_COLUMNS)
rsp = await self.llm.aask_code(prompt, **tool_config)
return rsp
class MLEngineer(Role):
def __init__(
self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False
@ -30,6 +51,7 @@ class MLEngineer(Role):
self.use_code_steps = False
self.execute_code = ExecutePyCode()
self.auto_run = auto_run
self.data_desc = {}
# memory for working on each task, discarded each time a task is done
self.working_memory = Memory()
@ -58,7 +80,7 @@ class MLEngineer(Role):
logger.info(f"ready to take on task {task}")
# take on current task
code, result, success, code_steps = await self._write_and_exec_code()
code, result, success = await self._write_and_exec_code()
# ask for acceptance, users can other refuse and change tasks in the plan
review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
@ -72,10 +94,14 @@ class MLEngineer(Role):
# tick off this task and record progress
task.code = code
task.result = result
task.code_steps = code_steps
self.plan.finish_current_task()
self.working_memory.clear()
if self.use_tools:
success, new_code = await self._update_data_columns()
if success:
task.code = task.code + "\n\n" + new_code
confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower()
and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)"
if confirmed_and_more:
@ -103,8 +129,18 @@ class MLEngineer(Role):
save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb")
return rsp
async def _update_data_columns(self):
rsp = await UpdateDataColumns().run(self.plan)
is_update, code = rsp["is_update"], rsp["code"]
success = False
if is_update:
result, success = await self.execute_code.run(code)
if success:
self.data_desc["column_info"] = result
return success, code
async def _write_and_exec_code(self, max_retry: int = 3):
code_steps = (
self.plan.current_task.code_steps = (
await WriteCodeSteps().run(self.plan)
if self.use_code_steps
else ""
@ -112,6 +148,8 @@ class MLEngineer(Role):
counter = 0
success = False
debug_context = []
while not success and counter < max_retry:
context = self.get_useful_memories()
@ -119,21 +157,35 @@ class MLEngineer(Role):
# print(context)
# print("*" * 10)
# breakpoint()
if not self.use_tools or self.plan.current_task.task_type == "other":
# code = "print('abc')"
code = await WriteCodeByGenerate().run(
context=context, plan=self.plan, code_steps=code_steps, temperature=0.0
if counter > 0 and self.use_tools:
code = await DebugCode().run(
plan=self.plan.current_task.instruction,
code=code,
runtime_result=self.working_memory.get(),
context=debug_context
)
logger.info(f"new code \n{code}")
cause_by = DebugCode
elif not self.use_tools or self.plan.current_task.task_type == "other":
logger.info("Write code with pure generation")
code = await WriteCodeByGenerate().run(
context=context, plan=self.plan, temperature=0.0
)
debug_context = [self.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]]
cause_by = WriteCodeByGenerate
# make and save tools.
make_tools = MakeTools()
tool_code = await make_tools.run(code)
make_tools.save(tool_code)
else:
code = await WriteCodeWithTools().run(
context=context, plan=self.plan, code_steps=code_steps, data_desc=""
logger.info("Write code with tools")
schema_path = PROJECT_ROOT / "metagpt/tools/functions/schemas"
tool_context, code = await WriteCodeWithTools(schema_path=schema_path).run(
context=context,
plan=self.plan,
column_info=self.data_desc.get("column_info", ""),
)
debug_context = tool_context
cause_by = WriteCodeWithTools
self.working_memory.add(
@ -157,7 +209,7 @@ class MLEngineer(Role):
if ReviewConst.CHANGE_WORD[0] in review:
counter = 0 # redo the task again with help of human suggestions
return code, result, success, code_steps
return code, result, success
async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER):
auto_run = auto_run or self.auto_run
@ -205,16 +257,16 @@ class MLEngineer(Role):
self.working_memory.add(Message(content=reflection, role="assistant"))
self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user"))
def get_useful_memories(self) -> List[Message]:
def get_useful_memories(self, task_exclude_field=None) -> List[Message]:
"""find useful memories only to reduce context length and improve performance"""
# TODO dataset description , code steps
user_requirement = self.plan.goal
data_desc = self.plan.context
tasks = [task.dict() for task in self.plan.tasks]
for task in tasks:
if task_exclude_field is None:
# Shorten the context as we don't need code steps after we get the codes.
# This doesn't affect current_task below, which should hold the code steps
task.pop("code_steps")
task_exclude_field = {'code_steps'}
user_requirement = self.plan.goal
data_desc = self.plan.context
tasks = [task.dict(exclude=task_exclude_field) for task in self.plan.tasks]
tasks = json.dumps(tasks, indent=4, ensure_ascii=False)
current_task = self.plan.current_task.json() if self.plan.current_task else {}
context = STRUCTURAL_CONTEXT.format(

View file

@ -78,10 +78,10 @@ class Task(BaseModel):
dependent_task_ids: list[str] = [] # Tasks prerequisite to this Task
instruction: str = ""
task_type: str = ""
code_steps: str = ""
code: str = ""
result: str = ""
is_finished: bool = False
code_steps: str = ""
class Plan(BaseModel):

View file

@ -4,5 +4,3 @@
# @Author : lidanyang
# @File : __init__.py
# @Desc :
from metagpt.tools.functions.register.register import registry
import metagpt.tools.functions.libs.feature_engineering

View file

@ -0,0 +1,16 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/12/10 20:12
# @Author : lidanyang
# @File : base
# @Desc :
class MLProcess(object):
def fit(self, df):
raise NotImplementedError
def transform(self, df):
raise NotImplementedError
def fit_transform(self, df):
self.fit(df)
return self.transform(df)

View file

@ -1,123 +1,153 @@
import pandas as pd
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import KBinsDiscretizer
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MaxAbsScaler
from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import OrdinalEncoder
from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing import StandardScaler
from metagpt.tools.functions import registry
from metagpt.tools.functions.schemas.data_preprocess import *
from metagpt.tools.functions.libs.base import MLProcess
@registry.register("data_preprocess", FillMissingValue)
def fill_missing_value(df: pd.DataFrame, features: list, strategy: str = 'mean', fill_value=None,):
df[features] = SimpleImputer(strategy=strategy, fill_value=fill_value).fit_transform(df[features])
return df
class FillMissingValue(MLProcess):
def __init__(self, features: list, strategy: str = 'mean', fill_value=None,):
self.features = features
self.strategy = strategy
self.fill_value = fill_value
self.si = None
def fit(self, df: pd.DataFrame):
self.si = SimpleImputer(strategy=self.strategy, fill_value=self.fill_value)
self.si.fit(df[self.features])
def transform(self, df: pd.DataFrame):
df[self.features] = self.si.transform(df[self.features])
return df
# @registry.register("data_preprocess", FillMissingValue)
# def label_encode(df: pd.DataFrame, features: list,):
# for col in features:
# df[col] = LabelEncoder().fit_transform(df[col])
# return df
class MinMaxScale(MLProcess):
def __init__(self, features: list,):
self.features = features
self.mms = None
def fit(self, df: pd.DataFrame):
self.mms = MinMaxScaler()
self.mms.fit(df[self.features])
def transform(self, df: pd.DataFrame):
df[self.features] = self.mms.transform(df[self.features])
return df
@registry.register("data_preprocess", SplitBins)
def split_bins(df: pd.DataFrame, features: list, strategy: str = 'quantile',):
df[features] = KBinsDiscretizer(strategy=strategy, encode='ordinal').fit_transform(df[features])
return df
class StandardScale(MLProcess):
def __init__(self, features: list,):
self.features = features
self.ss = None
def fit(self, df: pd.DataFrame):
self.ss = StandardScaler()
self.ss.fit(df[self.features])
def transform(self, df: pd.DataFrame):
df[self.features] = self.ss.transform(df[self.features])
return df
@registry.register("data_preprocess", MinMaxScale)
def min_max_scale(df: pd.DataFrame, features: list, ):
df[features] = MinMaxScaler().fit_transform(df[features])
return df
class MaxAbsScale(MLProcess):
def __init__(self, features: list,):
self.features = features
self.mas = None
def fit(self, df: pd.DataFrame):
self.mas = MaxAbsScaler()
self.mas.fit(df[self.features])
def transform(self, df: pd.DataFrame):
df[self.features] = self.mas.transform(df[self.features])
return df
@registry.register("data_preprocess", StandardScale)
def standard_scale(df: pd.DataFrame, features: list, ):
df[features] = StandardScaler().fit_transform(df[features])
return df
class RobustScale(MLProcess):
def __init__(self, features: list,):
self.features = features
self.rs = None
def fit(self, df: pd.DataFrame):
self.rs = RobustScaler()
self.rs.fit(df[self.features])
def transform(self, df: pd.DataFrame):
df[self.features] = self.rs.transform(df[self.features])
return df
@registry.register("data_preprocess", LogTransform)
def log_transform(df: pd.DataFrame, features: list, ):
for col in features:
if df[col].min() <= 0:
df[col] = df[col] - df[col].min() + 2
df[col] = np.log(df[col])
return df
class OrdinalEncode(MLProcess):
def __init__(self, features: list,):
self.features = features
self.oe = None
def fit(self, df: pd.DataFrame):
self.oe = OrdinalEncoder()
self.oe.fit(df[self.features])
def transform(self, df: pd.DataFrame):
df[self.features] = self.oe.transform(df[self.features])
return df
@registry.register("data_preprocess", MaxAbsScale)
def max_abs_scale(df: pd.DataFrame, features: list, ):
df[features] = MaxAbsScaler().fit_transform(df[features])
return df
class OneHotEncode(MLProcess):
def __init__(self, features: list,):
self.features = features
self.ohe = None
def fit(self, df: pd.DataFrame):
self.ohe = OneHotEncoder(handle_unknown="ignore", sparse=False)
self.ohe.fit(df[self.features])
def transform(self, df: pd.DataFrame):
ts_data = self.ohe.transform(df[self.features])
new_columns = self.ohe.get_feature_names_out(self.features)
ts_data = pd.DataFrame(ts_data, columns=new_columns, index=df.index)
df.drop(self.features, axis=1, inplace=True)
df = pd.concat([df, ts_data], axis=1)
return df
@registry.register("data_preprocess", RobustScale)
def robust_scale(df: pd.DataFrame, features: list, ):
df[features] = RobustScaler().fit_transform(df[features])
return df
class LabelEncode(MLProcess):
def __init__(self, features: list,):
self.features = features
self.le_encoders = []
def fit(self, df: pd.DataFrame):
for col in self.features:
le = LabelEncoder().fit(df[col].astype(str).unique().tolist() + ['unknown'])
self.le_encoders.append(le)
def transform(self, df: pd.DataFrame):
for i in range(len(self.features)):
data_list = df[self.features[i]].astype(str).tolist()
for unique_item in np.unique(df[self.features[i]].astype(str)):
if unique_item not in self.le_encoders[i].classes_:
data_list = ['unknown' if x == unique_item else x for x in data_list]
df[self.features[i]] = self.le_encoders[i].transform(data_list)
return df
@registry.register("data_preprocess", OrdinalEncode)
def ordinal_encode(df: pd.DataFrame, features: list,):
df[features] = OrdinalEncoder().fit_transform(df[features])
return df
def get_column_info(df: pd.DataFrame) -> dict:
data = []
for i in df.columns:
nan_freq = float("%.2g" % (df[i].isna().mean() * 100))
n_unique = df[i].nunique()
data_type = str(df[i].dtype).replace("dtype('", "").replace("')", "")
if data_type == "O":
data_type = "object"
data.append([i, data_type, nan_freq, n_unique])
if __name__ == '__main__':
def run():
V = {
'a': [-1, 2, 3, 6, 5, 4],
'b': [1.1, 2.2, 3.3, 6.6, 5.5, 4.4],
'c': ['aa', 'bb', 'cc', 'dd', 'ee', 'ff'],
'd': [1, None, 3, None, 5, 4],
'e': [1.1, np.NAN, 3.3, None, 5.5, 4.4],
'f': ['aa', np.NAN, 'cc', None, '', 'ff'],
}
df = pd.DataFrame(V)
print(df.dtypes)
numeric_features = ['a', 'b', 'd', 'e']
numeric_features_wo_miss = ['a', 'b', ]
categorial_features = ['c', 'f']
df_ = fill_missing_value(df.copy(), numeric_features)
print(df_)
df_ = fill_missing_value(df.copy(), categorial_features, strategy='constant', fill_value='hehe')
print(df_)
df_ = fill_missing_value(df.copy(), numeric_features, strategy='constant', fill_value=999)
print(df_)
# df_ = label_encode(df.copy(), numeric_features + categorial_features, )
# print(df_)
df_ = split_bins(df.copy(), numeric_features_wo_miss, strategy='quantile')
print(df_)
df_ = min_max_scale(df.copy(), numeric_features, )
print(df_)
df_ = standard_scale(df.copy(), numeric_features, )
print(df_)
df_ = log_transform(df.copy(), numeric_features, )
print(df_)
df_ = max_abs_scale(df.copy(), numeric_features, )
print(df_)
df_ = robust_scale(df.copy(), numeric_features, )
print(df_)
run()
samples = pd.DataFrame(
data,
columns=["Column_name", "Data_type", "NaN_Frequency(%)", "N_unique"],
)
return samples.to_dict(orient='list')

View file

@ -3,172 +3,290 @@
# @Time : 2023/11/17 10:33
# @Author : lidanyang
# @File : feature_engineering.py
# @Desc : Feature Engineering Functions
# @Desc : Feature Engineering Tools
import itertools
import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta
from joblib import Parallel, delayed
from pandas.api.types import is_numeric_dtype
from sklearn.preprocessing import PolynomialFeatures, OneHotEncoder
from pandas.core.dtypes.common import is_object_dtype
from sklearn.model_selection import KFold
from sklearn.preprocessing import PolynomialFeatures, KBinsDiscretizer
from metagpt.tools.functions import registry
from metagpt.tools.functions.schemas.feature_engineering import *
from metagpt.tools.functions.libs.base import MLProcess
@registry.register("feature_engineering", PolynomialExpansion)
def polynomial_expansion(df, cols, degree=2):
for col in cols:
if not is_numeric_dtype(df[col]):
raise ValueError(f"Column '{col}' must be numeric.")
class PolynomialExpansion(MLProcess):
def __init__(self, cols: list, degree: int = 2):
self.cols = cols
self.degree = degree
self.poly = PolynomialFeatures(degree=degree, include_bias=False)
poly = PolynomialFeatures(degree=degree, include_bias=False)
ts_data = poly.fit_transform(df[cols].fillna(0))
new_columns = poly.get_feature_names_out(cols)
ts_data = pd.DataFrame(ts_data, columns=new_columns, index=df.index)
ts_data = ts_data.drop(cols, axis=1)
df = pd.concat([df, ts_data], axis=1)
return df
def fit(self, df: pd.DataFrame):
self.poly.fit(df[self.cols].fillna(0))
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
ts_data = self.poly.transform(df[self.cols].fillna(0))
column_name = self.poly.get_feature_names_out(self.cols)
ts_data = pd.DataFrame(ts_data, index=df.index, columns=column_name)
df.drop(self.cols, axis=1, inplace=True)
df = pd.concat([df, ts_data], axis=1)
return df
@registry.register("feature_engineering", OneHotEncoding)
def one_hot_encoding(df, cols):
enc = OneHotEncoder(handle_unknown="ignore", sparse=False)
ts_data = enc.fit_transform(df[cols])
new_columns = enc.get_feature_names_out(cols)
ts_data = pd.DataFrame(ts_data, columns=new_columns, index=df.index)
df.drop(cols, axis=1, inplace=True)
df = pd.concat([df, ts_data], axis=1)
return df
class CatCount(MLProcess):
def __init__(self, col: str):
self.col = col
self.encoder_dict = None
def fit(self, df: pd.DataFrame):
self.encoder_dict = df[self.col].value_counts().to_dict()
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
df[f"{self.col}_cnt"] = df[self.col].map(self.encoder_dict)
return df
@registry.register("feature_engineering", FrequencyEncoding)
def frequency_encoding(df, cols):
for col in cols:
encoder_dict = df[col].value_counts().to_dict()
df[f"{col}_cnt"] = df[col].map(encoder_dict)
return df
class TargetMeanEncoder(MLProcess):
def __init__(self, col: str, label: str):
self.col = col
self.label = label
self.encoder_dict = None
def fit(self, df: pd.DataFrame):
self.encoder_dict = df.groupby(self.col)[self.label].mean().to_dict()
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
df[f"{self.col}_target_mean"] = df[self.col].map(self.encoder_dict)
return df
@registry.register("feature_engineering", CatCross)
def cat_cross(df, cols, max_cat_num=100):
for col in cols:
if df[col].nunique() > max_cat_num:
cols.remove(col)
class KFoldTargetMeanEncoder(MLProcess):
def __init__(self, col: str, label: str, n_splits: int = 5, random_state: int = 2021):
self.col = col
self.label = label
self.n_splits = n_splits
self.random_state = random_state
self.encoder_dict = None
for col1, col2 in itertools.combinations(cols, 2):
cross_col = f"{col1}_cross_{col2}"
df[cross_col] = df[col1].astype(str) + "_" + df[col2].astype(str)
return df
def fit(self, df: pd.DataFrame):
tmp = df.copy()
kf = KFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
@registry.register("feature_engineering", GroupStat)
def group_stat(df, group_col, agg_col, agg_funcs):
group_df = df.groupby(group_col)[agg_col].agg(agg_funcs).reset_index()
group_df.columns = group_col + [
f"{agg_col}_{agg_func}_by_{group_col}" for agg_func in agg_funcs
]
df = df.merge(group_df, on=group_col, how="left")
return df
@registry.register("feature_engineering", ExtractTimeComps)
def extract_time_comps(df, time_col, time_comps):
time_s = pd.to_datetime(df[time_col], errors="coerce")
time_comps_df = pd.DataFrame()
if "year" in time_comps:
time_comps_df["year"] = time_s.dt.year
if "month" in time_comps:
time_comps_df["month"] = time_s.dt.month
if "day" in time_comps:
time_comps_df["day"] = time_s.dt.day
if "hour" in time_comps:
time_comps_df["hour"] = time_s.dt.hour
if "dayofweek" in time_comps:
time_comps_df["dayofweek"] = time_s.dt.dayofweek + 1
if "is_weekend" in time_comps:
time_comps_df["is_weekend"] = time_s.dt.dayofweek.isin([5, 6]).astype(int)
df = pd.concat([df, time_comps_df], axis=1)
return df
@registry.register("feature_engineering", FeShiftByTime)
def fe_shift_by_time(df, time_col, group_col, shift_col, periods, freq):
df[time_col] = pd.to_datetime(df[time_col])
def shift_datetime(date, offset, unit):
if unit in ["year", "y", "Y"]:
return date + relativedelta(years=offset)
elif unit in ["month", "m", "M"]:
return date + relativedelta(months=offset)
elif unit in ["day", "d", "D"]:
return date + relativedelta(days=offset)
elif unit in ["week", "w", "W"]:
return date + relativedelta(weeks=offset)
elif unit in ["hour", "h", "H"]:
return date + relativedelta(hours=offset)
else:
return date
def shift_by_time_on_key(
inner_df, time_col, group_col, shift_col, offset, unit, col_name
):
inner_df = inner_df.drop_duplicates()
inner_df[time_col] = inner_df[time_col].map(
lambda x: shift_datetime(x, offset, unit)
)
inner_df = inner_df.groupby([time_col, group_col], as_index=False)[
shift_col
].mean()
inner_df.rename(columns={shift_col: col_name}, inplace=True)
return inner_df
shift_df = df[[time_col, group_col, shift_col]].copy()
for period in periods:
new_col_name = f"{group_col}_{shift_col}_lag_{period}_{freq}"
tmp = shift_by_time_on_key(
shift_df, time_col, group_col, shift_col, period, freq, new_col_name
)
df = df.merge(tmp, on=[time_col, group_col], how="left")
return df
@registry.register("feature_engineering", FeRollingByTime)
def fe_rolling_by_time(df, time_col, group_col, rolling_col, periods, freq, agg_funcs):
df[time_col] = pd.to_datetime(df[time_col])
def rolling_by_time_on_key(inner_df, offset, unit, agg_func, col_name):
time_freq = {
"Y": [365 * offset, "D"],
"M": [30 * offset, "D"],
"D": [offset, "D"],
"W": [7 * offset, "D"],
"H": [offset, "h"],
}
if agg_func not in ["mean", "std", "max", "min", "median", "sum", "count"]:
raise ValueError(f"Invalid agg function: {agg_func}")
rolling_feat = inner_df.rolling(
f"{time_freq[unit][0]}{time_freq[unit][1]}", closed="left"
)
rolling_feat = getattr(rolling_feat, agg_func)()
depth = df.columns.nlevels
rolling_feat = rolling_feat.stack(list(range(depth)))
rolling_feat.name = col_name
return rolling_feat
rolling_df = df[[time_col, group_col, rolling_col]].copy()
for period in periods:
for func in agg_funcs:
new_col_name = f"{group_col}_{rolling_col}_rolling_{period}_{freq}_{func}"
tmp = pd.pivot_table(
rolling_df,
index=time_col,
values=rolling_col,
columns=group_col,
global_mean = tmp[self.label].mean()
col_name = f"{self.col}_kf_target_mean"
for trn_idx, val_idx in kf.split(tmp, tmp[self.label]):
_trn, _val = tmp.iloc[trn_idx], tmp.iloc[val_idx]
tmp.loc[tmp.index[val_idx], col_name] = _val[self.col].map(
_trn.groupby(self.col)[self.label].mean()
)
tmp = rolling_by_time_on_key(tmp, period, freq, func, new_col_name)
df = df.merge(tmp, on=[time_col, group_col], how="left")
tmp[col_name].fillna(global_mean, inplace=True)
self.encoder_dict = tmp.groupby(self.col)[col_name].mean().to_dict()
return df
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
df[f"{self.col}_kf_target_mean"] = df[self.col].map(self.encoder_dict)
return df
class CatCross(MLProcess):
def __init__(self, cols: list, max_cat_num: int = 100):
self.cols = cols
self.max_cat_num = max_cat_num
self.combs = []
self.combs_map = {}
@staticmethod
def cross_two(comb, df):
new_col = f'{comb[0]}_{comb[1]}'
new_col_combs = list(itertools.product(df[comb[0]].unique(), df[comb[1]].unique()))
ll = list(range(len(new_col_combs)))
comb_map = dict(zip(new_col_combs, ll))
return new_col, comb_map
def fit(self, df: pd.DataFrame):
for col in self.cols:
if df[col].nunique() > self.max_cat_num:
self.cols.remove(col)
self.combs = list(itertools.combinations(self.cols, 2))
res = Parallel(n_jobs=4, require='sharedmem')(
delayed(self.cross_two)(comb, df) for comb in self.combs)
self.combs_map = dict(res)
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
for comb in self.combs:
new_col = f'{comb[0]}_{comb[1]}'
_map = self.combs_map[new_col]
df[new_col] = pd.Series(zip(df[comb[0]], df[comb[1]])).map(_map)
# set the unknown value to a new number
df[new_col].fillna(max(_map.values()) + 1, inplace=True)
df[new_col] = df[new_col].astype(int)
return df
class GroupStat(MLProcess):
def __init__(self, group_col: str, agg_col: str, agg_funcs: list):
self.group_col = group_col
self.agg_col = agg_col
self.agg_funcs = agg_funcs
self.group_df = None
def fit(self, df: pd.DataFrame):
group_df = df.groupby(self.group_col)[self.agg_col].agg(self.agg_funcs).reset_index()
group_df.columns = [self.group_col] + [
f"{self.agg_col}_{agg_func}_by_{self.group_col}" for agg_func in self.agg_funcs
]
self.group_df = group_df
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
df = df.merge(self.group_df, on=self.group_col, how="left")
return df
class SplitBins(MLProcess):
def __init__(self, cols: str, strategy: str = 'quantile'):
self.cols = cols
self.strategy = strategy
self.encoder = None
def fit(self, df: pd.DataFrame):
self.encoder = KBinsDiscretizer(strategy=self.strategy, encode='ordinal')
self.encoder.fit(df[self.cols].fillna(0))
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
df[self.cols] = self.encoder.transform(df[self.cols].fillna(0))
return df
# @registry.register("feature_engineering", ExtractTimeComps)
# def extract_time_comps(df, time_col, time_comps):
# time_s = pd.to_datetime(df[time_col], errors="coerce")
# time_comps_df = pd.DataFrame()
#
# if "year" in time_comps:
# time_comps_df["year"] = time_s.dt.year
# if "month" in time_comps:
# time_comps_df["month"] = time_s.dt.month
# if "day" in time_comps:
# time_comps_df["day"] = time_s.dt.day
# if "hour" in time_comps:
# time_comps_df["hour"] = time_s.dt.hour
# if "dayofweek" in time_comps:
# time_comps_df["dayofweek"] = time_s.dt.dayofweek + 1
# if "is_weekend" in time_comps:
# time_comps_df["is_weekend"] = time_s.dt.dayofweek.isin([5, 6]).astype(int)
# df = pd.concat([df, time_comps_df], axis=1)
# return df
#
#
# @registry.register("feature_engineering", FeShiftByTime)
# def fe_shift_by_time(df, time_col, group_col, shift_col, periods, freq):
# df[time_col] = pd.to_datetime(df[time_col])
#
# def shift_datetime(date, offset, unit):
# if unit in ["year", "y", "Y"]:
# return date + relativedelta(years=offset)
# elif unit in ["month", "m", "M"]:
# return date + relativedelta(months=offset)
# elif unit in ["day", "d", "D"]:
# return date + relativedelta(days=offset)
# elif unit in ["week", "w", "W"]:
# return date + relativedelta(weeks=offset)
# elif unit in ["hour", "h", "H"]:
# return date + relativedelta(hours=offset)
# else:
# return date
#
# def shift_by_time_on_key(
# inner_df, time_col, group_col, shift_col, offset, unit, col_name
# ):
# inner_df = inner_df.drop_duplicates()
# inner_df[time_col] = inner_df[time_col].map(
# lambda x: shift_datetime(x, offset, unit)
# )
# inner_df = inner_df.groupby([time_col, group_col], as_index=False)[
# shift_col
# ].mean()
# inner_df.rename(columns={shift_col: col_name}, inplace=True)
# return inner_df
#
# shift_df = df[[time_col, group_col, shift_col]].copy()
# for period in periods:
# new_col_name = f"{group_col}_{shift_col}_lag_{period}_{freq}"
# tmp = shift_by_time_on_key(
# shift_df, time_col, group_col, shift_col, period, freq, new_col_name
# )
# df = df.merge(tmp, on=[time_col, group_col], how="left")
#
# return df
#
#
# @registry.register("feature_engineering", FeRollingByTime)
# def fe_rolling_by_time(df, time_col, group_col, rolling_col, periods, freq, agg_funcs):
# df[time_col] = pd.to_datetime(df[time_col])
#
# def rolling_by_time_on_key(inner_df, offset, unit, agg_func, col_name):
# time_freq = {
# "Y": [365 * offset, "D"],
# "M": [30 * offset, "D"],
# "D": [offset, "D"],
# "W": [7 * offset, "D"],
# "H": [offset, "h"],
# }
#
# if agg_func not in ["mean", "std", "max", "min", "median", "sum", "count"]:
# raise ValueError(f"Invalid agg function: {agg_func}")
#
# rolling_feat = inner_df.rolling(
# f"{time_freq[unit][0]}{time_freq[unit][1]}", closed="left"
# )
# rolling_feat = getattr(rolling_feat, agg_func)()
# depth = df.columns.nlevels
# rolling_feat = rolling_feat.stack(list(range(depth)))
# rolling_feat.name = col_name
# return rolling_feat
#
# rolling_df = df[[time_col, group_col, rolling_col]].copy()
# for period in periods:
# for func in agg_funcs:
# new_col_name = f"{group_col}_{rolling_col}_rolling_{period}_{freq}_{func}"
# tmp = pd.pivot_table(
# rolling_df,
# index=time_col,
# values=rolling_col,
# columns=group_col,
# )
# tmp = rolling_by_time_on_key(tmp, period, freq, func, new_col_name)
# df = df.merge(tmp, on=[time_col, group_col], how="left")
#
# return df
class GeneralSelection(MLProcess):
def __init__(self, label_col: str):
self.label_col = label_col
self.feats = []
def fit(self, df: pd.DataFrame):
feats = [f for f in df.columns if f != self.label_col]
for col in df.columns:
if df[col].isnull().sum() / df.shape[0] == 1:
feats.remove(col)
if df[col].nunique() == 1:
feats.remove(col)
if (
df.loc[df[col] == np.inf].shape[0] != 0
or df.loc[df[col] == np.inf].shape[0] != 0
):
feats.remove(col)
if is_object_dtype(df[col]) and df[col].nunique() == df.shape[0]:
feats.remove(col)
self.feats = feats
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
df = df[self.feats + [self.label_col]]
return df

View file

@ -1,196 +0,0 @@
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from metagpt.tools.functions import registry
from metagpt.tools.functions.schemas.ml_model import *
#########
## 分类 ##
#########
@registry.register("classification_model", LogisticRegressionClassification)
def logistic_regression_classification(df, label, test_size=0.2, penalty='l2', dual=False):
nonnumeric_columns = [col for col in df if df[col].dtype == 'object']
for col in nonnumeric_columns:
df[col] = LabelEncoder().fit_transform(df[col])
df = df.fillna(0)
features = [col for col in df if col != label]
x, y = df[features], df[label]
tr_x, te_x, tr_y, te_y = train_test_split(x, y, test_size=test_size, random_state=1)
model = LogisticRegression(penalty=penalty, dual=dual)
model.fit(tr_x, tr_y, )
te_pred_prob = model.predict_proba(te_x)
res = {
'te_pred_prob': te_pred_prob
}
return res
@registry.register("classification_model", RandomForestClassification)
def random_forest_classification(df, label, test_size=0.2, n_estimators=100, criterion='gini'):
nonnumeric_columns = [col for col in df if df[col].dtype == 'object']
for col in nonnumeric_columns:
df[col] = LabelEncoder().fit_transform(df[col])
df = df.fillna(0)
features = [col for col in df if col != label]
x, y = df[features], df[label]
tr_x, te_x, tr_y, te_y = train_test_split(x, y, test_size=test_size, random_state=1)
model = RandomForestClassifier(n_estimators=n_estimators, criterion=criterion)
model.fit(tr_x, tr_y, )
te_pred_prob = model.predict_proba(te_x)
res = {
'te_pred_prob': te_pred_prob
}
return res
@registry.register("classification_model", GradientBoostingClassification)
def gradient_boosting_classification(df, label, test_size=0.2, n_estimators=100, learning_rate=0.1):
nonnumeric_columns = [col for col in df if df[col].dtype == 'object']
for col in nonnumeric_columns:
df[col] = LabelEncoder().fit_transform(df[col])
df = df.fillna(0)
features = [col for col in df if col != label]
x, y = df[features], df[label]
tr_x, te_x, tr_y, te_y = train_test_split(x, y, test_size=test_size, random_state=1)
model = GradientBoostingClassifier(n_estimators=n_estimators, learning_rate=learning_rate)
model.fit(tr_x, tr_y, )
te_pred_prob = model.predict_proba(te_x)
res = {
'te_pred_prob': te_pred_prob
}
return res
#########
## 回归 ##
#########
@registry.register("regression_model", LinearRegressionRegression)
def linear_regression(df, label, test_size=0.2, ):
nonnumeric_columns = [col for col in df if df[col].dtype == 'object']
for col in nonnumeric_columns:
df[col] = LabelEncoder().fit_transform(df[col])
df = df.fillna(0)
features = [col for col in df if col != label]
x, y = df[features], df[label]
tr_x, te_x, tr_y, te_y = train_test_split(x, y, test_size=test_size, random_state=1)
model = LinearRegression()
model.fit(tr_x, tr_y, )
te_pred_prob = model.predict(te_x)
res = {
'te_pred_prob': te_pred_prob
}
return res
@registry.register("regression_model", RandomForestRegression)
def random_forest_regression(df, label, test_size=0.2, n_estimators=100, criterion='squared_error'):
nonnumeric_columns = [col for col in df if df[col].dtype == 'object']
for col in nonnumeric_columns:
df[col] = LabelEncoder().fit_transform(df[col])
df = df.fillna(0)
features = [col for col in df if col != label]
x, y = df[features], df[label]
tr_x, te_x, tr_y, te_y = train_test_split(x, y, test_size=test_size, random_state=1)
model = RandomForestRegressor(n_estimators=n_estimators, criterion=criterion)
model.fit(tr_x, tr_y, )
te_pred_prob = model.predict(te_x)
res = {
'te_pred_prob': te_pred_prob
}
return res
@registry.register("regression_model", GradientBoostingRegression)
def gradient_boosting_regression(df, label, test_size=0.2, n_estimators=100, learning_rate=0.1):
nonnumeric_columns = [col for col in df if df[col].dtype == 'object']
for col in nonnumeric_columns:
df[col] = LabelEncoder().fit_transform(df[col])
df = df.fillna(0)
features = [col for col in df if col != label]
x, y = df[features], df[label]
tr_x, te_x, tr_y, te_y = train_test_split(x, y, test_size=test_size, random_state=1)
model = GradientBoostingRegressor(n_estimators=n_estimators, learning_rate=learning_rate)
model.fit(tr_x, tr_y, )
te_pred_prob = model.predict(te_x)
res = {
'te_pred_prob': te_pred_prob
}
return res
if __name__ == '__main__':
def run():
from sklearn.datasets import load_iris
loader = load_iris(as_frame=True)
df = loader['data']
df['target'] = loader['target']
df[df.columns[0]] = df[df.columns[0]].astype(str)
df[df.columns[1]] = df[df.columns[1]].astype(int)
df['target'] = df['target'].astype(str)
print(df)
print('####'*5)
res = logistic_regression_classification(df, 'target', test_size=0.25, penalty='l2', dual=False)
print(res['te_pred_prob'])
print('####'*5)
res = random_forest_classification(df, 'target', test_size=0.25, n_estimators=100, criterion='gini')
print(res['te_pred_prob'])
print('####'*5)
res = gradient_boosting_classification(df, 'target', test_size=0.25, n_estimators=100, learning_rate=0.1)
print(res['te_pred_prob'])
from sklearn.datasets import make_regression
import pandas as pd
loader = make_regression()
df = pd.DataFrame(loader[0])
df['target'] = loader[1]
df[df.columns[0]] = df[df.columns[0]].astype(str)
df[df.columns[1]] = df[df.columns[1]].astype(int)
# df['target'] = df['target'].astype(str)
print(df)
print('####' * 5)
res = linear_regression(df, 'target', test_size=0.25, )
print(res['te_pred_prob'])
print('####' * 5)
res = random_forest_regression(df, 'target', test_size=0.25, n_estimators=100, criterion='squared_error')
print(res['te_pred_prob'])
print('####' * 5)
res = gradient_boosting_regression(df, 'target', test_size=0.25, n_estimators=100, learning_rate=0.1)
print(res['te_pred_prob'])
run()

View file

@ -1,6 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/11/16 16:37
# @Author : lidanyang
# @File : __init__.py
# @Desc :

View file

@ -1,78 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/11/16 16:38
# @Author : lidanyang
# @File : register.py
# @Desc :
import inspect
from typing import Type, Optional, Callable, Dict, Union, List
from metagpt.tools.functions.schemas.base import ToolSchema
class FunctionRegistry:
def __init__(self):
self.functions: Dict[str, Dict[str, Dict]] = {}
@staticmethod
def _check_param_consistency(func_params, schema):
param_names = set(func_params.keys())
schema_names = set(schema["parameters"]["properties"].keys())
if param_names != schema_names:
raise ValueError("Function parameters do not match schema properties")
def register(self, module: str, tool_schema: Type[ToolSchema]) -> Callable:
def wrapper(func: Callable) -> Callable:
module_registry = self.functions.setdefault(module, {})
if func.__name__ in module_registry:
raise ValueError(f"Function {func.__name__} is already registered in {module}")
func_params = inspect.signature(func).parameters
schema = tool_schema.schema()
schema["name"] = func.__name__
self._check_param_consistency(func_params, schema)
module_registry[func.__name__] = {
"func": func,
"schema": schema,
}
return func
return wrapper
def get(self, module: str, name: str) -> Optional[Union[Callable, Dict]]:
"""Get function by module and name"""
module_registry = self.functions.get(module, {})
return module_registry.get(name)
def get_by_name(self, name: str) -> Optional[Dict]:
"""Get function by name"""
for module_registry in self.functions.values():
if name in module_registry:
return module_registry.get(name, {})
def get_all_by_module(self, module: str) -> Optional[Dict]:
"""Get all functions by module"""
return self.functions.get(module, {})
def get_schema(self, module: str, name: str) -> Optional[Dict]:
"""Get schema by module and name"""
module_registry = self.functions.get(module, {})
return module_registry.get(name, {}).get("schema")
def get_schemas(self, module: str, names: List[str]) -> List[Dict]:
"""Get schemas by module and names"""
module_registry = self.functions.get(module, {})
return [module_registry.get(name, {}).get("schema") for name in names]
def get_all_schema_by_module(self, module: str) -> List[Dict]:
"""Get all schemas by module"""
module_registry = self.functions.get(module, {})
return [v.get("schema") for v in module_registry.values()]
registry = FunctionRegistry()

View file

@ -1,100 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/11/16 16:34
# @Author : lidanyang
# @File : base.py
# @Desc : Build base class to generate schema for tool
from typing import Any, List, Optional, get_type_hints
class NoDefault:
"""
A class to represent a missing default value.
This is used to distinguish between a default value of None and a missing default value.
"""
pass
def tool_field(
description: str, default: Any = NoDefault(), enum: Optional[List[Any]] = None, **kwargs
):
"""
Create a field for a tool parameter.
Args:
description (str): A description of the field.
default (Any, optional): The default value for the field. Defaults to None.
enum (Optional[List[Any]], optional): A list of possible values for the field. Defaults to None.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary representing the field with provided attributes.
"""
field_info = {
"description": description,
"default": default,
"enum": enum,
}
field_info.update(kwargs)
return field_info
class ToolSchema:
@staticmethod
def format_type(type_hint):
"""
Format a type hint into a string representation.
Args:
type_hint (type): The type hint to format.
Returns:
str: A string representation of the type hint.
"""
if isinstance(type_hint, type):
# Handle built-in types separately
if type_hint.__module__ == "builtins":
return type_hint.__name__
else:
return f"{type_hint.__module__}.{type_hint.__name__}"
elif hasattr(type_hint, "__origin__") and hasattr(type_hint, "__args__"):
# Handle generic types (like List[int])
origin_type = ToolSchema.format_type(type_hint.__origin__)
args_type = ", ".join(
[ToolSchema.format_type(t) for t in type_hint.__args__]
)
return f"{origin_type}[{args_type}]"
else:
return str(type_hint)
@classmethod
def schema(cls):
"""
Generate a schema dictionary for the class.
The schema includes the class name, description, and information about
each class parameter based on type hints and field definitions.
Returns:
dict: A dictionary representing the schema of the class.
"""
schema = {
"name": cls.__name__,
"description": cls.__doc__,
"parameters": {"type": "object", "properties": {}, "required": []},
}
type_hints = get_type_hints(cls)
for attr, type_hint in type_hints.items():
value = getattr(cls, attr, None)
if isinstance(value, dict):
# Process each attribute that is defined using the field function
prop_info = {k: v for k, v in value.items() if v is not None or k == "default"}
if isinstance(prop_info["default"], NoDefault):
del prop_info["default"]
prop_info["type"] = ToolSchema.format_type(type_hint)
schema["parameters"]["properties"][attr] = prop_info
# Check for required fields
if "default" not in prop_info:
schema["parameters"]["required"].append(attr)
return schema

View file

@ -1,62 +0,0 @@
import pandas as pd
from metagpt.tools.functions.schemas.base import tool_field, ToolSchema
class FillMissingValue(ToolSchema):
"""Completing missing values with simple strategies"""
df: pd.DataFrame = tool_field(description="input dataframe")
features: list = tool_field(description="columns to be processed")
strategy: str = tool_field(description="the imputation strategy", default='mean')
fill_value: int = tool_field(description="fill_value is used to replace all occurrences of missing_values", default=None)
# class LabelEncode(ToolSchema):
# """Completing missing values with simple strategies"""
# df: pd.DataFrame = tool_field(description="input dataframe")
# features: list = tool_field(description="columns to be processed")
class SplitBins(ToolSchema):
"""Bin continuous data into intervals and return the bin identifier encoded as an integer value"""
df: pd.DataFrame = tool_field(description="input dataframe")
features: list = tool_field(description="columns to be processed")
strategy: str = tool_field(description="Strategy used to define the widths of the bins", default='quantile')
class MinMaxScale(ToolSchema):
"""Transform features by scaling each feature to a range, witch is (0, 1)"""
df: pd.DataFrame = tool_field(description="input dataframe")
features: list = tool_field(description="columns to be processed")
class StandardScale(ToolSchema):
"""Standardize features by removing the mean and scaling to unit variance"""
df: pd.DataFrame = tool_field(description="input dataframe")
features: list = tool_field(description="columns to be processed")
class LogTransform(ToolSchema):
"""Performs a logarithmic transformation on the specified columns"""
df: pd.DataFrame = tool_field(description="input dataframe")
features: list = tool_field(description="columns to be processed")
class MaxAbsScale(ToolSchema):
"""Scale each feature by its maximum absolute value"""
df: pd.DataFrame = tool_field(description="input dataframe")
features: list = tool_field(description="columns to be processed")
class RobustScale(ToolSchema):
"""Scale features using statistics that are robust to outliers, the quantile_range is (25.0, 75.0)"""
df: pd.DataFrame = tool_field(description="input dataframe")
features: list = tool_field(description="columns to be processed")
class OrdinalEncode(ToolSchema):
"""Encode categorical features as an integer array"""
df: pd.DataFrame = tool_field(description="input dataframe")
features: list = tool_field(description="columns to be processed")

View file

@ -0,0 +1,306 @@
FillMissingValue:
type: class
description: "Completing missing values with simple strategies"
methods:
__init__:
description: "Initialize self."
parameters:
properties:
features:
type: list
description: "columns to be processed"
strategy:
type: str
description: "the imputation strategy"
default: mean
enum:
- mean
- median
- most_frequent
- constant
fill_value:
type: int
description: "fill_value is used to replace all occurrences of missing_values"
default: null
required:
- features
fit:
description: "Fit the FillMissingValue model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
MinMaxScale:
type: class
description: "Transform features by scaling each feature to a range, witch is (0, 1)"
methods:
__init__:
description: "Initialize self."
parameters:
properties:
features:
type: list
description: "columns to be processed"
required:
- features
fit:
description: "Fit the MinMaxScale model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
StandardScale:
type: class
description: "Standardize features by removing the mean and scaling to unit variance"
methods:
__init__:
description: "Initialize self."
parameters:
properties:
features:
type: list
description: "columns to be processed"
required:
- features
fit:
description: "Fit the StandardScale model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
MaxAbsScale:
type: class
description: "cale each feature by its maximum absolute value"
methods:
__init__:
description: "Initialize self."
parameters:
properties:
features:
type: list
description: "columns to be processed"
required:
- features
fit:
description: "Fit the MaxAbsScale model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
LabelEncode:
type: class
description: "Apply label encoding to specified categorical columns in-place."
methods:
__init__:
description: "Initialize self."
parameters:
properties:
features:
type: list
description: "Categorical columns to be label encoded"
required:
- features
fit:
description: "Fit the LabelEncode model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
OneHotEncode:
type: class
description: "Apply one-hot encoding to specified categorical columns, the original columns will be dropped."
methods:
__init__:
description: "Initialize self."
parameters:
properties:
features:
type: list
description: "Categorical columns to be one-hot encoded and dropped"
required:
- features
fit:
description: "Fit the OneHotEncoding model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."

View file

@ -1,100 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/11/17 10:34
# @Author : lidanyang
# @File : feature_engineering.py
# @Desc : Schema for feature engineering functions
from typing import List
import pandas as pd
from metagpt.tools.functions.schemas.base import ToolSchema, tool_field
class PolynomialExpansion(ToolSchema):
"""Generate polynomial and interaction features from selected columns, excluding the bias column."""
df: pd.DataFrame = tool_field(description="DataFrame to process.")
cols: list = tool_field(description="Columns for polynomial expansion.")
degree: int = tool_field(description="Degree of polynomial features.", default=2)
class OneHotEncoding(ToolSchema):
"""Apply one-hot encoding to specified categorical columns in a DataFrame."""
df: pd.DataFrame = tool_field(description="DataFrame to process.")
cols: list = tool_field(description="Categorical columns to be one-hot encoded.")
class FrequencyEncoding(ToolSchema):
"""Convert categorical columns to frequency encoding."""
df: pd.DataFrame = tool_field(description="DataFrame to process.")
cols: list = tool_field(description="Categorical columns to be frequency encoded.")
class CatCross(ToolSchema):
"""Create pairwise crossed features from categorical columns, joining values with '_'."""
df: pd.DataFrame = tool_field(description="DataFrame to process.")
cols: list = tool_field(description="Columns to be pairwise crossed.")
max_cat_num: int = tool_field(
description="Maximum unique categories per crossed feature.", default=100
)
class GroupStat(ToolSchema):
"""Perform aggregation operations on a specified column grouped by certain categories."""
df: pd.DataFrame = tool_field(description="DataFrame to process.")
group_col: str = tool_field(description="Column used for grouping.")
agg_col: str = tool_field(description="Column on which aggregation is performed.")
agg_funcs: list = tool_field(
description="""List of aggregation functions to apply, such as ['mean', 'std'].
Each function must be supported by pandas."""
)
class ExtractTimeComps(ToolSchema):
"""Extract specific time components from a designated time column in a DataFrame."""
df: pd.DataFrame = tool_field(description="DataFrame to process.")
time_col: str = tool_field(
description="The name of the column containing time data."
)
time_comps: List[str] = tool_field(
description="""List of time components to extract.
Each component must be in ['year', 'month', 'day', 'hour', 'dayofweek', 'is_weekend']."""
)
class FeShiftByTime(ToolSchema):
"""Shift column values in a DataFrame based on specified time intervals."""
df: pd.DataFrame = tool_field(description="DataFrame to process.")
time_col: str = tool_field(description="Column for time-based shifting.")
group_col: str = tool_field(description="Column for grouping before shifting.")
shift_col: str = tool_field(description="Column to shift.")
periods: list = tool_field(description="Time intervals for shifting.")
freq: str = tool_field(
description="Frequency unit for time intervals (e.g., 'D', 'M').",
enum=["D", "M", "Y", "W", "H"],
)
class FeRollingByTime(ToolSchema):
"""Calculate rolling statistics for a DataFrame column over time intervals."""
df: pd.DataFrame = tool_field(description="DataFrame to process.")
time_col: str = tool_field(description="Column for time-based rolling.")
group_col: str = tool_field(description="Column for grouping before rolling.")
rolling_col: str = tool_field(description="Column for rolling calculations.")
periods: list = tool_field(description="Window sizes for rolling.")
freq: str = tool_field(
description="Frequency unit for time windows (e.g., 'D', 'M').",
enum=["D", "M", "Y", "W", "H"],
)
agg_funcs: list = tool_field(
description="""List of aggregation functions for rolling, like ['mean', 'std'].
Each function must be in ['mean', 'std', 'min', 'max', 'median', 'sum', 'count']."""
)

View file

@ -0,0 +1,433 @@
PolynomialExpansion:
type: class
description: "Add polynomial and interaction features from selected numeric columns, excluding the bias column."
methods:
__init__:
description: "Initialize self."
parameters:
properties:
cols:
type: list
description: "Columns for polynomial expansion."
degree:
type: int
description: "The degree of the polynomial features."
default: 2
required:
- cols
fit:
description: "Fit the PolynomialExpansion model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
CatCount:
type: class
description: "Add value counts of a categorical column as new feature."
methods:
__init__:
description: "Initialize self."
parameters:
properties:
col:
type: str
description: "Column for value counts."
required:
- col
fit:
description: "Fit the CatCount model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
TargetMeanEncoder:
type: class
description: "Encodes a categorical column by the mean of the label column, and adds the result as a new feature."
methods:
__init__:
description: "Initialize self."
parameters:
properties:
col:
type: str
description: "Column to be mean encoded."
label:
type: str
description: "Predicted label column."
required:
- col
- label
fit:
description: "Fit the TargetMeanEncoder model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
KFoldTargetMeanEncoder:
type: class
description: "Adds a new feature to the DataFrame by k-fold mean encoding of a categorical column using the label column."
methods:
__init__:
description: "Initialize self."
parameters:
properties:
col:
type: str
description: "Column to be k-fold mean encoded."
label:
type: str
description: "Predicted label column."
n_splits:
type: int
description: "Number of splits for K-fold."
default: 5
random_state:
type: int
description: "Random seed."
default: 2021
required:
- col
- label
fit:
description: "Fit the KFoldTargetMeanEncoder model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
CatCross:
type: class
description: "Add pairwise crossed features and convert them to numerical features."
methods:
__init__:
description: "Initialize self."
parameters:
properties:
cols:
type: list
description: "Columns to be pairwise crossed."
max_cat_num:
type: int
description: "Maximum unique categories per crossed feature."
default: 100
required:
- cols
fit:
description: "Fit the CatCross model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
GroupStat:
type: class
description: "Aggregate specified column in a DataFrame grouped by another column, adding new features named '<agg_col>_<agg_func>_by_<group_col>'."
methods:
__init__:
description: "Initialize self."
parameters:
properties:
group_col:
type: str
description: "Column used for grouping."
agg_col:
type: str
description: "Column on which aggregation is performed."
agg_funcs:
type: list
description: >-
List of aggregation functions to apply, such as ['mean', 'std'].
Each function must be supported by pandas.
required:
- group_col
- agg_col
- agg_funcs
fit:
description: "Fit the GroupStat model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
SplitBins:
type: class
description: "Inplace binning of continuous data into intervals, returning integer-encoded bin identifiers directly."
methods:
__init__:
description: "Initialize self."
parameters:
properties:
cols:
type: list
description: "Columns to be binned inplace."
strategy:
type: str
description: "Strategy used to define the widths of the bins."
default: quantile
enum:
- quantile
- uniform
- kmeans
required:
- cols
fit:
description: "Fit the SplitBins model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
GeneralSelection:
type: class
description: "Drop all nan feats and feats with only one unique value."
methods:
__init__:
description: "Initialize self."
parameters:
properties:
label_col:
type: str
description: "Label column name."
required:
- label_col
fit:
description: "Fit the GeneralSelection model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
transform:
description: "Transform the input DataFrame with the fitted model."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."
fit_transform:
description: "Fit and transform the input DataFrame."
parameters:
properties:
df:
type: DataFrame
description: "The input DataFrame."
required:
- df
returns:
df:
type: DataFrame
description: "The transformed DataFrame."

View file

@ -1,55 +0,0 @@
import pandas as pd
from metagpt.tools.functions.schemas.base import tool_field, ToolSchema
class LogisticRegressionClassification(ToolSchema):
"""Logistic Regression (aka logit, MaxEnt) classifier"""
df: pd.DataFrame = tool_field(description="input dataframe")
label: str = tool_field(description="target name")
test_size: float = tool_field(description="The proportion of the test set to all the data", default=0.2)
penalty: str = tool_field(description="Specify the norm of the penalty", default="l2")
dual: bool = tool_field(description="Dual (constrained) or primal (regularized) formulation", default="l2")
class RandomForestClassification(ToolSchema):
"""random forest is a meta estimator that fits a number of decision tree classifiers on various sub-samples of the dataset and uses averaging to improve the predictive accuracy and control over-fitting"""
df: pd.DataFrame = tool_field(description="input dataframe")
label: str = tool_field(description="target name")
test_size: float = tool_field(description="The proportion of the test set to all the data", default=0.2)
n_estimators: int = tool_field(description="The number of trees in the forest", default=100)
criterion: str = tool_field(description="The function to measure the quality of a split", default="gini")
class GradientBoostingClassification(ToolSchema):
"""Gradient Boosting for classification.This algorithm builds an additive model in a forward stage-wise fashion"""
df: pd.DataFrame = tool_field(description="input dataframe")
label: str = tool_field(description="target name")
test_size: float = tool_field(description="The proportion of the test set to all the data", default=0.2)
n_estimators: int = tool_field(description="The number of boosting stages to perform", default=100)
learning_rate: float = tool_field(description="Learning rate shrinks the contribution of each tree by learning_rate", default=0.1)
class LinearRegressionRegression(ToolSchema):
"""Ordinary least squares Linear Regression."""
df: pd.DataFrame = tool_field(description="input dataframe")
label: str = tool_field(description="target name")
test_size: float = tool_field(description="The proportion of the test set to all the data", default=0.2)
class RandomForestRegression(ToolSchema):
"""random forest is a meta estimator that fits a number of decision tree on various sub-samples of the dataset and uses averaging to improve the predictive accuracy and control over-fitting"""
df: pd.DataFrame = tool_field(description="input dataframe")
label: str = tool_field(description="target name")
test_size: float = tool_field(description="The proportion of the test set to all the data", default=0.2)
n_estimators: int = tool_field(description="The number of trees in the forest", default=100)
criterion: str = tool_field(description="The function to measure the quality of a split", default="squared_error")
class GradientBoostingRegression(ToolSchema):
"""Gradient Boosting for regression.This estimator builds an additive model in a forward stage-wise fashion"""
df: pd.DataFrame = tool_field(description="input dataframe")
label: str = tool_field(description="target name")
test_size: float = tool_field(description="The proportion of the test set to all the data", default=0.2)
n_estimators: int = tool_field(description="The number of boosting stages to perform", default=100)
learning_rate: float = tool_field(description="Learning rate shrinks the contribution of each tree by learning_rate", default=0.1)

View file

@ -315,3 +315,17 @@ def create_func_config(func_schema: dict) -> dict:
"tools": tools,
"tool_choice": tool_choice,
}
def remove_comments(code_str):
"""Remove comments from code."""
pattern = r"(\".*?\"|\'.*?\')|(\#.*?$)"
def replace_func(match):
if match.group(2) is not None:
return ""
else:
return match.group(1)
clean_code = re.sub(pattern, replace_func, code_str, flags=re.MULTILINE)
clean_code = os.linesep.join([s.rstrip() for s in clean_code.splitlines() if s.strip()])
return clean_code

View file

@ -35,7 +35,6 @@ tqdm==4.64.0
# webdriver_manager<3.9
anthropic==0.3.6
typing-inspect==0.8.0
typing_extensions==4.5.0
libcst==1.0.1
qdrant-client==1.4.0
pytest-mock==3.11.1

View file

@ -31,22 +31,15 @@ async def test_tool_recommendation():
step 1: 对数据集进行去重
step 2: 对数据集进行缺失值处理
"""
available_tools = [
{
"name": "fill_missing_value",
"description": "Completing missing values with simple strategies",
},
{
"name": "split_bins",
"description": "Bin continuous data into intervals and return the bin identifier encoded as an integer value",
},
]
available_tools = {
"fill_missing_value": "Completing missing values with simple strategies",
"split_bins": "Bin continuous data into intervals and return the bin identifier encoded as an integer value",
}
write_code = WriteCodeWithTools()
tools = await write_code._tool_recommendation(task, code_steps, available_tools)
assert len(tools) == 2
assert tools[0] == []
assert tools[1] == ["fill_missing_value"]
assert len(tools) == 1
assert tools[0] == ["fill_missing_value"]
@pytest.mark.asyncio
@ -57,7 +50,7 @@ async def test_write_code_with_tools():
"1": Task(
task_id="1",
instruction="随机生成一个pandas DataFrame数据集",
task_type="unknown",
task_type="other",
dependent_task_ids=[],
code="""
import pandas as pd
@ -75,6 +68,10 @@ async def test_write_code_with_tools():
instruction="对数据集进行数据清洗",
task_type="data_preprocess",
dependent_task_ids=["1"],
code_steps="""
{"Step 1": "对数据集进行去重",
"Step 2": "对数据集进行缺失值处理"}
"""
),
}
plan = Plan(
@ -83,13 +80,9 @@ async def test_write_code_with_tools():
task_map=task_map,
current_task_id="2",
)
task_guide = """
step 1: 对数据集进行去重
step 2: 对数据集进行缺失值处理
"""
data_desc = "None"
column_info = ""
code = await write_code.run(messages, plan, task_guide, data_desc)
code = await write_code.run(messages, plan, column_info)
assert len(code) > 0
print(code)