diff --git a/examples/mi/machine_learning.py b/examples/mi/machine_learning.py index a8ab5051e..56c68f69e 100644 --- a/examples/mi/machine_learning.py +++ b/examples/mi/machine_learning.py @@ -2,10 +2,20 @@ import fire from metagpt.roles.mi.interpreter import Interpreter +WINE_REQ = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy." -async def main(auto_run: bool = True): - requirement = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy." +DATA_DIR = "path/to/your/data" +# sales_forecast data from https://www.kaggle.com/datasets/aslanahmedov/walmart-sales-forecast/data +SALES_FORECAST_REQ = f"""Train a model to predict sales for each department in every store (split the last 40 weeks records as validation dataset, the others is train dataset), include plot total sales trends, print metric and plot scatter plots of +groud truth and predictions on validation data. Dataset is {DATA_DIR}/train.csv, the metric is weighted mean absolute error (WMAE) for test data. Notice: *print* key variables to get more information for next task step. +""" + +REQUIREMENTS = {"wine": WINE_REQ, "sales_forecast": SALES_FORECAST_REQ} + + +async def main(auto_run: bool = True, use_case: str = "wine"): mi = Interpreter(auto_run=auto_run) + requirement = REQUIREMENTS[use_case] await mi.run(requirement) diff --git a/metagpt/actions/mi/execute_nb_code.py b/metagpt/actions/mi/execute_nb_code.py index 02b1fb168..f6a8defbd 100644 --- a/metagpt/actions/mi/execute_nb_code.py +++ b/metagpt/actions/mi/execute_nb_code.py @@ -9,7 +9,6 @@ from __future__ import annotations import asyncio import base64 import re -import traceback from typing import Literal, Tuple import nbformat @@ -58,7 +57,8 @@ class ExecuteNbCode(Action): async def terminate(self): """kill NotebookClient""" - await self.nb_client._async_cleanup_kernel() + if self.nb_client.km is not None: + await self.nb_client._async_cleanup_kernel() async def reset(self): """reset NotebookClient""" @@ -91,17 +91,17 @@ class ExecuteNbCode(Action): else: cell["outputs"].append(new_output(output_type="stream", name="stdout", text=str(output))) - def parse_outputs(self, outputs: list[str]) -> str: + def parse_outputs(self, outputs: list[str], keep_len: int = 2000) -> Tuple[bool, str]: """Parses the outputs received from notebook execution.""" assert isinstance(outputs, list) - parsed_output = "" - + parsed_output, is_success = [], True for i, output in enumerate(outputs): + output_text = "" if output["output_type"] == "stream" and not any( tag in output["text"] for tag in ["| INFO | metagpt", "| ERROR | metagpt", "| WARNING | metagpt", "DEBUG"] ): - parsed_output += output["text"] + output_text = output["text"] elif output["output_type"] == "display_data": if "image/png" in output["data"]: self.show_bytes_figure(output["data"]["image/png"], self.interaction) @@ -110,8 +110,22 @@ class ExecuteNbCode(Action): f"{i}th output['data'] from nbclient outputs dont have image/png, continue next output ..." ) elif output["output_type"] == "execute_result": - parsed_output += output["data"]["text/plain"] - return parsed_output + output_text = output["data"]["text/plain"] + elif output["output_type"] == "error": + output_text, is_success = "\n".join(output["traceback"]), False + + # handle coroutines that are not executed asynchronously + if output_text.strip().startswith(" Tuple[str, bool]: """ @@ -173,16 +187,9 @@ class ExecuteNbCode(Action): # run code cell_index = len(self.nb.cells) - 1 - success, error_message = await self.run_cell(self.nb.cells[-1], cell_index) + success, outputs = await self.run_cell(self.nb.cells[-1], cell_index) - if not success: - return truncate(remove_escape_and_color_codes(error_message), is_success=success) - - # code success - outputs = self.parse_outputs(self.nb.cells[-1].outputs) - outputs, success = truncate(remove_escape_and_color_codes(outputs), is_success=success) - - if "!pip" in outputs: + if "!pip" in code: success = False return outputs, success @@ -196,54 +203,39 @@ class ExecuteNbCode(Action): raise ValueError(f"Only support for language: python, markdown, but got {language}, ") -def truncate(result: str, keep_len: int = 2000, is_success: bool = True): - """对于超出keep_len个字符的result: 执行失败的代码, 展示result后keep_len个字符; 执行成功的代码, 展示result前keep_len个字符。""" - if is_success: - desc = f"Executed code successfully. Truncated to show only first {keep_len} characters\n" - else: - desc = f"Executed code failed, please reflect on the cause of bug and then debug. Truncated to show only last {keep_len} characters\n" - - if result.strip().startswith(" keep_len: - result = result[-keep_len:] if not is_success else result[:keep_len] - return desc + result, is_success - - return result, is_success - - def remove_escape_and_color_codes(input_str: str): - # 使用正则表达式去除转义字符和颜色代码 + # 使用正则表达式去除jupyter notebook输出结果中的转义字符和颜色代码 + # Use regular expressions to get rid of escape characters and color codes in jupyter notebook output. pattern = re.compile(r"\x1b\[[0-9;]*[mK]") result = pattern.sub("", input_str) return result def display_markdown(content: str): - # 使用正则表达式逐个匹配代码块 + # Use regular expressions to match blocks of code one by one. matches = re.finditer(r"```(.+?)```", content, re.DOTALL) start_index = 0 content_panels = [] - # 逐个打印匹配到的文本和代码 + # Set the text background color and text color. + style = "black on white" + # Print the matching text and code one by one. for match in matches: text_content = content[start_index : match.start()].strip() code_content = match.group(0).strip()[3:-3] # Remove triple backticks if text_content: - content_panels.append(Panel(Markdown(text_content), box=MINIMAL)) + content_panels.append(Panel(Markdown(text_content), style=style, box=MINIMAL)) if code_content: - content_panels.append(Panel(Markdown(f"```{code_content}"), box=MINIMAL)) + content_panels.append(Panel(Markdown(f"```{code_content}"), style=style, box=MINIMAL)) start_index = match.end() - # 打印剩余文本(如果有) + # Print remaining text (if any). remaining_text = content[start_index:].strip() if remaining_text: - content_panels.append(Panel(Markdown(remaining_text), box=MINIMAL)) + content_panels.append(Panel(Markdown(remaining_text), style=style, box=MINIMAL)) - # 在Live模式中显示所有Panel + # Display all panels in Live mode. with Live(auto_refresh=False, console=Console(), vertical_overflow="visible") as live: live.update(Group(*content_panels)) live.refresh() diff --git a/metagpt/roles/mi/interpreter.py b/metagpt/roles/mi/interpreter.py index e71514b62..a083f7f7d 100644 --- a/metagpt/roles/mi/interpreter.py +++ b/metagpt/roles/mi/interpreter.py @@ -84,6 +84,10 @@ class Interpreter(Role): code, _, _ = await self._write_and_exec_code() return Message(content=code, role="assistant", cause_by=WriteCodeWithTools) + async def _plan_and_act(self) -> Message: + await super()._plan_and_act() + await self.execute_code.terminate() + async def _act_on_task(self, current_task: Task) -> TaskResult: """Useful in 'plan_and_act' mode. Wrap the output in a TaskResult for review and confirmation.""" code, result, is_success = await self._write_and_exec_code() diff --git a/metagpt/tools/libs/gpt_v_generator.py b/metagpt/tools/libs/gpt_v_generator.py index abf5d1986..0e9f34770 100644 --- a/metagpt/tools/libs/gpt_v_generator.py +++ b/metagpt/tools/libs/gpt_v_generator.py @@ -5,12 +5,13 @@ @Author : mannaandpoem @File : gpt_v_generator.py """ -import os +import re from pathlib import Path from metagpt.const import DEFAULT_WORKSPACE_ROOT +from metagpt.logs import logger from metagpt.tools.tool_registry import register_tool -from metagpt.utils.common import encode_image +from metagpt.utils.common import CodeParser, encode_image ANALYZE_LAYOUT_PROMPT = """You are now a UI/UX designer, please generate layout information for this image: @@ -29,7 +30,7 @@ Now, please generate the corresponding webpage code including HTML, CSS and Java @register_tool(include_functions=["__init__", "generate_webpages", "save_webpages"]) class GPTvGenerator: - """Class for generating webpages at once. + """Class for generating webpage code from a given webpage screenshot. This class provides methods to generate webpages including all code (HTML, CSS, and JavaScript) based on an image. It utilizes a vision model to analyze the layout from an image and generate webpage codes accordingly. @@ -72,50 +73,34 @@ class GPTvGenerator: return await self.llm.aask(msg=prompt, images=[encode_image(image_path)]) @staticmethod - def save_webpages(image_path: str, webpages: str) -> Path: + def save_webpages(webpages: str, save_folder_name: str = "example") -> Path: """Save webpages including all code (HTML, CSS, and JavaScript) at once. Args: - image_path (str): The path of the image file. webpages (str): The generated webpages content. + save_folder_name (str, optional): The name of the folder to save the webpages. Defaults to 'example'. Returns: Path: The path of the saved webpages. """ # Create a folder called webpages in the workspace directory to store HTML, CSS, and JavaScript files - webpages_path = DEFAULT_WORKSPACE_ROOT / "webpages" / Path(image_path).stem - os.makedirs(webpages_path, exist_ok=True) + webpages_path = DEFAULT_WORKSPACE_ROOT / "webpages" / save_folder_name + logger.info(f"code will be saved at {webpages_path}") + webpages_path.mkdir(parents=True, exist_ok=True) index_path = webpages_path / "index.html" - try: - index = webpages.split("```html")[1].split("```")[0] - style_path = None - if "styles.css" in index: - style_path = webpages_path / "styles.css" - elif "style.css" in index: - style_path = webpages_path / "style.css" - style = webpages.split("```css")[1].split("```")[0] if style_path else "" + index_path.write_text(CodeParser.parse_code(block=None, text=webpages, lang="html")) - js_path = None - if "scripts.js" in index: - js_path = webpages_path / "scripts.js" - elif "script.js" in index: - js_path = webpages_path / "script.js" + extract_and_save_code(folder=webpages_path, text=webpages, pattern="styles?.css", language="css") - js = webpages.split("```javascript")[1].split("```")[0] if js_path else "" - except IndexError: - raise ValueError(f"No html or css or js code found in the result. \nWebpages: {webpages}") - - try: - with open(index_path, "w", encoding="utf-8") as f: - f.write(index) - if style_path: - with open(style_path, "w", encoding="utf-8") as f: - f.write(style) - if js_path: - with open(js_path, "w", encoding="utf-8") as f: - f.write(js) - except FileNotFoundError as e: - raise FileNotFoundError(f"Cannot save the webpages to {str(webpages_path)}") from e + extract_and_save_code(folder=webpages_path, text=webpages, pattern="scripts?.js", language="javascript") return webpages_path + + +def extract_and_save_code(folder, text, pattern, language): + word = re.search(pattern, text) + if word: + path = folder / word.group(0) + code = CodeParser.parse_code(block=None, text=text, lang=language) + path.write_text(code, encoding="utf-8") diff --git a/tests/metagpt/actions/mi/test_execute_nb_code.py b/tests/metagpt/actions/mi/test_execute_nb_code.py index 59a814054..2ecfbd2a2 100644 --- a/tests/metagpt/actions/mi/test_execute_nb_code.py +++ b/tests/metagpt/actions/mi/test_execute_nb_code.py @@ -1,6 +1,6 @@ import pytest -from metagpt.actions.mi.execute_nb_code import ExecuteNbCode, truncate +from metagpt.actions.mi.execute_nb_code import ExecuteNbCode @pytest.mark.asyncio @@ -54,21 +54,6 @@ async def test_plotting_code(): assert is_success -def test_truncate(): - # 代码执行成功 - output, is_success = truncate("hello world", 5, True) - assert "Truncated to show only first 5 characters\nhello" in output - assert is_success - # 代码执行失败 - output, is_success = truncate("hello world", 5, False) - assert "Truncated to show only last 5 characters\nworld" in output - assert not is_success - # 异步 - output, is_success = truncate("