merge mle & interpreter

This commit is contained in:
yzlin 2024-02-29 18:23:26 +08:00
parent a2b85641c3
commit 4c1ee630f6
13 changed files with 273 additions and 365 deletions

View file

@ -1,93 +0,0 @@
from __future__ import annotations
import json
from metagpt.actions import Action
from metagpt.schema import Message
from metagpt.utils.common import CodeParser
DEBUG_REFLECTION_EXAMPLE = '''
Example 1:
[previous impl]:
```python
def add(a: int, b: int) -> int:
"""
Given integers a and b, return the total value of a and b.
"""
return a - b
```
[runtime Error]:
Tested passed:
Tests failed:
assert add(1, 2) == 3 # output: -1
assert add(1, 2) == 4 # output: -1
[reflection on previous impl]:
The implementation failed the test cases where the input integers are 1 and 2. The issue arises because the code does not add the two integers together, but instead subtracts the second integer from the first. To fix this issue, we should change the operator from `-` to `+` in the return statement. This will ensure that the function returns the correct output for the given input.
[improved impl]:
```python
def add(a: int, b: int) -> int:
"""
Given integers a and b, return the total value of a and b.
"""
return a + b
```
'''
REFLECTION_PROMPT = """
Here is an example for you.
{debug_example}
[context]
{context}
[previous impl]
{code}
[runtime Error]
{runtime_result}
Analysis the error step by step, provide me improve method and code. Remember to follow [context] requirement. Don't forget write code for steps behind the error step.
Output a json following the format:
```json
{{
"reflection": str = "Reflection on previous implementation",
"improved_impl": str = "Refined code after reflection.",
}}
```
"""
class DebugCode(Action):
async def run(
self,
context: list[Message] = None,
code: str = "",
runtime_result: str = "",
) -> str:
"""
Execute the debugging process based on the provided context, code, and runtime_result.
Args:
context (list[Message]): A list of Message objects representing the context.
code (str): The code to be debugged.
runtime_result (str): The result of the code execution.
Returns:
str: The improved implementation based on the debugging process.
"""
reflection_prompt = REFLECTION_PROMPT.format(
debug_example=DEBUG_REFLECTION_EXAMPLE,
context=context,
code=code,
runtime_result=runtime_result,
)
system_prompt = "You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation "
rsp = await self._aask(reflection_prompt, system_msgs=[system_prompt])
reflection = json.loads(CodeParser.parse_code(block=None, text=rsp))
return {"code": reflection["improved_impl"]}

View file

@ -201,7 +201,7 @@ def truncate(result: str, keep_len: int = 2000, is_success: bool = True):
if is_success:
desc = f"Executed code successfully. Truncated to show only first {keep_len} characters\n"
else:
desc = f"Executed code failed, please reflect the cause of bug and then debug. Truncated to show only last {keep_len} characters\n"
desc = f"Executed code failed, please reflect on the cause of bug and then debug. Truncated to show only last {keep_len} characters\n"
if result.strip().startswith("<coroutine object"):
result = "Executed code failed, you need use key word 'await' to run a async code."

View file

@ -1,59 +0,0 @@
from __future__ import annotations
from typing import Tuple
from metagpt.actions import Action
from metagpt.actions.mi.write_analysis_code import WriteCodeWithTools
from metagpt.prompts.mi.ml_action import (
ML_PROMPT,
UPDATE_DATA_COLUMNS,
USE_NO_TOOLS_EXAMPLE,
USE_TOOLS_EXAMPLE,
)
from metagpt.schema import Message, Plan
from metagpt.utils.common import remove_comments
class WriteCodeWithToolsML(WriteCodeWithTools):
async def run(
self,
context: list[Message],
plan: Plan = None,
column_info: str = "",
**kwargs,
) -> Tuple[list[Message], str]:
# prepare tool schemas and tool-type-specific instruction
tool_schemas, tool_type_usage_prompt = await self._prepare_tools(plan=plan)
# ML-specific variables to be used in prompt
finished_tasks = plan.get_finished_tasks()
code_context = [remove_comments(task.code) for task in finished_tasks]
code_context = "\n\n".join(code_context)
# prepare prompt depending on tool availability & LLM call
prompt = ML_PROMPT.format(
user_requirement=plan.goal,
history_code=code_context,
current_task=plan.current_task.instruction,
column_info=column_info,
tool_type_usage_prompt=tool_type_usage_prompt,
tool_schemas=tool_schemas,
examples=USE_TOOLS_EXAMPLE if tool_schemas else USE_NO_TOOLS_EXAMPLE,
)
rsp = await self.llm.aask_code(prompt)
# Extra output to be used for potential debugging
context = [Message(content=prompt, role="user")]
return context, rsp
class UpdateDataColumns(Action):
async def run(self, plan: Plan = None) -> dict:
finished_tasks = plan.get_finished_tasks()
code_context = [remove_comments(task.code) for task in finished_tasks]
code_context = "\n\n".join(code_context)
prompt = UPDATE_DATA_COLUMNS.format(history_code=code_context)
rsp = await self.llm.aask_code(prompt)
return rsp

View file

@ -11,14 +11,19 @@ from typing import Tuple
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.prompts.mi.ml_action import MODEL_TRAIN_EXAMPLE, USE_ML_TOOLS_EXAMPLE
from metagpt.prompts.mi.write_analysis_code import (
CHECK_DATA_PROMPT,
DEBUG_REFLECTION_EXAMPLE,
REFLECTION_PROMPT,
STRUCTUAL_PROMPT,
TOOL_RECOMMENDATION_PROMPT,
TOOL_USAGE_PROMPT,
)
from metagpt.schema import Message, Plan, SystemMessage
from metagpt.tools import TOOL_REGISTRY
from metagpt.tools.tool_registry import validate_tool_names
from metagpt.utils.common import CodeParser
from metagpt.tools.tool_type import ToolType
from metagpt.utils.common import CodeParser, process_message, remove_comments
class WriteCodeWithTools(Action):
@ -27,7 +32,6 @@ class WriteCodeWithTools(Action):
use_tools: bool = True
# selected tools to choose from, listed by their names. An empty list means selection from all tools.
selected_tools: list[str] = []
DEFAULT_SYSTEM_MSG: str = """You are Code Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.**Notice: The code for the next step depends on the code for the previous step. Must reuse variables in the lastest other code directly, dont creat it again, it is very import for you. Use !pip install in a standalone block to install missing packages.Usually the libraries you need are already installed.Dont check if packages already imported.**""" # prompt reference: https://github.com/KillianLucas/open-interpreter/blob/v0.1.4/interpreter/system_message.txt
def _insert_system_message(self, context: list[Message], system_msg: str = None):
system_msg = system_msg or self.DEFAULT_SYSTEM_MSG
@ -98,6 +102,16 @@ class WriteCodeWithTools(Action):
TOOL_REGISTRY.get_tool_type(tool_type).usage_prompt if TOOL_REGISTRY.has_tool_type(tool_type) else ""
)
# ML-specific tool usage examples
examples = ""
if plan.current_task.task_type in [
ToolType.DATA_PREPROCESS.type_name,
ToolType.FEATURE_ENGINEERING.type_name,
]:
examples = USE_ML_TOOLS_EXAMPLE
elif plan.current_task.task_type in [ToolType.MODEL_TRAIN.type_name]:
examples = MODEL_TRAIN_EXAMPLE
# prepare schemas of available tools
tool_schemas = {}
available_tools = self._get_tools_by_type(tool_type)
@ -105,27 +119,72 @@ class WriteCodeWithTools(Action):
available_tools = {tool_name: tool.schemas["description"] for tool_name, tool in available_tools.items()}
tool_schemas = await self._recommend_tool(plan.current_task.instruction, available_tools)
return tool_schemas, tool_type_usage_prompt
return tool_schemas, tool_type_usage_prompt, examples
async def _debug_with_reflection(self, context: list[Message], working_memory: list[Message]):
reflection_prompt = REFLECTION_PROMPT.format(
debug_example=DEBUG_REFLECTION_EXAMPLE,
context=context,
previous_impl=working_memory,
)
# print(reflection_prompt)
system_prompt = "You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation "
rsp = await self._aask(reflection_prompt, system_msgs=[system_prompt])
reflection = json.loads(CodeParser.parse_code(block=None, text=rsp))
return reflection["improved_impl"]
async def run(
self,
context: list[Message],
plan: Plan,
working_memory: list[Message] = [],
use_reflection: bool = False,
**kwargs,
) -> str:
if self.use_tools:
# prepare tool schemas and tool-type-specific instruction
tool_schemas, tool_type_usage_prompt = await self._prepare_tools(plan=plan)
# prepare tool schemas and tool-type-specific instruction
tool_schemas, tool_type_usage_prompt, examples = await self._prepare_tools(plan=plan)
# form a complete tool usage instruction and include it as a message in context
tools_instruction = TOOL_USAGE_PROMPT.format(
tool_schemas=tool_schemas, tool_type_usage_prompt=tool_type_usage_prompt
)
context.append(Message(content=tools_instruction, role="user"))
# necessary components to be used in prompt
finished_tasks = plan.get_finished_tasks()
code_written = [remove_comments(task.code) for task in finished_tasks]
code_written = "\n\n".join(code_written)
task_results = [task.result for task in finished_tasks]
task_results = "\n\n".join(task_results)
# prepare prompt & LLM call
prompt = self._insert_system_message(context)
# structure prompt
structual_prompt = STRUCTUAL_PROMPT.format(
user_requirement=plan.goal,
code_written=code_written,
task_results=task_results,
current_task=plan.current_task.instruction,
tool_type_usage_prompt=tool_type_usage_prompt,
tool_schemas=tool_schemas,
examples=examples,
)
context = [Message(content=structual_prompt, role="user")] + working_memory
context = process_message(context)
rsp = await self.llm.aask_code(prompt)
# temp = context + working_memory
# print(*temp, sep="***\n\n***")
return rsp
# LLM call
if not use_reflection:
rsp = await self.llm.aask(context, **kwargs)
code = CodeParser.parse_code(block=None, text=rsp)
else:
code = await self._debug_with_reflection(context=context, working_memory=working_memory)
return code
class CheckData(Action):
async def run(self, plan: Plan = None) -> dict:
finished_tasks = plan.get_finished_tasks()
code_written = [remove_comments(task.code) for task in finished_tasks]
code_written = "\n\n".join(code_written)
prompt = CHECK_DATA_PROMPT.format(code_written=code_written)
rsp = await self._aask(prompt)
code = CodeParser.parse_code(block=None, text=rsp)
return code

View file

@ -4,62 +4,7 @@
# @Author : lidanyang
# @File : ml_action
# @Desc :
UPDATE_DATA_COLUMNS = """
# Background
Keep dataset column information updated before model train.
## Tasks Done
```python
{history_code}
```end
# Task
Print the the latest column information after 'Tasks Done' code. Use the following code:
```python
from metagpt.tools.libs.data_preprocess import get_column_info
column_info = get_column_info(df)
print("column_info")
print(column_info)
```end
# Constraints:
- Use the DataFrame variable from 'Tasks Done' in place of df.
- Your code is to be added to a new cell in jupyter.
"""
ML_PROMPT = """
# Background
As a data scientist, you need to help user to achieve their goal [{user_requirement}] step-by-step in an continuous Jupyter notebook.
## Done Tasks
```python
{history_code}
```end
## Current Task
{current_task}
# Latest Data Info
Latest data info after previous tasks:
{column_info}
# Task
Write complete code for 'Current Task'. And avoid duplicating code from 'Done Tasks', such as repeated import of packages, reading data, etc.
Specifically, {tool_type_usage_prompt}
# Capabilities
- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python Class.
- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc..
# Available Tools:
Each Class tool is described in JSON format. When you call a tool, import the tool from its path first.
{tool_schemas}
{examples}
"""
USE_NO_TOOLS_EXAMPLE = """
# Output Example:
MODEL_TRAIN_EXAMPLE = """
when current task is "train a lightgbm model on training data", the code can be like:
```python
# Step 1: check data type and convert to numeric
@ -80,8 +25,7 @@ model.fit(train, y_train)
- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed.
"""
USE_TOOLS_EXAMPLE = """
# Output Example:
USE_ML_TOOLS_EXAMPLE = """
when current task is "do data preprocess, like fill missing value, handle outliers, etc.", the code can be like:
```python
# Step 1: fill missing value

View file

@ -1,3 +1,130 @@
STRUCTUAL_PROMPT = """
# Background
As a data scientist, you need to help user to achieve their goal [{user_requirement}] step-by-step in an continuous Jupyter notebook. Since it is a notebook environment, don't use asyncio.run. Instead, use await if you need to call an async function.
# Finished Tasks
## code
```python
{code_written}
```
## execution result
{task_results}
# Current Task
{current_task}
# Instruction
Write complete code for 'Current Task'. And avoid duplicating code from 'Finished Tasks', such as repeated import of packages, reading data, etc.
Specifically, {tool_type_usage_prompt}
# Capabilities
- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python class or function.
- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc..
# Available Tools:
Each tool is described in JSON format. When you call a tool, import the tool from its path first.
{tool_schemas}
# Examples
{examples}
# Output
Output code in the following format:
```python
your code
```
"""
DEBUG_REFLECTION_EXAMPLE = '''
[previous impl]:
assistant:
```python
def add(a: int, b: int) -> int:
"""
Given integers a and b, return the total value of a and b.
"""
return a - b
```
user:
Tests failed:
assert add(1, 2) == 3 # output: -1
assert add(1, 2) == 4 # output: -1
[reflection on previous impl]:
The implementation failed the test cases where the input integers are 1 and 2. The issue arises because the code does not add the two integers together, but instead subtracts the second integer from the first. To fix this issue, we should change the operator from `-` to `+` in the return statement. This will ensure that the function returns the correct output for the given input.
[improved impl]:
```python
def add(a: int, b: int) -> int:
"""
Given integers a and b, return the total value of a and b.
"""
return a + b
```
'''
REFLECTION_PROMPT = """
[example]
Here is an example of debugging with reflection.
{debug_example}
[/example]
[context]
{context}
[previous impl]:
{previous_impl}
[instruction]
Analyze your previous code and error in [context] step by step, provide me with improved method and code. Remember to follow [context] requirement. Don't forget to write code for steps behind the error step.
Output a json following the format:
```json
{{
"reflection": str = "Reflection on previous implementation",
"improved_impl": str = "Refined code after reflection.",
}}
```
"""
CHECK_DATA_PROMPT = """
# Background
Check latest data info to guide subsequent tasks.
## Finished Tasks
```python
{code_written}
```end
# Task
Check code in finished tasks, print key variables to guide your following actions.
Specifically, if it is a data analysis or machine learning task, print the the latest column information using the following code, with DataFrame variable from 'Finished Tasks' in place of df:
```python
from metagpt.tools.libs.data_preprocess import get_column_info
column_info = get_column_info(df)
print("column_info")
print(column_info)
```end
Otherwise, you may write any codes you see fit. Return an empty string if you think there is no important data to check.
# Constraints:
- Your code is to be added to a new cell in jupyter.
# Instruction
Output code following the format:
```python
your code
```
"""
DATA_INFO = """
# Latest Data Info
Latest data info after previous tasks:
{info}
"""
TOOL_RECOMMENDATION_PROMPT = """
## User Requirement:
{current_task}
@ -18,22 +145,3 @@ Recommend up to five tools from 'Available Tools' that can help solve the 'User
["tool_name1", "tool_name2", ...]
```
"""
TOOL_USAGE_PROMPT = """
# Instruction
Write complete code for 'Current Task'. And avoid duplicating code from finished tasks, such as repeated import of packages, reading data, etc.
Specifically, {tool_type_usage_prompt}
# Capabilities
- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python Class.
- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc..
# Available Tools (can be empty):
Each Class tool is described in JSON format. When you call a tool, import the tool first.
{tool_schemas}
# Constraints:
- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed.
- Always prioritize using pre-defined tools for the same functionality.
"""

View file

@ -69,7 +69,7 @@ class BaseLLM(ABC):
async def aask(
self,
msg: str,
msg: Union[str, list[dict[str, str]]],
system_msgs: Optional[list[str]] = None,
format_msgs: Optional[list[dict[str, str]]] = None,
images: Optional[Union[str, list[str]]] = None,
@ -84,7 +84,10 @@ class BaseLLM(ABC):
message = []
if format_msgs:
message.extend(format_msgs)
message.append(self._user_msg(msg, images=images))
if isinstance(msg, str):
message.append(self._user_msg(msg, images=images))
else:
message.extend(msg)
logger.debug(message)
rsp = await self.acompletion_text(message, stream=stream, timeout=timeout)
return rsp
@ -103,7 +106,7 @@ class BaseLLM(ABC):
return self._extract_assistant_rsp(context)
async def aask_code(
self, messages: Union[str, Message, list[dict]], timeout=3, language: str = "", **kwargs
self, messages: Union[str, Message, list[dict]], timeout=3, include_language: bool = False, **kwargs
) -> dict:
raise NotImplementedError

View file

@ -26,22 +26,6 @@ GENERAL_FUNCTION_SCHEMA = {
}
CODE_ONLY_FUNCTION_SCHEMA = {
"name": "add_new_code",
"description": "Add new code cell of current task to the end of an active Jupyter notebook.",
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "The code to be added to a new cell in jupyter.",
},
},
"required": ["code"],
},
}
# tool_choice value for general_function_schema
# https://platform.openai.com/docs/api-reference/chat/create#chat-create-tool_choice
GENERAL_TOOL_CHOICE = {"type": "function", "function": {"name": "execute"}}

View file

@ -28,8 +28,7 @@ from metagpt.logs import log_llm_stream, logger
from metagpt.provider.base_llm import BaseLLM
from metagpt.provider.constant import CODE_ONLY_FUNCTION_SCHEMA, GENERAL_FUNCTION_SCHEMA
from metagpt.provider.llm_provider_registry import register_provider
from metagpt.schema import Message
from metagpt.utils.common import CodeParser, decode_image
from metagpt.utils.common import CodeParser, decode_image, process_message
from metagpt.utils.cost_manager import CostManager, Costs
from metagpt.utils.exceptions import handle_exception
from metagpt.utils.token_counter import (
@ -145,32 +144,10 @@ class OpenAILLM(BaseLLM):
rsp = await self._achat_completion(messages, timeout=timeout)
return self.get_choice_text(rsp)
def _process_message(self, messages: Union[str, Message, list[dict], list[Message], list[str]]) -> list[dict]:
"""convert messages to list[dict]."""
# 全部转成list
if not isinstance(messages, list):
messages = [messages]
# 转成list[dict]
processed_messages = []
for msg in messages:
if isinstance(msg, str):
processed_messages.append({"role": "user", "content": msg})
elif isinstance(msg, dict):
assert set(msg.keys()) == set(["role", "content"])
processed_messages.append(msg)
elif isinstance(msg, Message):
processed_messages.append(msg.to_dict())
else:
raise ValueError(
f"Only support message type are: str, Message, dict, but got {type(messages).__name__}!"
)
return processed_messages
async def _achat_completion_function(
self, messages: list[dict], timeout: int = 3, **chat_configs
) -> ChatCompletion:
messages = self._process_message(messages)
messages = process_message(messages)
kwargs = self._cons_kwargs(messages=messages, timeout=timeout, **chat_configs)
rsp: ChatCompletion = await self.aclient.chat.completions.create(**kwargs)
self._update_costs(rsp.usage)

View file

@ -4,10 +4,12 @@ from pydantic import Field
from metagpt.actions.mi.ask_review import ReviewConst
from metagpt.actions.mi.execute_nb_code import ExecuteNbCode
from metagpt.actions.mi.write_analysis_code import WriteCodeWithTools
from metagpt.actions.mi.write_analysis_code import CheckData, WriteCodeWithTools
from metagpt.logs import logger
from metagpt.prompts.mi.write_analysis_code import DATA_INFO
from metagpt.roles import Role
from metagpt.schema import Message, Task, TaskResult
from metagpt.tools.tool_type import ToolType
class Interpreter(Role):
@ -15,6 +17,7 @@ class Interpreter(Role):
profile: str = "Interpreter"
auto_run: bool = True
use_tools: bool = False
use_reflection: bool = False
execute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True)
tools: list[str] = []
@ -48,14 +51,16 @@ class Interpreter(Role):
counter = 0
success = False
await self._check_data()
while not success and counter < max_retry:
### write code ###
code, cause_by = await self._write_code()
code, cause_by = await self._write_code(counter)
self.working_memory.add(Message(content=code["code"], role="assistant", cause_by=cause_by))
self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by))
### execute code ###
result, success = await self.execute_code.run(**code)
result, success = await self.execute_code.run(code)
print(result)
self.working_memory.add(Message(content=result, role="user", cause_by=ExecuteNbCode))
@ -69,14 +74,33 @@ class Interpreter(Role):
if ReviewConst.CHANGE_WORDS[0] in review:
counter = 0 # redo the task again with help of human suggestions
return code["code"], result, success
return code, result, success
async def _write_code(self):
async def _write_code(self, counter):
todo = WriteCodeWithTools(use_tools=self.use_tools, selected_tools=self.tools)
logger.info(f"ready to {todo.name}")
context = self.planner.get_useful_memories()
# print(*context, sep="\n***\n")
code = await todo.run(context=context, plan=self.planner.plan, temperature=0.0)
use_reflection = counter > 0 and self.use_reflection
code = await todo.run(
plan=self.planner.plan, working_memory=self.working_memory.get(), use_reflection=use_reflection
)
return code, todo
async def _check_data(self):
current_task = self.planner.plan.current_task
if current_task.task_type not in [
ToolType.DATA_PREPROCESS.type_name,
ToolType.FEATURE_ENGINEERING.type_name,
ToolType.MODEL_TRAIN.type_name,
]:
return
logger.info("Check updated data")
code = await CheckData().run(self.planner.plan)
if not code:
return
success = False
result, success = await self.execute_code.run(code)
if success:
print(result)
data_info = DATA_INFO.format(info=result)
self.working_memory.add(Message(content=data_info, role="user", cause_by=CheckData))

View file

@ -1,63 +0,0 @@
from metagpt.actions.mi.debug_code import DebugCode
from metagpt.actions.mi.execute_nb_code import ExecuteNbCode
from metagpt.actions.mi.ml_action import UpdateDataColumns, WriteCodeWithToolsML
from metagpt.logs import logger
from metagpt.roles.mi.interpreter import Interpreter
from metagpt.tools.tool_type import ToolType
from metagpt.utils.common import any_to_str
class MLEngineer(Interpreter):
name: str = "Mark"
profile: str = "MLEngineer"
debug_context: list = []
latest_code: str = ""
async def _write_code(self):
if not self.use_tools:
return await super()._write_code()
# In a trial and errors settings, check whether this is our first attempt to tackle the task. If there is no code execution before, then it is.
is_first_trial = any_to_str(ExecuteNbCode) not in [msg.cause_by for msg in self.working_memory.get()]
if is_first_trial:
# For the first trial, write task code from scratch
column_info = await self._update_data_columns()
logger.info("Write code with tools")
tool_context, code = await WriteCodeWithToolsML(selected_tools=self.tools).run(
context=[], # context assembled inside the Action
plan=self.planner.plan,
column_info=column_info,
)
self.debug_context = tool_context
cause_by = WriteCodeWithToolsML
else:
# Previous trials resulted in error, debug and rewrite the code
logger.warning("We got a bug, now start to debug...")
code = await DebugCode().run(
code=self.latest_code,
runtime_result=self.working_memory.get(),
context=self.debug_context,
)
cause_by = DebugCode
self.latest_code = code["code"]
return code, cause_by
async def _update_data_columns(self):
current_task = self.planner.plan.current_task
if current_task.task_type not in [
ToolType.DATA_PREPROCESS.type_name,
ToolType.FEATURE_ENGINEERING.type_name,
ToolType.MODEL_TRAIN.type_name,
]:
return ""
logger.info("Check columns in updated data")
code = await UpdateDataColumns().run(self.planner.plan)
success = False
result, success = await self.execute_code.run(**code)
print(result)
return result if success else ""

View file

@ -19,7 +19,8 @@ class ToolType(Enum):
)
DATA_PREPROCESS = ToolTypeDef(
name="data_preprocess",
desc="Only for changing value inplace.",
desc="For preprocessing dataset in a data analysis or machine learning task ONLY,"
"general data operation doesn't fall into this type",
usage_prompt=DATA_PREPROCESS_PROMPT,
)
EMAIL_LOGIN = ToolTypeDef(

View file

@ -666,3 +666,26 @@ def decode_image(img_url_or_b64: str) -> Image:
img_data = BytesIO(base64.b64decode(b64_data))
img = Image.open(img_data)
return img
def process_message(messages: Union[str, Message, list[dict], list[Message], list[str]]) -> list[dict]:
"""convert messages to list[dict]."""
from metagpt.schema import Message
# 全部转成list
if not isinstance(messages, list):
messages = [messages]
# 转成list[dict]
processed_messages = []
for msg in messages:
if isinstance(msg, str):
processed_messages.append({"role": "user", "content": msg})
elif isinstance(msg, dict):
assert set(msg.keys()) == set(["role", "content"])
processed_messages.append(msg)
elif isinstance(msg, Message):
processed_messages.append(msg.to_dict())
else:
raise ValueError(f"Only support message type are: str, Message, dict, but got {type(messages).__name__}!")
return processed_messages