Merge branch 'dev_pipeline' into 'dev'

reuse code

See merge request agents/data_agents_opt!11
This commit is contained in:
林义章 2023-11-30 07:10:14 +00:00
commit a9c8c6b73f
3 changed files with 175 additions and 53 deletions

View file

@ -40,16 +40,14 @@ class BaseWriteAnalysisCode(Action):
class WriteCodeByGenerate(BaseWriteAnalysisCode):
"""Write code fully by generation"""
DEFAULT_SYSTEM_MSG = """You are Code Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.**Notice: Use !pip install in a standalone block to install missing packages.**""" # prompt reference: https://github.com/KillianLucas/open-interpreter/blob/v0.1.4/interpreter/system_message.txt
REUSE_CODE_INSTRUCTION = """ATTENTION: DONT include codes from previous tasks in your current code block, include new codes only, DONT repeat codes!"""
def __init__(self, name: str = "", context=None, llm=None) -> str:
super().__init__(name, context, llm)
def process_msg(
self,
prompt: Union[str, List[Dict], Message, List[Message]],
system_msg: str = None,
):
default_system_msg = """You are Open Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.**Notice: The code for the next step depends on the code for the previous step.**"""
def process_msg(self, prompt: Union[str, List[Dict], Message, List[Message]], system_msg: str = None):
default_system_msg = system_msg or self.DEFAULT_SYSTEM_MSG
# 全部转成list
if not isinstance(prompt, list):
prompt = [prompt]
@ -91,6 +89,7 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode):
system_msg: str = None,
**kwargs,
) -> str:
context.append(Message(content=self.REUSE_CODE_INSTRUCTION, role="user"))
prompt = self.process_msg(context, system_msg)
code_content = await self.llm.aask_code(prompt, **kwargs)
return code_content["code"]

View file

@ -23,7 +23,7 @@ STRUCTURAL_CONTEXT = """
def truncate(result: str, keep_len: int = 1000) -> str:
desc = """I truncated the result to only keep the last 1000 characters\n"""
desc = "Truncated to show only the last 1000 characters\n"
if result.startswith(desc):
result = result[-len(desc) :]
@ -39,27 +39,21 @@ class AskReview(Action):
async def run(self, context: List[Message], plan: Plan = None):
logger.info("Current overall plan:")
logger.info(
"\n".join(
[
f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}"
for task in plan.tasks
]
)
"\n".join([f"{task.task_id}: {task.instruction}, is_finished: {task.is_finished}" for task in plan.tasks])
)
logger.info("most recent context:")
# prompt = "\n".join(
# [f"{msg.cause_by.__name__ if msg.cause_by else 'Main Requirement'}: {msg.content}" for msg in context]
# )
prompt = ""
latest_action = context[-1].cause_by.__name__ if context[-1].cause_by else ""
prompt += (
f"\nPlease review output from {latest_action}:\n"
"If you want to change a task in the plan, say 'change task task_id, ... (things to change)'\n"
"If you confirm the output and wish to continue with the current process, type CONFIRM:\n"
)
prompt = f"\nPlease review output from {latest_action}:\n" \
"If you want to change a task in the plan, say 'change task task_id, ... (things to change)'\n" \
"If you confirm the output and wish to continue with the current process, type CONFIRM\n" \
"If you want to terminate the process, type exit:\n"
rsp = input(prompt)
confirmed = "confirm" in rsp.lower()
if rsp.lower() in ("exit"):
exit()
confirmed = rsp.lower() in ("confirm", "yes", "y")
return rsp, confirmed
@ -147,7 +141,7 @@ class MLEngineer(Role):
Message(content=result, role="user", cause_by=ExecutePyCode)
)
if code.startswith("!pip"):
if "!pip" in code:
success = False
# if not success:
# await self._ask_review()
@ -159,13 +153,9 @@ class MLEngineer(Role):
async def _ask_review(self):
if not self.auto_run:
context = self.get_useful_memories()
review, confirmed = await AskReview().run(
context=context[-5:], plan=self.plan
)
if review.lower() not in ("confirm", "y", "yes"):
self._rc.memory.add(
Message(content=review, role="user", cause_by=AskReview)
)
review, confirmed = await AskReview().run(context=context[-5:], plan=self.plan)
if not confirmed:
self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
return confirmed
return True
@ -204,11 +194,14 @@ class MLEngineer(Role):
if __name__ == "__main__":
# requirement = "create a normal distribution and visualize it"
requirement = "run some analysis on iris dataset"
requirement = "Run data analysis on sklearn Iris dataset, include a plot"
# requirement = "Run data analysis on sklearn Diabetes dataset, include a plot"
# requirement = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy"
# requirement = "Run data analysis on sklearn Wisconsin Breast Cancer dataset, include a plot, train a model to predict targets (20% as validation), and show validation accuracy"
# requirement = "Run EDA and visualization on this dataset, train a model to predict survival, report metrics on validation set (20%), dataset: workspace/titanic/train.csv"
async def main(requirement: str = requirement):
role = MLEngineer(goal=requirement)
async def main(requirement: str = requirement, auto_run: bool = False):
role = MLEngineer(goal=requirement, auto_run=auto_run)
await role.run(requirement)
fire.Fire(main)

View file

@ -1,26 +1,10 @@
import asyncio
import pytest
from metagpt.actions.write_analysis_code import WriteCodeByGenerate
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.schema import Message
# @pytest.mark.asyncio
# async def test_write_code():
# write_code = WriteCodeFunction()
# code = await write_code.run("Write a hello world code.")
# assert len(code) > 0
# print(code)
# @pytest.mark.asyncio
# async def test_write_code_by_list_prompt():
# write_code = WriteCodeFunction()
# msg = ["a=[1,2,5,10,-10]", "写出求a中最大值的代码python"]
# code = await write_code.run(msg)
# assert len(code) > 0
# print(code)
from metagpt.logs import logger
@pytest.mark.asyncio
async def test_write_code_by_list_plan():
@ -37,3 +21,149 @@ async def test_write_code_by_list_plan():
output = await execute_code.run(code)
print(f"\n[Output]: 任务{task}的执行结果是: \n{output}\n")
messages.append(output[0])
@pytest.mark.asyncio
async def test_write_code_to_correct_error():
structural_context = """
## User Requirement
read a dataset test.csv and print its head
## Current Plan
[
{
"task_id": "1",
"dependent_task_ids": [],
"instruction": "import pandas and load the dataset from 'test.csv'.",
"task_type": "",
"code": "",
"result": "",
"is_finished": false
},
{
"task_id": "2",
"dependent_task_ids": [
"1"
],
"instruction": "Print the head of the dataset to display the first few rows.",
"task_type": "",
"code": "",
"result": "",
"is_finished": false
}
]
## Current Task
{"task_id": "1", "dependent_task_ids": [], "instruction": "import pandas and load the dataset from 'test.csv'.", "task_type": "", "code": "", "result": "", "is_finished": false}
"""
wrong_code = """import pandas as pd\ndata = pd.read_excel('test.csv')\ndata""" # use read_excel to read a csv
error = """
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
File "/Users/gary/miniconda3/envs/py39_scratch/lib/python3.9/site-packages/pandas/io/excel/_base.py", line 478, in read_excel
io = ExcelFile(io, storage_options=storage_options, engine=engine)
File "/Users/gary/miniconda3/envs/py39_scratch/lib/python3.9/site-packages/pandas/io/excel/_base.py", line 1500, in __init__
raise ValueError(
ValueError: Excel file format cannot be determined, you must specify an engine manually.
"""
context = [
Message(content=structural_context, role="user"),
Message(content=wrong_code, role="assistant"),
Message(content=error, role="user"),
]
new_code = await WriteCodeByGenerate().run(context=context)
print(new_code)
assert "read_csv" in new_code # should correct read_excel to read_csv
@pytest.mark.asyncio
async def test_write_code_reuse_code_simple():
structural_context = """
## User Requirement
read a dataset test.csv and print its head
## Current Plan
[
{
"task_id": "1",
"dependent_task_ids": [],
"instruction": "import pandas and load the dataset from 'test.csv'.",
"task_type": "",
"code": "import pandas as pd\ndata = pd.read_csv('test.csv')",
"result": "",
"is_finished": true
},
{
"task_id": "2",
"dependent_task_ids": [
"1"
],
"instruction": "Print the head of the dataset to display the first few rows.",
"task_type": "",
"code": "",
"result": "",
"is_finished": false
}
]
## Current Task
{"task_id": "2", "dependent_task_ids": ["1"], "instruction": "Print the head of the dataset to display the first few rows.", "task_type": "", "code": "", "result": "", "is_finished": false}
"""
context = [
Message(content=structural_context, role="user"),
]
code = await WriteCodeByGenerate().run(context=context)
print(code)
assert "pandas" not in code and "read_csv" not in code # should reuse import and read statement from previous one
@pytest.mark.asyncio
async def test_write_code_reuse_code_long():
"""test code reuse for long context"""
structural_context = """
## User Requirement
Run data analysis on sklearn Iris dataset, include a plot
## Current Plan
[
{
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Load the Iris dataset from sklearn.",
"task_type": "",
"code": "from sklearn.datasets import load_iris\niris_data = load_iris()\niris_data['data'][0:5], iris_data['target'][0:5]",
"result": "(array([[5.1, 3.5, 1.4, 0.2],\n [4.9, 3. , 1.4, 0.2],\n [4.7, 3.2, 1.3, 0.2],\n [4.6, 3.1, 1.5, 0.2],\n [5. , 3.6, 1.4, 0.2]]),\n array([0, 0, 0, 0, 0]))",
"is_finished": true
},
{
"task_id": "2",
"dependent_task_ids": [
"1"
],
"instruction": "Perform exploratory data analysis on the Iris dataset.",
"task_type": "",
"code": "",
"result": "",
"is_finished": false
},
{
"task_id": "3",
"dependent_task_ids": [
"2"
],
"instruction": "Create a plot visualizing the Iris dataset features.",
"task_type": "",
"code": "",
"result": "",
"is_finished": false
}
]
## Current Task
{"task_id": "2", "dependent_task_ids": ["1"], "instruction": "Perform exploratory data analysis on the Iris dataset.", "task_type": "", "code": "", "result": "", "is_finished": false}
"""
context = [
Message(content=structural_context, role="user"),
]
trials_num = 5
trials = [WriteCodeByGenerate().run(context=context) for _ in range(trials_num)]
trial_results = await asyncio.gather(*trials)
print(*trial_results, sep="\n\n***\n\n")
success = ["load_iris" not in result and "iris_data" in result \
for result in trial_results] # should reuse iris_data from previous tasks
success_rate = sum(success) / trials_num
logger.info(f"success rate: {success_rate :.2f}")
assert success_rate >= 0.8