mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-05-03 21:02:38 +02:00
Merge branch 'code_interpreter' into mi_refactor
This commit is contained in:
commit
3244e6cee5
6 changed files with 110 additions and 101 deletions
|
|
@ -2,10 +2,20 @@ import fire
|
|||
|
||||
from metagpt.roles.mi.interpreter import Interpreter
|
||||
|
||||
WINE_REQ = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy."
|
||||
|
||||
async def main(auto_run: bool = True):
|
||||
requirement = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy."
|
||||
DATA_DIR = "path/to/your/data"
|
||||
# sales_forecast data from https://www.kaggle.com/datasets/aslanahmedov/walmart-sales-forecast/data
|
||||
SALES_FORECAST_REQ = f"""Train a model to predict sales for each department in every store (split the last 40 weeks records as validation dataset, the others is train dataset), include plot total sales trends, print metric and plot scatter plots of
|
||||
groud truth and predictions on validation data. Dataset is {DATA_DIR}/train.csv, the metric is weighted mean absolute error (WMAE) for test data. Notice: *print* key variables to get more information for next task step.
|
||||
"""
|
||||
|
||||
REQUIREMENTS = {"wine": WINE_REQ, "sales_forecast": SALES_FORECAST_REQ}
|
||||
|
||||
|
||||
async def main(auto_run: bool = True, use_case: str = "wine"):
|
||||
mi = Interpreter(auto_run=auto_run)
|
||||
requirement = REQUIREMENTS[use_case]
|
||||
await mi.run(requirement)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ from __future__ import annotations
|
|||
import asyncio
|
||||
import base64
|
||||
import re
|
||||
import traceback
|
||||
from typing import Literal, Tuple
|
||||
|
||||
import nbformat
|
||||
|
|
@ -58,7 +57,8 @@ class ExecuteNbCode(Action):
|
|||
|
||||
async def terminate(self):
|
||||
"""kill NotebookClient"""
|
||||
await self.nb_client._async_cleanup_kernel()
|
||||
if self.nb_client.km is not None:
|
||||
await self.nb_client._async_cleanup_kernel()
|
||||
|
||||
async def reset(self):
|
||||
"""reset NotebookClient"""
|
||||
|
|
@ -91,17 +91,17 @@ class ExecuteNbCode(Action):
|
|||
else:
|
||||
cell["outputs"].append(new_output(output_type="stream", name="stdout", text=str(output)))
|
||||
|
||||
def parse_outputs(self, outputs: list[str]) -> str:
|
||||
def parse_outputs(self, outputs: list[str], keep_len: int = 2000) -> Tuple[bool, str]:
|
||||
"""Parses the outputs received from notebook execution."""
|
||||
assert isinstance(outputs, list)
|
||||
parsed_output = ""
|
||||
|
||||
parsed_output, is_success = [], True
|
||||
for i, output in enumerate(outputs):
|
||||
output_text = ""
|
||||
if output["output_type"] == "stream" and not any(
|
||||
tag in output["text"]
|
||||
for tag in ["| INFO | metagpt", "| ERROR | metagpt", "| WARNING | metagpt", "DEBUG"]
|
||||
):
|
||||
parsed_output += output["text"]
|
||||
output_text = output["text"]
|
||||
elif output["output_type"] == "display_data":
|
||||
if "image/png" in output["data"]:
|
||||
self.show_bytes_figure(output["data"]["image/png"], self.interaction)
|
||||
|
|
@ -110,8 +110,22 @@ class ExecuteNbCode(Action):
|
|||
f"{i}th output['data'] from nbclient outputs dont have image/png, continue next output ..."
|
||||
)
|
||||
elif output["output_type"] == "execute_result":
|
||||
parsed_output += output["data"]["text/plain"]
|
||||
return parsed_output
|
||||
output_text = output["data"]["text/plain"]
|
||||
elif output["output_type"] == "error":
|
||||
output_text, is_success = "\n".join(output["traceback"]), False
|
||||
|
||||
# handle coroutines that are not executed asynchronously
|
||||
if output_text.strip().startswith("<coroutine object"):
|
||||
output_text = "Executed code failed, you need use key word 'await' to run a async code."
|
||||
is_success = False
|
||||
|
||||
output_text = remove_escape_and_color_codes(output_text)
|
||||
# The useful information of the exception is at the end,
|
||||
# the useful information of normal output is at the begining.
|
||||
output_text = output_text[:keep_len] if is_success else output_text[-keep_len:]
|
||||
|
||||
parsed_output.append(output_text)
|
||||
return is_success, ",".join(parsed_output)
|
||||
|
||||
def show_bytes_figure(self, image_base64: str, interaction_type: Literal["ipython", None]):
|
||||
image_bytes = base64.b64decode(image_base64)
|
||||
|
|
@ -145,7 +159,7 @@ class ExecuteNbCode(Action):
|
|||
"""
|
||||
try:
|
||||
await self.nb_client.async_execute_cell(cell, cell_index)
|
||||
return True, ""
|
||||
return self.parse_outputs(self.nb.cells[-1].outputs)
|
||||
except CellTimeoutError:
|
||||
assert self.nb_client.km is not None
|
||||
await self.nb_client.km.interrupt_kernel()
|
||||
|
|
@ -156,7 +170,7 @@ class ExecuteNbCode(Action):
|
|||
await self.reset()
|
||||
return False, "DeadKernelError"
|
||||
except Exception:
|
||||
return False, f"{traceback.format_exc()}"
|
||||
return self.parse_outputs(self.nb.cells[-1].outputs)
|
||||
|
||||
async def run(self, code: str, language: Literal["python", "markdown"] = "python") -> Tuple[str, bool]:
|
||||
"""
|
||||
|
|
@ -173,16 +187,9 @@ class ExecuteNbCode(Action):
|
|||
|
||||
# run code
|
||||
cell_index = len(self.nb.cells) - 1
|
||||
success, error_message = await self.run_cell(self.nb.cells[-1], cell_index)
|
||||
success, outputs = await self.run_cell(self.nb.cells[-1], cell_index)
|
||||
|
||||
if not success:
|
||||
return truncate(remove_escape_and_color_codes(error_message), is_success=success)
|
||||
|
||||
# code success
|
||||
outputs = self.parse_outputs(self.nb.cells[-1].outputs)
|
||||
outputs, success = truncate(remove_escape_and_color_codes(outputs), is_success=success)
|
||||
|
||||
if "!pip" in outputs:
|
||||
if "!pip" in code:
|
||||
success = False
|
||||
|
||||
return outputs, success
|
||||
|
|
@ -196,54 +203,39 @@ class ExecuteNbCode(Action):
|
|||
raise ValueError(f"Only support for language: python, markdown, but got {language}, ")
|
||||
|
||||
|
||||
def truncate(result: str, keep_len: int = 2000, is_success: bool = True):
|
||||
"""对于超出keep_len个字符的result: 执行失败的代码, 展示result后keep_len个字符; 执行成功的代码, 展示result前keep_len个字符。"""
|
||||
if is_success:
|
||||
desc = f"Executed code successfully. Truncated to show only first {keep_len} characters\n"
|
||||
else:
|
||||
desc = f"Executed code failed, please reflect on the cause of bug and then debug. Truncated to show only last {keep_len} characters\n"
|
||||
|
||||
if result.strip().startswith("<coroutine object"):
|
||||
result = "Executed code failed, you need use key word 'await' to run a async code."
|
||||
return result, False
|
||||
|
||||
if len(result) > keep_len:
|
||||
result = result[-keep_len:] if not is_success else result[:keep_len]
|
||||
return desc + result, is_success
|
||||
|
||||
return result, is_success
|
||||
|
||||
|
||||
def remove_escape_and_color_codes(input_str: str):
|
||||
# 使用正则表达式去除转义字符和颜色代码
|
||||
# 使用正则表达式去除jupyter notebook输出结果中的转义字符和颜色代码
|
||||
# Use regular expressions to get rid of escape characters and color codes in jupyter notebook output.
|
||||
pattern = re.compile(r"\x1b\[[0-9;]*[mK]")
|
||||
result = pattern.sub("", input_str)
|
||||
return result
|
||||
|
||||
|
||||
def display_markdown(content: str):
|
||||
# 使用正则表达式逐个匹配代码块
|
||||
# Use regular expressions to match blocks of code one by one.
|
||||
matches = re.finditer(r"```(.+?)```", content, re.DOTALL)
|
||||
start_index = 0
|
||||
content_panels = []
|
||||
# 逐个打印匹配到的文本和代码
|
||||
# Set the text background color and text color.
|
||||
style = "black on white"
|
||||
# Print the matching text and code one by one.
|
||||
for match in matches:
|
||||
text_content = content[start_index : match.start()].strip()
|
||||
code_content = match.group(0).strip()[3:-3] # Remove triple backticks
|
||||
|
||||
if text_content:
|
||||
content_panels.append(Panel(Markdown(text_content), box=MINIMAL))
|
||||
content_panels.append(Panel(Markdown(text_content), style=style, box=MINIMAL))
|
||||
|
||||
if code_content:
|
||||
content_panels.append(Panel(Markdown(f"```{code_content}"), box=MINIMAL))
|
||||
content_panels.append(Panel(Markdown(f"```{code_content}"), style=style, box=MINIMAL))
|
||||
start_index = match.end()
|
||||
|
||||
# 打印剩余文本(如果有)
|
||||
# Print remaining text (if any).
|
||||
remaining_text = content[start_index:].strip()
|
||||
if remaining_text:
|
||||
content_panels.append(Panel(Markdown(remaining_text), box=MINIMAL))
|
||||
content_panels.append(Panel(Markdown(remaining_text), style=style, box=MINIMAL))
|
||||
|
||||
# 在Live模式中显示所有Panel
|
||||
# Display all panels in Live mode.
|
||||
with Live(auto_refresh=False, console=Console(), vertical_overflow="visible") as live:
|
||||
live.update(Group(*content_panels))
|
||||
live.refresh()
|
||||
|
|
|
|||
|
|
@ -84,6 +84,10 @@ class Interpreter(Role):
|
|||
code, _, _ = await self._write_and_exec_code()
|
||||
return Message(content=code, role="assistant", cause_by=WriteCodeWithTools)
|
||||
|
||||
async def _plan_and_act(self) -> Message:
|
||||
await super()._plan_and_act()
|
||||
await self.execute_code.terminate()
|
||||
|
||||
async def _act_on_task(self, current_task: Task) -> TaskResult:
|
||||
"""Useful in 'plan_and_act' mode. Wrap the output in a TaskResult for review and confirmation."""
|
||||
code, result, is_success = await self._write_and_exec_code()
|
||||
|
|
|
|||
|
|
@ -5,12 +5,13 @@
|
|||
@Author : mannaandpoem
|
||||
@File : gpt_v_generator.py
|
||||
"""
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from metagpt.const import DEFAULT_WORKSPACE_ROOT
|
||||
from metagpt.logs import logger
|
||||
from metagpt.tools.tool_registry import register_tool
|
||||
from metagpt.utils.common import encode_image
|
||||
from metagpt.utils.common import CodeParser, encode_image
|
||||
|
||||
ANALYZE_LAYOUT_PROMPT = """You are now a UI/UX designer, please generate layout information for this image:
|
||||
|
||||
|
|
@ -29,7 +30,7 @@ Now, please generate the corresponding webpage code including HTML, CSS and Java
|
|||
|
||||
@register_tool(include_functions=["__init__", "generate_webpages", "save_webpages"])
|
||||
class GPTvGenerator:
|
||||
"""Class for generating webpages at once.
|
||||
"""Class for generating webpage code from a given webpage screenshot.
|
||||
|
||||
This class provides methods to generate webpages including all code (HTML, CSS, and JavaScript) based on an image.
|
||||
It utilizes a vision model to analyze the layout from an image and generate webpage codes accordingly.
|
||||
|
|
@ -72,50 +73,34 @@ class GPTvGenerator:
|
|||
return await self.llm.aask(msg=prompt, images=[encode_image(image_path)])
|
||||
|
||||
@staticmethod
|
||||
def save_webpages(image_path: str, webpages: str) -> Path:
|
||||
def save_webpages(webpages: str, save_folder_name: str = "example") -> Path:
|
||||
"""Save webpages including all code (HTML, CSS, and JavaScript) at once.
|
||||
|
||||
Args:
|
||||
image_path (str): The path of the image file.
|
||||
webpages (str): The generated webpages content.
|
||||
save_folder_name (str, optional): The name of the folder to save the webpages. Defaults to 'example'.
|
||||
|
||||
Returns:
|
||||
Path: The path of the saved webpages.
|
||||
"""
|
||||
# Create a folder called webpages in the workspace directory to store HTML, CSS, and JavaScript files
|
||||
webpages_path = DEFAULT_WORKSPACE_ROOT / "webpages" / Path(image_path).stem
|
||||
os.makedirs(webpages_path, exist_ok=True)
|
||||
webpages_path = DEFAULT_WORKSPACE_ROOT / "webpages" / save_folder_name
|
||||
logger.info(f"code will be saved at {webpages_path}")
|
||||
webpages_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
index_path = webpages_path / "index.html"
|
||||
try:
|
||||
index = webpages.split("```html")[1].split("```")[0]
|
||||
style_path = None
|
||||
if "styles.css" in index:
|
||||
style_path = webpages_path / "styles.css"
|
||||
elif "style.css" in index:
|
||||
style_path = webpages_path / "style.css"
|
||||
style = webpages.split("```css")[1].split("```")[0] if style_path else ""
|
||||
index_path.write_text(CodeParser.parse_code(block=None, text=webpages, lang="html"))
|
||||
|
||||
js_path = None
|
||||
if "scripts.js" in index:
|
||||
js_path = webpages_path / "scripts.js"
|
||||
elif "script.js" in index:
|
||||
js_path = webpages_path / "script.js"
|
||||
extract_and_save_code(folder=webpages_path, text=webpages, pattern="styles?.css", language="css")
|
||||
|
||||
js = webpages.split("```javascript")[1].split("```")[0] if js_path else ""
|
||||
except IndexError:
|
||||
raise ValueError(f"No html or css or js code found in the result. \nWebpages: {webpages}")
|
||||
|
||||
try:
|
||||
with open(index_path, "w", encoding="utf-8") as f:
|
||||
f.write(index)
|
||||
if style_path:
|
||||
with open(style_path, "w", encoding="utf-8") as f:
|
||||
f.write(style)
|
||||
if js_path:
|
||||
with open(js_path, "w", encoding="utf-8") as f:
|
||||
f.write(js)
|
||||
except FileNotFoundError as e:
|
||||
raise FileNotFoundError(f"Cannot save the webpages to {str(webpages_path)}") from e
|
||||
extract_and_save_code(folder=webpages_path, text=webpages, pattern="scripts?.js", language="javascript")
|
||||
|
||||
return webpages_path
|
||||
|
||||
|
||||
def extract_and_save_code(folder, text, pattern, language):
|
||||
word = re.search(pattern, text)
|
||||
if word:
|
||||
path = folder / word.group(0)
|
||||
code = CodeParser.parse_code(block=None, text=text, lang=language)
|
||||
path.write_text(code, encoding="utf-8")
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import pytest
|
||||
|
||||
from metagpt.actions.mi.execute_nb_code import ExecuteNbCode, truncate
|
||||
from metagpt.actions.mi.execute_nb_code import ExecuteNbCode
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -54,21 +54,6 @@ async def test_plotting_code():
|
|||
assert is_success
|
||||
|
||||
|
||||
def test_truncate():
|
||||
# 代码执行成功
|
||||
output, is_success = truncate("hello world", 5, True)
|
||||
assert "Truncated to show only first 5 characters\nhello" in output
|
||||
assert is_success
|
||||
# 代码执行失败
|
||||
output, is_success = truncate("hello world", 5, False)
|
||||
assert "Truncated to show only last 5 characters\nworld" in output
|
||||
assert not is_success
|
||||
# 异步
|
||||
output, is_success = truncate("<coroutine object", 5, True)
|
||||
assert not is_success
|
||||
assert "await" in output
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_with_timeout():
|
||||
executor = ExecuteNbCode(timeout=1)
|
||||
|
|
@ -83,7 +68,7 @@ async def test_run_code_text():
|
|||
executor = ExecuteNbCode()
|
||||
message, success = await executor.run(code='print("This is a code!")', language="python")
|
||||
assert success
|
||||
assert message == "This is a code!\n"
|
||||
assert "This is a code!" in message
|
||||
message, success = await executor.run(code="# This is a code!", language="markdown")
|
||||
assert success
|
||||
assert message == "# This is a code!"
|
||||
|
|
@ -100,10 +85,20 @@ async def test_terminate():
|
|||
is_kernel_alive = await executor.nb_client.km.is_alive()
|
||||
assert is_kernel_alive
|
||||
await executor.terminate()
|
||||
|
||||
import time
|
||||
|
||||
time.sleep(2)
|
||||
assert executor.nb_client.km is None
|
||||
for _ in range(200):
|
||||
executor = ExecuteNbCode()
|
||||
await executor.run(code='print("This is a code!")', language="python")
|
||||
is_kernel_alive = await executor.nb_client.km.is_alive()
|
||||
assert is_kernel_alive
|
||||
await executor.terminate()
|
||||
assert executor.nb_client.km is None
|
||||
assert executor.nb_client.kc is None
|
||||
await executor.terminate()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -114,3 +109,20 @@ async def test_reset():
|
|||
assert is_kernel_alive
|
||||
await executor.reset()
|
||||
assert executor.nb_client.km is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_outputs():
|
||||
executor = ExecuteNbCode()
|
||||
code = """
|
||||
import pandas as pd
|
||||
df = pd.DataFrame({'ID': [1,2,3], 'NAME': ['a', 'b', 'c']})
|
||||
print(df.columns)
|
||||
print(f"columns num:{len(df.columns)}")
|
||||
print(df['DUMMPY_ID'])
|
||||
"""
|
||||
output, is_success = await executor.run(code)
|
||||
assert not is_success
|
||||
assert "Index(['ID', 'NAME'], dtype='object')" in output
|
||||
assert "KeyError: 'DUMMPY_ID'" in output
|
||||
assert "columns num:2" in output
|
||||
|
|
|
|||
|
|
@ -60,18 +60,24 @@ async def test_generate_webpages(mock_webpage_filename_with_styles_and_scripts,
|
|||
async def test_save_webpages_with_styles_and_scripts(mock_webpage_filename_with_styles_and_scripts, image_path):
|
||||
generator = GPTvGenerator()
|
||||
webpages = await generator.generate_webpages(image_path)
|
||||
webpages_dir = generator.save_webpages(image_path=image_path, webpages=webpages)
|
||||
webpages_dir = generator.save_webpages(webpages=webpages, save_folder_name="test_1")
|
||||
logs.logger.info(webpages_dir)
|
||||
assert webpages_dir.exists()
|
||||
assert (webpages_dir / "index.html").exists()
|
||||
assert (webpages_dir / "styles.css").exists()
|
||||
assert (webpages_dir / "scripts.js").exists()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_save_webpages_with_style_and_script(mock_webpage_filename_with_style_and_script, image_path):
|
||||
generator = GPTvGenerator()
|
||||
webpages = await generator.generate_webpages(image_path)
|
||||
webpages_dir = generator.save_webpages(image_path=image_path, webpages=webpages)
|
||||
webpages_dir = generator.save_webpages(webpages=webpages, save_folder_name="test_2")
|
||||
logs.logger.info(webpages_dir)
|
||||
assert webpages_dir.exists()
|
||||
assert (webpages_dir / "index.html").exists()
|
||||
assert (webpages_dir / "style.css").exists()
|
||||
assert (webpages_dir / "script.js").exists()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue