diff --git a/metagpt/actions/write_analysis_code.py b/metagpt/actions/write_analysis_code.py index 51cfa6d49..66e2137fe 100644 --- a/metagpt/actions/write_analysis_code.py +++ b/metagpt/actions/write_analysis_code.py @@ -4,25 +4,40 @@ @Author : orange-crow @File : write_code_v2.py """ +import json from typing import Dict, List, Union from metagpt.actions import Action +from metagpt.prompts.ml_engineer import ( + TOOL_RECOMMENDATION_PROMPT, + SELECT_FUNCTION_TOOLS, + CODE_GENERATOR_WITH_TOOLS, + TOO_ORGANIZATION_PROMPT, + ML_SPECIFIC_PROMPT, + ML_MODULE_MAP, + TOOL_OUTPUT_DESC, +) from metagpt.schema import Message, Plan +from metagpt.tools.functions import registry +from metagpt.utils.common import create_func_config + class BaseWriteAnalysisCode(Action): - - async def run(self, context: List[Message], plan: Plan = None, task_guide: str = "") -> str: + async def run( + self, context: List[Message], plan: Plan = None, task_guide: str = "" + ) -> str: """Run of a code writing action, used in data analysis or modeling Args: context (List[Message]): Action output history, source action denoted by Message.cause_by plan (Plan, optional): Overall plan. Defaults to None. task_guide (str, optional): suggested step breakdown for the current task. Defaults to "". - + Returns: str: The code string. """ + class WriteCodeByGenerate(BaseWriteAnalysisCode): """Write code fully by generation""" DEFAULT_SYSTEM_MSG = """You are Code Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.**Notice: Use !pip install in a standalone block to install missing packages.**""" # prompt reference: https://github.com/KillianLucas/open-interpreter/blob/v0.1.4/interpreter/system_message.txt @@ -41,24 +56,38 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode): messages = [] for p in prompt: if isinstance(p, str): - messages.append({'role': 'user', 'content': p}) + messages.append({"role": "user", "content": p}) elif isinstance(p, dict): messages.append(p) elif isinstance(p, Message): if isinstance(p.content, str): messages.append(p.to_dict()) - elif isinstance(p.content, dict) and 'code' in p.content: - messages.append(p.content['code']) + elif isinstance(p.content, dict) and "code" in p.content: + messages.append(p.content["code"]) # 添加默认的提示词 - if default_system_msg not in messages[0]['content'] and messages[0]['role'] != 'system': - messages.insert(0, {'role': 'system', 'content': default_system_msg}) - elif default_system_msg not in messages[0]['content'] and messages[0]['role'] == 'system': - messages[0] = {'role': 'system', 'content': messages[0]['content']+default_system_msg} + if ( + default_system_msg not in messages[0]["content"] + and messages[0]["role"] != "system" + ): + messages.insert(0, {"role": "system", "content": default_system_msg}) + elif ( + default_system_msg not in messages[0]["content"] + and messages[0]["role"] == "system" + ): + messages[0] = { + "role": "system", + "content": messages[0]["content"] + default_system_msg, + } return messages async def run( - self, context: [List[Message]], plan: Plan = None, task_guide: str = "", system_msg: str = None, **kwargs + self, + context: [List[Message]], + plan: Plan = None, + task_guide: str = "", + system_msg: str = None, + **kwargs, ) -> str: context.append(Message(content=self.REUSE_CODE_INSTRUCTION, role="user")) prompt = self.process_msg(context, system_msg) @@ -69,5 +98,99 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode): class WriteCodeWithTools(BaseWriteAnalysisCode): """Write code with help of local available tools. Choose tools first, then generate code to use the tools""" - async def run(self, context: List[Message], plan: Plan = None, task_guide: str = "") -> str: - return "print('abc')" + @staticmethod + def _parse_recommend_tools(module: str, recommend_tools: list) -> str: + """ + Converts recommended tools to a JSON string and checks tool availability in the registry. + + Args: + module (str): The module name for querying tools in the registry. + recommend_tools (list): A list of lists of recommended tools for each step. + + Returns: + str: A JSON string with available tools and their schemas for each step. + """ + valid_tools = {} + available_tools = registry.get_all_by_module(module).keys() + for index, tools in enumerate(recommend_tools): + key = f"Step {index + 1}" + tools = [tool for tool in tools if tool in available_tools] + valid_tools[key] = registry.get_schemas(module, tools) + return json.dumps(valid_tools) + + async def _tool_recommendation( + self, task: str, data_desc: str, code_steps: str, available_tools: list + ) -> list: + """ + Recommend tools for each step of the specified task + + Args: + task (str): the task description + data_desc (str): the description of the dataset for the task + code_steps (str): the code steps to generate the full code for the task + available_tools (list): the available tools for the task + + Returns: + list: recommended tools for each step of the specified task + """ + prompt = TOOL_RECOMMENDATION_PROMPT.format( + task=task, + data_desc=data_desc, + code_steps=code_steps, + available_tools=available_tools, + ) + tool_config = create_func_config(SELECT_FUNCTION_TOOLS) + rsp = await self.llm.aask_code(prompt, **tool_config) + recommend_tools = rsp["recommend_tools"] + return recommend_tools + + async def run( + self, + context: List[Message], + plan: Plan = None, + task_guide: str = "", + data_desc: str = "", + ) -> str: + task_type = plan.current_task.task_type + task = plan.current_task.instruction + available_tools = registry.get_all_schema_by_module(task_type) + available_tools = [ + {k: tool[k] for k in ["name", "description"] if k in tool} + for tool in available_tools + ] + task_guide = "\n".join( + [f"Step {step.strip()}" for step in task_guide.split("\n")] + ) + + recommend_tools = await self._tool_recommendation( + task, task_guide, available_tools + ) + recommend_tools = self._parse_recommend_tools(task_type, recommend_tools) + + special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "") + module_name = ML_MODULE_MAP[task_type] + output_desc = TOOL_OUTPUT_DESC.get(task_type, "") + all_tasks = "" + completed_code = "" + + for i, task in enumerate(plan.tasks): + stats = "DONE" if task.is_finished else "TODO" + all_tasks += f"Subtask {task.task_id}: {task.instruction}({stats})\n" + + for task in plan.tasks: + if task.code: + completed_code += task.code + "\n" + + prompt = TOO_ORGANIZATION_PROMPT.format( + all_tasks=all_tasks, + completed_code=completed_code, + data_desc=data_desc, + special_prompt=special_prompt, + code_steps=task_guide, + module_name=module_name, + output_desc=output_desc, + available_tools=recommend_tools, + ) + tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS) + rsp = await self.llm.aask_code(prompt, **tool_config) + return rsp["code"] diff --git a/metagpt/prompts/ml_engineer.py b/metagpt/prompts/ml_engineer.py new file mode 100644 index 000000000..55ac27d82 --- /dev/null +++ b/metagpt/prompts/ml_engineer.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/11/24 15:43 +# @Author : lidanyang +# @File : ml_engineer +# @Desc : +TOOL_RECOMMENDATION_PROMPT = """ +## Comprehensive Task Description: +{task} + +## Dataset Description: +Details about the dataset for the project: +{data_desc} + +This task is divided into several steps, and you need to select the most suitable tools for each step. A tool means a function that can be used to help you solve the task. + +## Detailed Code Steps for the Task: +{code_steps} + +## List of Available Tools: +{available_tools} + +## Tool Selection and Instructions: +- For each code step listed above, choose up to five tools that are most likely to be useful in solving the task. +- If you believe that no tools are suitable for a step, indicate with an empty list. +- Only list the names of the tools, not the full schema of each tool. +- The result should only contain tool names that are in the list of available tools. +- The result list should be in the same order as the code steps. +""" + +SELECT_FUNCTION_TOOLS = { + "name": "select_function_tools", + "description": "Given code steps to generate full code for a task, select suitable tools for each step by order.", + "parameters": { + "type": "object", + "properties": { + "recommend_tools": { + "type": "array", + "description": "List of tool names for each code step. Empty list if no tool is suitable.", + "items": { + "type": "array", + "items": { + "type": "string", + }, + }, + }, + }, + "required": ["recommend_tools"], + }, +} + + +CODE_GENERATOR_WITH_TOOLS = { + "name": "add_subtask_code", + "description": "Add new code of current subtask to the end of an active Jupyter notebook.", + "parameters": { + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "The code to be added.", + }, + }, + "required": ["code"], + }, +} + +TOO_ORGANIZATION_PROMPT = """ +As a senior data scientist, your role involves developing code for a specific sub-task within a larger project. This project is divided into several sub-tasks, which may either be new challenges or extensions of previous work. + +## Sub-tasks Overview +Here's a list of all the sub-tasks, indicating their current status (DONE or TODO). Your responsibility is the first TODO task on this list. +{all_tasks} + +## Historical Code (Previously Done Sub-tasks): +This code, already executed in the Jupyter notebook, is critical for understanding the background and foundation for your current task. +```python +{completed_code} +``` + +## Dataset Description: +Details about the dataset for the project: +{data_desc} + +## Current Task Notion: +{special_prompt} + +## Code Steps for Your Sub-task: +Follow these steps to complete your current TODO task. You may use external Python functions or write custom code as needed. Ensure your code is self-contained. +{code_steps} + +When you call a function, you should import the function from `{module_name}` first, e.g.: +```python +from metagpt.tools.functions.libs.feature_engineering import fill_missing_value +``` + +## Available Functions for Each Step: +Each function is described in JSON format, including the function name and parameters. {output_desc} +{available_tools} + +## Your Output Format: +Generate the complete code for every step, listing any used function tools at the beginning of the step: +```python +# Step 1 +# Tools used: [function names or 'none'] + + +# Step 2 +# Tools used: [function names or 'none'] + + +# Continue with additional steps, following the same format... +```end + +*** Important Rules *** +- Use only the tools designated for each code step. +- Your output should only include code for the current sub-task. Don't repeat historical code. +- Only mention functions in comments if used in the code. +- Ensure the output new code is executable in the current Jupyter notebook environment, with all historical code executed. +""" + + +DATA_PREPROCESS_PROMPT = """ +In data preprocessing, closely monitor each column's data type. Apply suitable methods for various types (numerical, categorical, datetime, textual, etc.) to ensure the pandas.DataFrame is correctly formatted. +Additionally, ensure that the columns being processed must be the ones that actually exist in the dataset. +""" + +FEATURE_ENGINEERING_PROMPT = """ +When performing feature engineering, please adhere to the following principles: +- For specific user requests (such as removing a feature, creating a new feature based on existing data), directly generate the corresponding code. +- In cases of unclear user requirements, write feature engineering code that you believe will most improve model performance. This may include feature transformation, combination, aggregation, etc., with a limit of five features at a time. +- Ensure that the feature you're working with is indeed present in the dataset and consider the data type (numerical, categorical, etc.) and application scenario (classification, regression tasks, etc.). +- Importantly, provide detailed comments explaining the purpose of each feature and how it might enhance model performance, especially when the features are generated based on semantic understanding without clear user directives. +""" + +CLASSIFICATION_MODEL_PROMPT = """ +""" + +REGRESSION_MODEL_PROMPT = """ +""" + +DATA_PREPROCESS_OUTPUT_DESC = "Please note that all functions uniformly output a processed pandas.DataFrame, facilitating seamless integration into the broader workflow." + +FEATURE_ENGINEERING_OUTPUT_DESC = "Please note that all functions uniformly output updated pandas.DataFrame with feature engineering applied." + +CLASSIFICATION_MODEL_OUTPUT_DESC = "" + +REGRESSION_MODEL_OUTPUT_DESC = "" + + +ML_SPECIFIC_PROMPT = { + "data_preprocess": DATA_PREPROCESS_PROMPT, + "feature_engineering": FEATURE_ENGINEERING_PROMPT, + "classification_model": CLASSIFICATION_MODEL_PROMPT, + "regression_model": REGRESSION_MODEL_PROMPT, +} + +TOOL_OUTPUT_DESC = { + "data_preprocess": DATA_PREPROCESS_OUTPUT_DESC, + "feature_engineering": FEATURE_ENGINEERING_OUTPUT_DESC, + "classification_model": CLASSIFICATION_MODEL_OUTPUT_DESC, + "regression_model": REGRESSION_MODEL_OUTPUT_DESC, +} + +ML_MODULE_MAP = { + "data_preprocess": "metagpt.tools.functions.libs.machine_learning.data_preprocess", + "feature_engineering": "metagpt.tools.functions.libs.machine_learning.feature_engineering", + "classification_model": "metagpt.tools.functions.libs.machine_learning.ml_model", + "regression_model": "metagpt.tools.functions.libs.machine_learning.ml_model", +} diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index 7ad29a532..1e4367372 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -21,10 +21,11 @@ STRUCTURAL_CONTEXT = """ {current_task} """ + def truncate(result: str, keep_len: int = 1000) -> str: desc = "Truncated to show only the last 1000 characters\n" if result.startswith(desc): - result = result[-len(desc):] + result = result[-len(desc) :] if len(result) > keep_len: result = result[-keep_len:] @@ -35,7 +36,6 @@ def truncate(result: str, keep_len: int = 1000) -> str: class AskReview(Action): - async def run(self, context: List[Message], plan: Plan = None): logger.info("Current overall plan:") logger.info( @@ -57,13 +57,16 @@ class AskReview(Action): return rsp, confirmed -class WriteTaskGuide(Action): +class WriteTaskGuide(Action): async def run(self, task_instruction: str, data_desc: str = "") -> str: return "" + class MLEngineer(Role): - def __init__(self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False): + def __init__( + self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False + ): super().__init__(name=name, profile=profile, goal=goal) self._set_react_mode(react_mode="plan_and_act") self.plan = Plan(goal=goal) @@ -73,7 +76,6 @@ class MLEngineer(Role): self.auto_run = auto_run async def _plan_and_act(self): - # create initial plan and update until confirmation await self._update_plan() @@ -99,8 +101,11 @@ class MLEngineer(Role): await self._update_plan() async def _write_and_exec_code(self, max_retry: int = 3): - - task_guide = await WriteTaskGuide().run(self.plan.current_task.instruction) if self.use_task_guide else "" + task_guide = ( + await WriteTaskGuide().run(self.plan.current_task.instruction) + if self.use_task_guide + else "" + ) counter = 0 success = False @@ -112,22 +117,29 @@ class MLEngineer(Role): # print("*" * 10) # breakpoint() - if not self.use_tools: + if not self.use_tools or self.plan.current_task.task_type == "": # code = "print('abc')" - code = await WriteCodeByGenerate().run(context=context, plan=self.plan, task_guide=task_guide) + code = await WriteCodeByGenerate().run( + context=context, plan=self.plan, task_guide=task_guide + ) cause_by = WriteCodeByGenerate - else: - code = await WriteCodeWithTools().run(context=context, plan=self.plan, task_guide=task_guide) + code = await WriteCodeWithTools().run( + context=context, plan=self.plan, task_guide=task_guide, data_desc="" + ) cause_by = WriteCodeWithTools - self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by)) + self.working_memory.add( + Message(content=code, role="assistant", cause_by=cause_by) + ) result, success = await self.execute_code.run(code) # truncated the result print(truncate(result)) # print(result) - self.working_memory.add(Message(content=result, role="user", cause_by=ExecutePyCode)) + self.working_memory.add( + Message(content=result, role="user", cause_by=ExecutePyCode) + ) if "!pip" in code: success = False @@ -152,7 +164,9 @@ class MLEngineer(Role): while not plan_confirmed: context = self.get_useful_memories() rsp = await WritePlan().run(context, max_tasks=max_tasks) - self.working_memory.add(Message(content=rsp, role="assistant", cause_by=WritePlan)) + self.working_memory.add( + Message(content=rsp, role="assistant", cause_by=WritePlan) + ) plan_confirmed = await self._ask_review() tasks = WritePlan.rsp_to_tasks(rsp) @@ -163,9 +177,13 @@ class MLEngineer(Role): """find useful memories only to reduce context length and improve performance""" user_requirement = self.plan.goal - tasks = json.dumps([task.dict() for task in self.plan.tasks], indent=4, ensure_ascii=False) + tasks = json.dumps( + [task.dict() for task in self.plan.tasks], indent=4, ensure_ascii=False + ) current_task = self.plan.current_task.json() if self.plan.current_task else {} - context = STRUCTURAL_CONTEXT.format(user_requirement=user_requirement, tasks=tasks, current_task=current_task) + context = STRUCTURAL_CONTEXT.format( + user_requirement=user_requirement, tasks=tasks, current_task=current_task + ) context_msg = [Message(content=context, role="user")] return context_msg + self.working_memory.get() @@ -174,6 +192,7 @@ class MLEngineer(Role): def working_memory(self): return self._rc.memory + if __name__ == "__main__": requirement = "Run data analysis on sklearn Iris dataset, include a plot" # requirement = "Run data analysis on sklearn Diabetes dataset, include a plot" diff --git a/metagpt/tools/functions/__init__.py b/metagpt/tools/functions/__init__.py index 7ab850667..b81e85833 100644 --- a/metagpt/tools/functions/__init__.py +++ b/metagpt/tools/functions/__init__.py @@ -6,4 +6,3 @@ # @Desc : from metagpt.tools.functions.register.register import registry import metagpt.tools.functions.libs.feature_engineering -print(registry.functions) \ No newline at end of file diff --git a/metagpt/utils/common.py b/metagpt/utils/common.py index f09666beb..8f8edbc6d 100644 --- a/metagpt/utils/common.py +++ b/metagpt/utils/common.py @@ -305,3 +305,13 @@ def parse_recipient(text): pattern = r"## Send To:\s*([A-Za-z]+)\s*?" # hard code for now recipient = re.search(pattern, text) return recipient.group(1) if recipient else "" + + +def create_func_config(func_schema: dict) -> dict: + """Create new function call config""" + tools = [{"type": "function", "function": func_schema}] + tool_choice = {"type": "function", "function": {"name": func_schema["name"]}} + return { + "tools": tools, + "tool_choice": tool_choice, + }