Merge branch 'code_intepreter' of https://gitlab.deepwisdomai.com/agents/data_agents_opt into code_intepreter

This commit is contained in:
yzlin 2024-02-02 18:52:45 +08:00
commit 3894334b52
4 changed files with 38 additions and 83 deletions

View file

@ -8,8 +8,7 @@ import asyncio
import base64
import re
import traceback
from pathlib import Path
from typing import Any, Dict, List, Tuple, Union
from typing import List, Literal, Tuple
import nbformat
from nbclient import NotebookClient
@ -25,14 +24,13 @@ from rich.syntax import Syntax
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.schema import Message
class ExecuteNbCode(Action):
"""execute notebook code block, return result to llm, and display it."""
nb: Any
nb_client: Any
nb: NotebookNode
nb_client: NotebookClient
console: Console
interaction: str
timeout: int = 600
@ -70,13 +68,13 @@ class ExecuteNbCode(Action):
await self.build()
self.nb_client = NotebookClient(self.nb, timeout=self.timeout)
def add_code_cell(self, code):
def add_code_cell(self, code: str):
self.nb.cells.append(new_code_cell(source=code))
def add_markdown_cell(self, markdown):
def add_markdown_cell(self, markdown: str):
self.nb.cells.append(new_markdown_cell(source=markdown))
def _display(self, code, language: str = "python"):
def _display(self, code: str, language: Literal["python", "markdown"] = "python"):
if language == "python":
code = Syntax(code, "python", theme="paraiso-dark", line_numbers=True)
self.console.print(code)
@ -85,21 +83,18 @@ class ExecuteNbCode(Action):
else:
raise ValueError(f"Only support for python, markdown, but got {language}")
def add_output_to_cell(self, cell, output):
def add_output_to_cell(self, cell: NotebookNode, output: str):
"""add outputs of code execution to notebook cell."""
if "outputs" not in cell:
cell["outputs"] = []
# TODO: show figures
else:
cell["outputs"].append(new_output(output_type="stream", name="stdout", text=str(output)))
def parse_outputs(self, outputs: List) -> str:
def parse_outputs(self, outputs: List[str]) -> str:
"""Parses the outputs received from notebook execution."""
assert isinstance(outputs, list)
parsed_output = ""
# empty outputs: such as 'x=1\ny=2'
if not outputs:
return parsed_output
for i, output in enumerate(outputs):
if output["output_type"] == "stream" and not any(
tag in output["text"]
@ -117,7 +112,7 @@ class ExecuteNbCode(Action):
parsed_output += output["data"]["text/plain"]
return parsed_output
def show_bytes_figure(self, image_base64: str, interaction_type: str = "ipython"):
def show_bytes_figure(self, image_base64: str, interaction_type: Literal["ipython", None]):
image_bytes = base64.b64decode(image_base64)
if interaction_type == "ipython":
from IPython.display import Image, display
@ -141,25 +136,12 @@ class ExecuteNbCode(Action):
else:
return False
except NameError:
# 如果在Python脚本中运行__file__ 变量存在
return False
def _process_code(self, code: Union[str, Dict], language: str = "python") -> Tuple:
"""handle different code response formats, support str or dict"""
if isinstance(code, str) and Path(code).suffix in (".py", ".txt"):
code = Path(code).read_text(encoding="utf-8")
return code, language
if isinstance(code, str):
return code, language
if isinstance(code, dict):
assert "code" in code
code = code["code"]
return code, language
async def run_cell(self, cell: NotebookNode, cell_index: int) -> Tuple[bool, str]:
"""set timeout for run code"""
"""set timeout for run code.
returns the success or failure of the cell execution, and an optional error message.
"""
try:
await self.nb_client.async_execute_cell(cell, cell_index)
return True, ""
@ -175,9 +157,10 @@ class ExecuteNbCode(Action):
except Exception:
return False, f"{traceback.format_exc()}"
async def run(self, code: Union[str, Dict, Message], language: str = "python") -> Tuple[str, bool]:
code, language = self._process_code(code, language)
async def run(self, code: str, language: Literal["python", "markdown"] = "python") -> Tuple[str, bool]:
"""
return the output of code execution, and a success indicator (bool) of code execution.
"""
self._display(code, language)
if language == "python":
@ -198,8 +181,9 @@ class ExecuteNbCode(Action):
outputs = self.parse_outputs(self.nb.cells[-1].outputs)
return truncate(remove_escape_and_color_codes(outputs), is_success=success)
elif language == "markdown":
# markdown
# add markdown content to markdown cell in a notebook.
self.add_markdown_cell(code)
# return True, because there is no execution failure for markdown cell.
return code, True
else:
raise ValueError(f"Only support for language: python, markdown, but got {language}, ")
@ -230,7 +214,7 @@ def truncate(result: str, keep_len: int = 2000, is_success: bool = True):
return result if not is_same_desc else desc + result, is_success
def remove_escape_and_color_codes(input_str):
def remove_escape_and_color_codes(input_str: str):
# 使用正则表达式去除转义字符和颜色代码
pattern = re.compile(r"\x1b\[[0-9;]*[mK]")
result = pattern.sub("", input_str)

View file

@ -4,7 +4,7 @@
@Author : orange-crow
@File : write_analysis_code.py
"""
from typing import Dict, Tuple, Union
from typing import Tuple
from metagpt.actions import Action
from metagpt.logs import logger
@ -14,7 +14,7 @@ from metagpt.prompts.write_analysis_code import (
TOOL_RECOMMENDATION_PROMPT,
TOOL_USAGE_PROMPT,
)
from metagpt.schema import Message, Plan
from metagpt.schema import Message, Plan, SystemMessage
from metagpt.tools import TOOL_REGISTRY
from metagpt.tools.tool_registry import validate_tool_names
from metagpt.utils.common import create_func_call_config
@ -24,34 +24,10 @@ class BaseWriteAnalysisCode(Action):
DEFAULT_SYSTEM_MSG: str = """You are Code Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.**Notice: The code for the next step depends on the code for the previous step. Must reuse variables in the lastest other code directly, dont creat it again, it is very import for you. Use !pip install in a standalone block to install missing packages.Usually the libraries you need are already installed.Dont check if packages already imported.**""" # prompt reference: https://github.com/KillianLucas/open-interpreter/blob/v0.1.4/interpreter/system_message.txt
# REUSE_CODE_INSTRUCTION = """ATTENTION: DONT include codes from previous tasks in your current code block, include new codes only, DONT repeat codes!"""
def process_msg(self, prompt: Union[str, list[Dict], Message, list[Message]], system_msg: str = None):
default_system_msg = system_msg or self.DEFAULT_SYSTEM_MSG
# 全部转成list
if not isinstance(prompt, list):
prompt = [prompt]
assert isinstance(prompt, list)
# 转成list[dict]
messages = []
for p in prompt:
if isinstance(p, str):
messages.append({"role": "user", "content": p})
elif isinstance(p, dict):
messages.append(p)
elif isinstance(p, Message):
if isinstance(p.content, str):
messages.append(p.to_dict())
elif isinstance(p.content, dict) and "code" in p.content:
messages.append(p.content["code"])
# 添加默认的提示词
if default_system_msg not in messages[0]["content"] and messages[0]["role"] != "system":
messages.insert(0, {"role": "system", "content": default_system_msg})
elif default_system_msg not in messages[0]["content"] and messages[0]["role"] == "system":
messages[0] = {
"role": "system",
"content": messages[0]["content"] + default_system_msg,
}
return messages
def insert_system_message(self, context: list[Message], system_msg: str = None):
    """Ensure the conversation starts with a system message.

    If ``context`` is empty or its first message does not have the role
    ``"system"``, a ``SystemMessage`` is inserted at index 0. A context that
    already starts with a system message is left untouched, even when its
    content differs from ``system_msg``.

    Args:
        context: Conversation history; modified in place.
        system_msg: System prompt to use. Falls back to
            ``self.DEFAULT_SYSTEM_MSG`` when omitted or empty.

    Returns:
        The same ``context`` list object that was passed in.
    """
    system_msg = system_msg or self.DEFAULT_SYSTEM_MSG
    # Check for emptiness first: indexing context[0] on an empty list would
    # raise IndexError, yet an empty conversation still needs the system prompt.
    if not context or context[0].role != "system":
        context.insert(0, SystemMessage(content=system_msg))
    return context
async def run(self, context: list[Message], plan: Plan = None) -> dict:
"""Run of a code writing action, used in data analysis or modeling
@ -69,16 +45,9 @@ class BaseWriteAnalysisCode(Action):
class WriteCodeByGenerate(BaseWriteAnalysisCode):
"""Ask LLM to generate codes purely by itself without local user-defined tools"""
async def run(
self,
context: [list[Message]],
plan: Plan = None,
system_msg: str = None,
**kwargs,
) -> dict:
# context.append(Message(content=self.REUSE_CODE_INSTRUCTION, role="user"))
prompt = self.process_msg(context, system_msg)
rsp = await self.llm.aask_code(prompt, **kwargs)
async def run(self, context: list[Message], plan: Plan = None, system_msg: str = None, **kwargs) -> dict:
messages = self.insert_system_message(context, system_msg)
rsp = await self.llm.aask_code(messages, **kwargs)
return rsp
@ -184,7 +153,7 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
context.append(Message(content=tools_instruction, role="user"))
# prepare prompt & LLM call
prompt = self.process_msg(context)
prompt = self.insert_system_message(context)
tool_config = create_func_call_config(CODE_GENERATOR_WITH_TOOLS)
rsp = await self.llm.aask_code(prompt, **tool_config)

View file

@ -8,8 +8,6 @@ async def test_code_running():
executor = ExecuteNbCode()
output, is_success = await executor.run("print('hello world!')")
assert is_success
output, is_success = await executor.run({"code": "print('hello world!')", "language": "python"})
assert is_success
@pytest.mark.asyncio

View file

@ -15,16 +15,20 @@ async def test_write_code_by_list_plan():
write_code = WriteCodeByGenerate()
execute_code = ExecuteNbCode()
messages = []
plan = ["随机生成一个pandas DataFrame时间序列", "绘制这个时间序列的直方图", "求均值"]
plan = ["随机生成一个pandas DataFrame时间序列", "绘制这个时间序列的直方图", "回顾已完成的任务", "求均值", "总结"]
for task in plan:
print(f"\n任务: {task}\n\n")
messages.append(Message(task, role="assistant"))
code = await write_code.run(messages)
if task.startswith(("回顾", "总结")):
assert code["language"] == "markdown"
else:
assert code["language"] == "python"
messages.append(Message(code["code"], role="assistant"))
assert len(code) > 0
output = await execute_code.run(code["code"])
output, _ = await execute_code.run(**code)
print(f"\n[Output]: 任务{task}的执行结果是: \n{output}\n")
messages.append(output[0])
messages.append(output)
@pytest.mark.asyncio