diff --git a/examples/di/fix_github_issue.py b/examples/di/fix_github_issue.py new file mode 100644 index 000000000..8e9685e5e --- /dev/null +++ b/examples/di/fix_github_issue.py @@ -0,0 +1,33 @@ +# This is a real issue from MetaGPT: https://github.com/geekan/MetaGPT/issues/1067 +# with corresponding bugfix as https://github.com/geekan/MetaGPT/pull/1069 +# We demonstrate that DataInterpreter has the capability to fix such issues. +# Prerequisite: You need to manually add back the bug in your local file metagpt/utils/repair_llm_raw_output.py +# to test the DataInterpreter's issue solving ability. + +import asyncio + +from metagpt.roles.di.data_interpreter import DataInterpreter + +REQ = """ +# Requirement +Below is a github issue, solve it. Use FileManager to search for the function, understand it, and modify the relevant code. +Write a new test file test.py with FileManager and use Terminal to python the test file to ensure you have fixed the issue. +When writing test.py, you should import the function from the file you modified and test it with the given input. +Notice: Don't write all codes in one response, each time, just write code for one step. + +# Issue +>> s = "-1" +>> print(extract_state_value_from_output(s)) +>> 1 +The extract_state_value_from_output function will process -1 into 1, +resulted in an infinite loop for the react mode. +""" + + +async def main(): + di = DataInterpreter(tools=["Terminal", "FileManager"], react_mode="react") + await di.run(REQ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/metagpt/tools/libs/file_manager.py b/metagpt/tools/libs/file_manager.py new file mode 100644 index 000000000..c70f5ec72 --- /dev/null +++ b/metagpt/tools/libs/file_manager.py @@ -0,0 +1,108 @@ +import os + +from pydantic import BaseModel + +from metagpt.tools.tool_registry import register_tool + + +class FileBlock(BaseModel): + file_path: str + block_content: str + block_start_line: int + block_end_line: int + symbol: str = "" + symbol_line: int = -1 + + +@register_tool() +class FileManager: + """A tool for handling file io, read or write into files""" + + def write(self, path: str, content: str): + """Write the whole content to a file.""" + with open(path, "w") as f: + f.write(content) + + def read(self, path: str) -> str: + """Read the whole content of a file.""" + with open(path, "r") as f: + return f.read() + + def search_content(self, symbol: str, root_path: str = "", window: int = 20) -> FileBlock: + """ + Search symbol in all files under root_path, return the context of symbol with window size + Useful for locating class or function in a large codebase. Example symbol can be "def some_function", "class SomeClass", etc. + + Args: + symbol (str): The symbol to search. + root_path (str, optional): The root path to search in. If not provided, search in the current directory. Defaults to "". + window (int, optional): The window size to return. + + Returns: + FileBlock: The block containing the symbol, a pydantic BaseModel with the schema below. + class FileBlock(BaseModel): + file_path: str + block_content: str + block_start_line: int + block_end_line: int + symbol: str = "" + symbol_line: int = -1 + """ + for root, _, files in os.walk(root_path or "."): + for file in files: + file_path = os.path.join(root, file) + if not file.endswith(".py"): + continue + with open(file_path, "r", encoding="utf-8") as f: + try: + lines = f.readlines() + except UnicodeDecodeError: + continue + for i, line in enumerate(lines): + if symbol in line: + start = max(i - window, 0) + end = min(i + window, len(lines) - 1) + block_content = "".join(lines[start : end + 1]) + return FileBlock( + file_path=file_path, + block_content=block_content, + block_start_line=start + 1, + block_end_line=end + 1, + symbol=symbol, + symbol_line=i + 1, + ) + return None + + def write_content(self, file_path: str, start_line: int, end_line: int, new_block_content: str = ""): + """ + Write a new block of content into a file. Use this method to update a block of code in a file. There are several cases: + 1. If the new block content is empty, the original block will be deleted. + 2. If the new block content is not empty and end_line >= start_line, the original block from start_line to end_line (both inclusively) will be replaced by the new block content. + 3. If the new block content is not empty and end_line < start_line (e.g. set end_line = -1) the new block content will be inserted at start_line. + + Args: + file_path (str): The file path to write the new block content. + start_line (int): start line of the original block to be updated. + end_line (int): end line of the original block to be updated. + new_block_content (str): The new block content to write. + """ + with open(file_path, "r") as file: + lines = file.readlines() + + start_line_index = start_line - 1 # Adjusting because list indices start at 0 + end_line_index = end_line + + if new_block_content: + # Split the new_block_content by newline and ensure each line ends with a newline character + new_content_lines = [line + "\n" for line in new_block_content.split("\n")] + if end_line >= start_line: + # This replaces the block between start_line and end_line with new_block_content + # irrespective of the length difference between the original and new content. + lines[start_line_index:end_line_index] = new_content_lines + else: + lines.insert(start_line_index, "\n".join(new_content_lines)) + else: + del lines[start_line_index:end_line_index] + + with open(file_path, "w") as file: + file.writelines(lines) diff --git a/tests/data/tools/test_script_for_file_manager.py b/tests/data/tools/test_script_for_file_manager.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/metagpt/tools/libs/test_file_manager.py b/tests/metagpt/tools/libs/test_file_manager.py new file mode 100644 index 000000000..4e9cdbecd --- /dev/null +++ b/tests/metagpt/tools/libs/test_file_manager.py @@ -0,0 +1,102 @@ +import pytest + +from metagpt.const import TEST_DATA_PATH +from metagpt.tools.libs.file_manager import FileBlock, FileManager + +TEST_FILE_CONTENT = """ +# this is line one +def test_function_for_fm(): + "some docstring" + a = 1 + b = 2 + c = 3 + # this is the 7th line +""".strip() + +TEST_FILE_PATH = TEST_DATA_PATH / "tools/test_script_for_file_manager.py" + + +@pytest.fixture +def test_file(): + with open(TEST_FILE_PATH, "w") as f: + f.write(TEST_FILE_CONTENT) + yield + with open(TEST_FILE_PATH, "w") as f: + f.write("") + + +EXPECTED_SEARCHED_BLOCK = FileBlock( + file_path=str(TEST_FILE_PATH), + block_content='# this is line one\ndef test_function_for_fm():\n "some docstring"\n a = 1\n b = 2\n', + block_start_line=1, + block_end_line=5, + symbol="def test_function_for_fm", + symbol_line=2, +) + + +def test_search_content(test_file): + block = FileManager().search_content("def test_function_for_fm", root_path=TEST_DATA_PATH, window=3) + assert block == EXPECTED_SEARCHED_BLOCK + + +EXPECTED_CONTENT_AFTER_REPLACE = """ +# this is line one +def test_function_for_fm(): + This is the new line A replacing lines 3 to 5. + This is the new line B. + c = 3 + # this is the 7th line +""".strip() + + +def test_replace_content(test_file): + FileManager().write_content( + file_path=str(TEST_FILE_PATH), + start_line=3, + end_line=5, + new_block_content=" This is the new line A replacing lines 3 to 5.\n This is the new line B.", + ) + with open(TEST_FILE_PATH, "r") as f: + new_content = f.read() + print(new_content) + assert new_content == EXPECTED_CONTENT_AFTER_REPLACE + + +EXPECTED_CONTENT_AFTER_DELETE = """ +# this is line one +def test_function_for_fm(): + c = 3 + # this is the 7th line +""".strip() + + +def test_delete_content(test_file): + FileManager().write_content(file_path=str(TEST_FILE_PATH), start_line=3, end_line=5) + with open(TEST_FILE_PATH, "r") as f: + new_content = f.read() + assert new_content == EXPECTED_CONTENT_AFTER_DELETE + + +EXPECTED_CONTENT_AFTER_INSERT = """ +# this is line one +def test_function_for_fm(): + This is the new line to be inserted, at line 3 + "some docstring" + a = 1 + b = 2 + c = 3 + # this is the 7th line +""".strip() + + +def test_insert_content(test_file): + FileManager().write_content( + file_path=str(TEST_FILE_PATH), + start_line=3, + end_line=-1, + new_block_content=" This is the new line to be inserted, at line 3", + ) + with open(TEST_FILE_PATH, "r") as f: + new_content = f.read() + assert new_content == EXPECTED_CONTENT_AFTER_INSERT