mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-05-02 04:12:45 +02:00
add FileManager and provide issue fixing example
This commit is contained in:
parent
5e34038831
commit
8648a7646e
4 changed files with 243 additions and 0 deletions
33
examples/di/fix_github_issue.py
Normal file
33
examples/di/fix_github_issue.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
# This is a real issue from MetaGPT: https://github.com/geekan/MetaGPT/issues/1067
|
||||
# with corresponding bugfix as https://github.com/geekan/MetaGPT/pull/1069
|
||||
# We demonstrate that DataInterpreter has the capability to fix such issues.
|
||||
# Prerequisite: You need to manually add back the bug in your local file metagpt/utils/repair_llm_raw_output.py
|
||||
# to test the DataInterpreter's issue solving ability.
|
||||
|
||||
import asyncio
|
||||
|
||||
from metagpt.roles.di.data_interpreter import DataInterpreter
|
||||
|
||||
REQ = """
|
||||
# Requirement
|
||||
Below is a github issue, solve it. Use FileManager to search for the function, understand it, and modify the relevant code.
|
||||
Write a new test file test.py with FileManager and use Terminal to python the test file to ensure you have fixed the issue.
|
||||
When writing test.py, you should import the function from the file you modified and test it with the given input.
|
||||
Notice: Don't write all codes in one response, each time, just write code for one step.
|
||||
|
||||
# Issue
|
||||
>> s = "-1"
|
||||
>> print(extract_state_value_from_output(s))
|
||||
>> 1
|
||||
The extract_state_value_from_output function will process -1 into 1,
|
||||
resulted in an infinite loop for the react mode.
|
||||
"""
|
||||
|
||||
|
||||
async def main():
|
||||
di = DataInterpreter(tools=["Terminal", "FileManager"], react_mode="react")
|
||||
await di.run(REQ)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
108
metagpt/tools/libs/file_manager.py
Normal file
108
metagpt/tools/libs/file_manager.py
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
import os
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from metagpt.tools.tool_registry import register_tool
|
||||
|
||||
|
||||
class FileBlock(BaseModel):
|
||||
file_path: str
|
||||
block_content: str
|
||||
block_start_line: int
|
||||
block_end_line: int
|
||||
symbol: str = ""
|
||||
symbol_line: int = -1
|
||||
|
||||
|
||||
@register_tool()
|
||||
class FileManager:
|
||||
"""A tool for handling file io, read or write into files"""
|
||||
|
||||
def write(self, path: str, content: str):
|
||||
"""Write the whole content to a file."""
|
||||
with open(path, "w") as f:
|
||||
f.write(content)
|
||||
|
||||
def read(self, path: str) -> str:
|
||||
"""Read the whole content of a file."""
|
||||
with open(path, "r") as f:
|
||||
return f.read()
|
||||
|
||||
def search_content(self, symbol: str, root_path: str = "", window: int = 20) -> FileBlock:
|
||||
"""
|
||||
Search symbol in all files under root_path, return the context of symbol with window size
|
||||
Useful for locating class or function in a large codebase. Example symbol can be "def some_function", "class SomeClass", etc.
|
||||
|
||||
Args:
|
||||
symbol (str): The symbol to search.
|
||||
root_path (str, optional): The root path to search in. If not provided, search in the current directory. Defaults to "".
|
||||
window (int, optional): The window size to return.
|
||||
|
||||
Returns:
|
||||
FileBlock: The block containing the symbol, a pydantic BaseModel with the schema below.
|
||||
class FileBlock(BaseModel):
|
||||
file_path: str
|
||||
block_content: str
|
||||
block_start_line: int
|
||||
block_end_line: int
|
||||
symbol: str = ""
|
||||
symbol_line: int = -1
|
||||
"""
|
||||
for root, _, files in os.walk(root_path or "."):
|
||||
for file in files:
|
||||
file_path = os.path.join(root, file)
|
||||
if not file.endswith(".py"):
|
||||
continue
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
try:
|
||||
lines = f.readlines()
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
for i, line in enumerate(lines):
|
||||
if symbol in line:
|
||||
start = max(i - window, 0)
|
||||
end = min(i + window, len(lines) - 1)
|
||||
block_content = "".join(lines[start : end + 1])
|
||||
return FileBlock(
|
||||
file_path=file_path,
|
||||
block_content=block_content,
|
||||
block_start_line=start + 1,
|
||||
block_end_line=end + 1,
|
||||
symbol=symbol,
|
||||
symbol_line=i + 1,
|
||||
)
|
||||
return None
|
||||
|
||||
def write_content(self, file_path: str, start_line: int, end_line: int, new_block_content: str = ""):
|
||||
"""
|
||||
Write a new block of content into a file. Use this method to update a block of code in a file. There are several cases:
|
||||
1. If the new block content is empty, the original block will be deleted.
|
||||
2. If the new block content is not empty and end_line >= start_line, the original block from start_line to end_line (both inclusively) will be replaced by the new block content.
|
||||
3. If the new block content is not empty and end_line < start_line (e.g. set end_line = -1) the new block content will be inserted at start_line.
|
||||
|
||||
Args:
|
||||
file_path (str): The file path to write the new block content.
|
||||
start_line (int): start line of the original block to be updated.
|
||||
end_line (int): end line of the original block to be updated.
|
||||
new_block_content (str): The new block content to write.
|
||||
"""
|
||||
with open(file_path, "r") as file:
|
||||
lines = file.readlines()
|
||||
|
||||
start_line_index = start_line - 1 # Adjusting because list indices start at 0
|
||||
end_line_index = end_line
|
||||
|
||||
if new_block_content:
|
||||
# Split the new_block_content by newline and ensure each line ends with a newline character
|
||||
new_content_lines = [line + "\n" for line in new_block_content.split("\n")]
|
||||
if end_line >= start_line:
|
||||
# This replaces the block between start_line and end_line with new_block_content
|
||||
# irrespective of the length difference between the original and new content.
|
||||
lines[start_line_index:end_line_index] = new_content_lines
|
||||
else:
|
||||
lines.insert(start_line_index, "\n".join(new_content_lines))
|
||||
else:
|
||||
del lines[start_line_index:end_line_index]
|
||||
|
||||
with open(file_path, "w") as file:
|
||||
file.writelines(lines)
|
||||
0
tests/data/tools/test_script_for_file_manager.py
Normal file
0
tests/data/tools/test_script_for_file_manager.py
Normal file
102
tests/metagpt/tools/libs/test_file_manager.py
Normal file
102
tests/metagpt/tools/libs/test_file_manager.py
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
import pytest
|
||||
|
||||
from metagpt.const import TEST_DATA_PATH
|
||||
from metagpt.tools.libs.file_manager import FileBlock, FileManager
|
||||
|
||||
TEST_FILE_CONTENT = """
|
||||
# this is line one
|
||||
def test_function_for_fm():
|
||||
"some docstring"
|
||||
a = 1
|
||||
b = 2
|
||||
c = 3
|
||||
# this is the 7th line
|
||||
""".strip()
|
||||
|
||||
TEST_FILE_PATH = TEST_DATA_PATH / "tools/test_script_for_file_manager.py"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_file():
|
||||
with open(TEST_FILE_PATH, "w") as f:
|
||||
f.write(TEST_FILE_CONTENT)
|
||||
yield
|
||||
with open(TEST_FILE_PATH, "w") as f:
|
||||
f.write("")
|
||||
|
||||
|
||||
EXPECTED_SEARCHED_BLOCK = FileBlock(
|
||||
file_path=str(TEST_FILE_PATH),
|
||||
block_content='# this is line one\ndef test_function_for_fm():\n "some docstring"\n a = 1\n b = 2\n',
|
||||
block_start_line=1,
|
||||
block_end_line=5,
|
||||
symbol="def test_function_for_fm",
|
||||
symbol_line=2,
|
||||
)
|
||||
|
||||
|
||||
def test_search_content(test_file):
|
||||
block = FileManager().search_content("def test_function_for_fm", root_path=TEST_DATA_PATH, window=3)
|
||||
assert block == EXPECTED_SEARCHED_BLOCK
|
||||
|
||||
|
||||
EXPECTED_CONTENT_AFTER_REPLACE = """
|
||||
# this is line one
|
||||
def test_function_for_fm():
|
||||
This is the new line A replacing lines 3 to 5.
|
||||
This is the new line B.
|
||||
c = 3
|
||||
# this is the 7th line
|
||||
""".strip()
|
||||
|
||||
|
||||
def test_replace_content(test_file):
|
||||
FileManager().write_content(
|
||||
file_path=str(TEST_FILE_PATH),
|
||||
start_line=3,
|
||||
end_line=5,
|
||||
new_block_content=" This is the new line A replacing lines 3 to 5.\n This is the new line B.",
|
||||
)
|
||||
with open(TEST_FILE_PATH, "r") as f:
|
||||
new_content = f.read()
|
||||
print(new_content)
|
||||
assert new_content == EXPECTED_CONTENT_AFTER_REPLACE
|
||||
|
||||
|
||||
EXPECTED_CONTENT_AFTER_DELETE = """
|
||||
# this is line one
|
||||
def test_function_for_fm():
|
||||
c = 3
|
||||
# this is the 7th line
|
||||
""".strip()
|
||||
|
||||
|
||||
def test_delete_content(test_file):
|
||||
FileManager().write_content(file_path=str(TEST_FILE_PATH), start_line=3, end_line=5)
|
||||
with open(TEST_FILE_PATH, "r") as f:
|
||||
new_content = f.read()
|
||||
assert new_content == EXPECTED_CONTENT_AFTER_DELETE
|
||||
|
||||
|
||||
EXPECTED_CONTENT_AFTER_INSERT = """
|
||||
# this is line one
|
||||
def test_function_for_fm():
|
||||
This is the new line to be inserted, at line 3
|
||||
"some docstring"
|
||||
a = 1
|
||||
b = 2
|
||||
c = 3
|
||||
# this is the 7th line
|
||||
""".strip()
|
||||
|
||||
|
||||
def test_insert_content(test_file):
|
||||
FileManager().write_content(
|
||||
file_path=str(TEST_FILE_PATH),
|
||||
start_line=3,
|
||||
end_line=-1,
|
||||
new_block_content=" This is the new line to be inserted, at line 3",
|
||||
)
|
||||
with open(TEST_FILE_PATH, "r") as f:
|
||||
new_content = f.read()
|
||||
assert new_content == EXPECTED_CONTENT_AFTER_INSERT
|
||||
Loading…
Add table
Add a link
Reference in a new issue