diff --git a/examples/di/fix_github_issue.py b/examples/di/fix_github_issue.py new file mode 100644 index 000000000..8e9685e5e --- /dev/null +++ b/examples/di/fix_github_issue.py @@ -0,0 +1,33 @@ +# This is a real issue from MetaGPT: https://github.com/geekan/MetaGPT/issues/1067 +# with corresponding bugfix as https://github.com/geekan/MetaGPT/pull/1069 +# We demonstrate that DataInterpreter has the capability to fix such issues. +# Prerequisite: You need to manually add back the bug in your local file metagpt/utils/repair_llm_raw_output.py +# to test the DataInterpreter's issue solving ability. + +import asyncio + +from metagpt.roles.di.data_interpreter import DataInterpreter + +REQ = """ +# Requirement +Below is a github issue, solve it. Use FileManager to search for the function, understand it, and modify the relevant code. +Write a new test file test.py with FileManager and use Terminal to python the test file to ensure you have fixed the issue. +When writing test.py, you should import the function from the file you modified and test it with the given input. +Notice: Don't write all codes in one response, each time, just write code for one step. + +# Issue +>> s = "-1" +>> print(extract_state_value_from_output(s)) +>> 1 +The extract_state_value_from_output function will process -1 into 1, +resulted in an infinite loop for the react mode. 
class SOPItem(Enum):
    """
    Catalogue of the intentions a user requirement can be classified into.

    The value of every member is a ``SOPItemDef`` carrying the intention name,
    a description of the intention, and the SOP steps attached to it.
    """

    SOFTWARE_DEVELOPMENT = SOPItemDef(
        name="software development",
        description="Intentions related to or including software development, such as developing or building software, games, app, websites, etc. Excluding bug fixes, report any issues.",
        sop=[
            "Writes a PRD based on software requirements.",
            "Writes a design to the project repository, based on the PRD of the project.",
            "Writes a project plan to the project repository, based on the design of the project.",
            "Writes code to implement designed features according to the project plan and adds them to the project repository.",
            # "Run QA test on the project repository.",
            "Stage and commit changes for the project repository using Git.",
        ],
    )
    FIX_BUGS = SOPItemDef(
        name="fix bugs",
        description="Fix bugs in a given project.",
        sop=[
            "Fix bugs in the project repository.",
            "Stage and commit changes for the project repository using Git.",
        ],
    )
    FORMAT_REPO = SOPItemDef(
        name="format repo",
        description="download repository from git and format the project to MetaGPT project",
        sop=[
            "Imports a project from a Git website and formats it to MetaGPT project format to enable incremental appending requirements.",
            "Stage and commit changes for the project repository using Git.",
        ],
    )
    OTHER = SOPItemDef(
        name="other",
        description="Other intentions that do not fall into the above categories, including data science, machine learning, deep learning, etc.",
        sop=[],
    )

    @property
    def type_name(self):
        """The ``name`` field of the ``SOPItemDef`` stored as this member's value."""
        definition = self.value
        return definition.name

    @classmethod
    def get_type(cls, type_name):
        """
        Look up an intention definition by its name.

        Returns the ``SOPItemDef`` (the member's value, not the enum member) of the
        first member whose ``type_name`` equals ``type_name``, or None when no
        member matches.
        """
        matching_defs = (candidate.value for candidate in cls if candidate.type_name == type_name)
        return next(matching_defs, None)
class DetectIntent(Action):
    """Action that asks the LLM to classify a user requirement into one of the ``SOPItem`` intentions."""

    async def run(self, user_requirement: str) -> Tuple[str, str]:
        """
        Detect the intention behind a requirement and attach the matching SOP, if any.

        Args:
            user_requirement (str): The raw requirement text given by the user.

        Returns:
            Tuple[str, str]: ``(req_with_sop, sop_type)``. ``sop_type`` is the stripped LLM answer.
            ``req_with_sop`` is the requirement followed by the SOP steps when the answer names an
            intention that has SOP steps; otherwise it is the requirement unchanged.
        """
        intentions = "\n".join(f"{si.type_name}: {si.value.description}" for si in SOPItem)
        prompt = DETECT_PROMPT.format(user_requirement=user_requirement, intentions=intentions)

        sop_type = (await self._aask(prompt)).strip()

        # The LLM answer is free text: SOPItem.get_type returns None when it is not exactly one of
        # the known intention names. Treat that case as "no SOP" instead of failing on `None.sop`.
        sop_item = SOPItem.get_type(sop_type)
        sop = sop_item.sop if sop_item is not None else []

        if not sop:
            return user_requirement, sop_type

        req_with_sop = REQ_WITH_SOP.format(user_requirement=user_requirement, sop="\n".join(sop))
        return req_with_sop, sop_type
def write_content(self, file_path: str, start_line: int, end_line: int, new_block_content: str = ""):
    """
    Write a new block of content into a file. Use this method to update a block of code in a file. There are several cases:
    1. If the new block content is empty and end_line >= start_line, the original block from start_line to end_line (both inclusively) will be deleted.
    2. If the new block content is not empty and end_line >= start_line, the original block from start_line to end_line (both inclusively) will be replaced by the new block content.
    3. If the new block content is not empty and end_line < start_line (e.g. set end_line = -1) the new block content will be inserted at start_line.
    4. If the new block content is empty and end_line < start_line, there is nothing to delete or insert and the file content is left unchanged.

    Args:
        file_path (str): The file path to write the new block content.
        start_line (int): start line (1-based) of the original block to be updated.
        end_line (int): end line (1-based, inclusive) of the original block to be updated.
        new_block_content (str): The new block content to write. A single trailing newline is ignored,
            so "a\\nb" and "a\\nb\\n" both write exactly the two lines "a" and "b".

    Raises:
        ValueError: If start_line is smaller than 1.
    """
    if start_line < 1:
        # A zero/negative start would become a negative list index and silently edit the file tail.
        raise ValueError(f"start_line must be >= 1 (line numbers are 1-based), got {start_line}")

    with open(file_path, "r", encoding="utf-8") as file:
        lines = file.readlines()

    start_index = start_line - 1  # Adjusting because list indices start at 0
    # end_line is inclusive and 1-based, so it is already the exclusive slice end. When
    # end_line < start_line, clamp to start_index: the slice is then empty, which turns the
    # assignment below into a pure insertion (and never lets a negative end_line reach the slice).
    end_index = max(end_line, start_index)

    if new_block_content:
        # Drop one trailing newline so that it does not produce an extra blank line after the split.
        body = new_block_content[:-1] if new_block_content.endswith("\n") else new_block_content
        new_lines = [line + "\n" for line in body.split("\n")]
    else:
        new_lines = []

    # When writing past the end of a file whose last line has no newline, terminate that line
    # first so the new content does not get glued onto it.
    if new_lines and lines and start_index >= len(lines) and not lines[-1].endswith("\n"):
        lines[-1] += "\n"

    # One slice assignment covers replace, delete (new_lines empty) and insert (empty slice),
    # irrespective of the length difference between the original and new content.
    lines[start_index:end_index] = new_lines

    with open(file_path, "w", encoding="utf-8") as file:
        file.writelines(lines)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "requirement, expected_intent_type",
    [
        (SOFTWARE_DEV_REQ1, "software development"),
        (SOFTWARE_DEV_REQ2, "software development"),
        (SOFTWARE_DEV_REQ3, "software development"),
        (DI_REQ1, "other"),
        (DI_REQ2, "other"),
        (DI_REQ3, "other"),
        (FIX_BUG_REQ, "fix bugs"),
        (FORMAT_REPO_REQ, "format repo"),
    ],
)
async def test_detect_intent(requirement, expected_intent_type):
    """Run DetectIntent on each sample requirement and compare the detected intention name."""
    # run() returns (requirement_with_sop, intent_name); only the intent name is checked here.
    outcome = await DetectIntent().run(requirement)
    detected_intent = outcome[1]
    assert detected_intent == expected_intent_type
+def test_function_for_fm(): + "some docstring" + a = 1 + b = 2 + c = 3 + # this is the 7th line +""".strip() + +TEST_FILE_PATH = TEST_DATA_PATH / "tools/test_script_for_file_manager.py" + + +@pytest.fixture +def test_file(): + with open(TEST_FILE_PATH, "w") as f: + f.write(TEST_FILE_CONTENT) + yield + with open(TEST_FILE_PATH, "w") as f: + f.write("") + + +EXPECTED_SEARCHED_BLOCK = FileBlock( + file_path=str(TEST_FILE_PATH), + block_content='# this is line one\ndef test_function_for_fm():\n "some docstring"\n a = 1\n b = 2\n', + block_start_line=1, + block_end_line=5, + symbol="def test_function_for_fm", + symbol_line=2, +) + + +def test_search_content(test_file): + block = FileManager().search_content("def test_function_for_fm", root_path=TEST_DATA_PATH, window=3) + assert block == EXPECTED_SEARCHED_BLOCK + + +EXPECTED_CONTENT_AFTER_REPLACE = """ +# this is line one +def test_function_for_fm(): + This is the new line A replacing lines 3 to 5. + This is the new line B. + c = 3 + # this is the 7th line +""".strip() + + +def test_replace_content(test_file): + FileManager().write_content( + file_path=str(TEST_FILE_PATH), + start_line=3, + end_line=5, + new_block_content=" This is the new line A replacing lines 3 to 5.\n This is the new line B.", + ) + with open(TEST_FILE_PATH, "r") as f: + new_content = f.read() + print(new_content) + assert new_content == EXPECTED_CONTENT_AFTER_REPLACE + + +EXPECTED_CONTENT_AFTER_DELETE = """ +# this is line one +def test_function_for_fm(): + c = 3 + # this is the 7th line +""".strip() + + +def test_delete_content(test_file): + FileManager().write_content(file_path=str(TEST_FILE_PATH), start_line=3, end_line=5) + with open(TEST_FILE_PATH, "r") as f: + new_content = f.read() + assert new_content == EXPECTED_CONTENT_AFTER_DELETE + + +EXPECTED_CONTENT_AFTER_INSERT = """ +# this is line one +def test_function_for_fm(): + This is the new line to be inserted, at line 3 + "some docstring" + a = 1 + b = 2 + c = 3 + # this is the 7th line 
+""".strip() + + +def test_insert_content(test_file): + FileManager().write_content( + file_path=str(TEST_FILE_PATH), + start_line=3, + end_line=-1, + new_block_content=" This is the new line to be inserted, at line 3", + ) + with open(TEST_FILE_PATH, "r") as f: + new_content = f.read() + assert new_content == EXPECTED_CONTENT_AFTER_INSERT