mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-06-08 15:05:17 +02:00
Merge branch 'di_mgx' into 'mgx_ops'
add linter to process edit content for FileManager See merge request pub/MetaGPT!19
This commit is contained in:
commit
6ff2767e25
6 changed files with 122 additions and 34 deletions
|
|
@ -1,8 +1,7 @@
|
|||
# This is a real issue from MetaGPT: https://github.com/geekan/MetaGPT/issues/1067
|
||||
# with corresponding bugfix as https://github.com/geekan/MetaGPT/pull/1069
|
||||
# We demonstrate that DataInterpreter has the capability to fix such issues.
|
||||
# Prerequisite: You need to manually add back the bug in your local file metagpt/utils/repair_llm_raw_output.py
|
||||
# to test the DataInterpreter's issue solving ability.
|
||||
"""This example is from a real issue from MetaGPT: https://github.com/geekan/MetaGPT/issues/1067 with corresponding bugfix as https://github.com/geekan/MetaGPT/pull/1069
|
||||
We demonstrate that DataInterpreter has the capability to fix such issues.
|
||||
Prerequisite: You need to manually add the bug back to your local file metagpt/utils/repair_llm_raw_output.py to test DataInterpreter's debugging ability. For detail, please check the issue and PR link above.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ from __future__ import annotations
|
|||
import sys
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
from typing import Any
|
||||
|
||||
from loguru import logger as _logger
|
||||
from pydantic import BaseModel, Field
|
||||
|
|
@ -21,7 +22,7 @@ from metagpt.const import METAGPT_ROOT
|
|||
class ToolLogItem(BaseModel):
|
||||
type_: str = Field(alias="type", default="str", description="Data type of `value` field.")
|
||||
name: str
|
||||
value: str
|
||||
value: Any
|
||||
|
||||
|
||||
TOOL_LOG_END_MARKER = ToolLogItem(
|
||||
|
|
@ -66,4 +67,6 @@ def set_tool_output_logfunc(func):
|
|||
_llm_stream_log = partial(print, end="")
|
||||
|
||||
|
||||
_tool_output_log = lambda output, tool_name: print(output)
|
||||
_tool_output_log = (
|
||||
lambda *args, **kwargs: None
|
||||
) # a dummy function to avoid errors if set_tool_output_logfunc is not called
|
||||
|
|
|
|||
|
|
@ -434,13 +434,6 @@ class Plan(BaseModel):
|
|||
final_tasks = self.tasks[:prefix_length] + new_tasks[prefix_length:]
|
||||
self.tasks = final_tasks
|
||||
|
||||
log_tool_output(
|
||||
ToolLogItem(
|
||||
name="output", value="\n\n".join([f"Task {task.task_id}: {task.instruction}" for task in self.tasks])
|
||||
),
|
||||
tool_name="Plan",
|
||||
)
|
||||
|
||||
# Update current_task_id to the first unfinished task in the merged list
|
||||
self._update_current_task()
|
||||
|
||||
|
|
@ -484,6 +477,8 @@ class Plan(BaseModel):
|
|||
if new_task.task_id in task.dependent_task_ids:
|
||||
self.reset_task(task.task_id)
|
||||
|
||||
self._update_current_task()
|
||||
|
||||
def append_task(self, new_task: Task):
|
||||
"""
|
||||
Append a new task to the end of existing task sequences
|
||||
|
|
@ -514,7 +509,15 @@ class Plan(BaseModel):
|
|||
if not task.is_finished:
|
||||
current_task_id = task.task_id
|
||||
break
|
||||
self.current_task_id = current_task_id # all tasks finished
|
||||
self.current_task_id = current_task_id
|
||||
|
||||
log_tool_output(
|
||||
[
|
||||
ToolLogItem(type="object", name="tasks", value=self.tasks),
|
||||
ToolLogItem(type="object", name="current_task_id", value=self.current_task_id),
|
||||
],
|
||||
tool_name="Plan",
|
||||
)
|
||||
|
||||
@property
|
||||
def current_task(self) -> Task:
|
||||
|
|
|
|||
|
|
@ -1,32 +1,39 @@
|
|||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from metagpt.logs import ToolLogItem, log_tool_output
|
||||
from metagpt.tools.tool_registry import register_tool
|
||||
|
||||
|
||||
class FileBlock(BaseModel):
|
||||
"""A block of content in a file"""
|
||||
|
||||
file_path: str
|
||||
block_content: str
|
||||
block_start_line: int
|
||||
block_end_line: int
|
||||
symbol: str = ""
|
||||
symbol_line: int = -1
|
||||
symbol: str = Field(default="", description="The symbol of interest in the block, empty if not applicable.")
|
||||
symbol_line: int = Field(default=-1, description="The line number of the symbol in the file, -1 if not applicable")
|
||||
|
||||
|
||||
@register_tool()
|
||||
class FileManager:
|
||||
"""A tool for handling file io, read or write into files"""
|
||||
"""A tool for reading, understanding, writing, and editing files"""
|
||||
|
||||
def write(self, path: str, content: str):
|
||||
"""Write the whole content to a file."""
|
||||
with open(path, "w") as f:
|
||||
f.write(content)
|
||||
log_tool_output(ToolLogItem(name="write_file_path", value=path), tool_name="FileManager")
|
||||
|
||||
def read(self, path: str) -> str:
|
||||
"""Read the whole content of a file."""
|
||||
with open(path, "r") as f:
|
||||
return f.read()
|
||||
log_tool_output(ToolLogItem(name="read_file_path", value=path), tool_name="FileManager")
|
||||
|
||||
def search_content(self, symbol: str, root_path: str = "", window: int = 20) -> FileBlock:
|
||||
"""
|
||||
|
|
@ -36,7 +43,7 @@ class FileManager:
|
|||
Args:
|
||||
symbol (str): The symbol to search.
|
||||
root_path (str, optional): The root path to search in. If not provided, search in the current directory. Defaults to "".
|
||||
window (int, optional): The window size to return.
|
||||
window (int, optional): The window size to return. Defaults to 20.
|
||||
|
||||
Returns:
|
||||
FileBlock: The block containing the symbol, a pydantic BaseModel with the schema below.
|
||||
|
|
@ -45,8 +52,8 @@ class FileManager:
|
|||
block_content: str
|
||||
block_start_line: int
|
||||
block_end_line: int
|
||||
symbol: str = ""
|
||||
symbol_line: int = -1
|
||||
symbol: str = Field(default="", description="The symbol of interest in the block, empty if not applicable.")
|
||||
symbol_line: int = Field(default=-1, description="The line number of the symbol in the file, -1 if not applicable")
|
||||
"""
|
||||
for root, _, files in os.walk(root_path or "."):
|
||||
for file in files:
|
||||
|
|
@ -56,14 +63,14 @@ class FileManager:
|
|||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
try:
|
||||
lines = f.readlines()
|
||||
except UnicodeDecodeError:
|
||||
except Exception:
|
||||
continue
|
||||
for i, line in enumerate(lines):
|
||||
if symbol in line:
|
||||
start = max(i - window, 0)
|
||||
end = min(i + window, len(lines) - 1)
|
||||
block_content = "".join(lines[start : end + 1])
|
||||
return FileBlock(
|
||||
result = FileBlock(
|
||||
file_path=file_path,
|
||||
block_content=block_content,
|
||||
block_start_line=start + 1,
|
||||
|
|
@ -71,21 +78,64 @@ class FileManager:
|
|||
symbol=symbol,
|
||||
symbol_line=i + 1,
|
||||
)
|
||||
log_tool_output(
|
||||
ToolLogItem(type="object", name="file_block_searched", value=result),
|
||||
tool_name="FileManager",
|
||||
)
|
||||
return result
|
||||
return None
|
||||
|
||||
def write_content(self, file_path: str, start_line: int, end_line: int, new_block_content: str = ""):
|
||||
def write_content(self, file_path: str, start_line: int, end_line: int, new_block_content: str = "") -> str:
|
||||
"""
|
||||
Write a new block of content into a file. Use this method to update a block of code in a file. There are several cases:
|
||||
Write a new block of content into a file. Use this method to update a block of code in a file. There are three cases:
|
||||
1. If the new block content is empty, the original block will be deleted.
|
||||
2. If the new block content is not empty and end_line >= start_line, the original block from start_line to end_line (both inclusively) will be replaced by the new block content.
|
||||
3. If the new block content is not empty and end_line < start_line (e.g. set end_line = -1) the new block content will be inserted at start_line.
|
||||
2. If the new block content is not empty and end_line < start_line (e.g. set end_line = -1) the new block content will be inserted at start_line.
|
||||
3. If the new block content is not empty and end_line >= start_line, the original block from start_line to end_line (both inclusively) will be replaced by the new block content.
|
||||
This function can sometimes be used given a FileBlock upstream. Think carefully if you want to use block_start_line or symbol_line in the FileBlock as your start_line input.
|
||||
|
||||
Args:
|
||||
file_path (str): The file path to write the new block content.
|
||||
start_line (int): start line of the original block to be updated.
|
||||
end_line (int): end line of the original block to be updated.
|
||||
new_block_content (str): The new block content to write.
|
||||
|
||||
Returns:
|
||||
str: A message indicating the status of the write operation.
|
||||
"""
|
||||
# Create a temporary copy of the file
|
||||
temp_file_path = file_path + ".temp"
|
||||
shutil.copy(file_path, temp_file_path)
|
||||
|
||||
try:
|
||||
# Modify the temporary file with the new content
|
||||
self._write_content(temp_file_path, start_line, end_line, new_block_content)
|
||||
|
||||
# Lint the modified temporary file
|
||||
lint_passed, lint_message = self._lint_file(temp_file_path)
|
||||
if not lint_passed:
|
||||
return f"Linting the content at a temp file, failed with:\n{lint_message}"
|
||||
|
||||
# If linting passes, overwrite the original file with the temporary file
|
||||
shutil.move(temp_file_path, file_path)
|
||||
|
||||
new_file_block = FileBlock(
|
||||
file_path=file_path,
|
||||
block_content=new_block_content,
|
||||
block_start_line=start_line,
|
||||
block_end_line=-1 if end_line < start_line else start_line + new_block_content.count("\n"),
|
||||
)
|
||||
log_tool_output(
|
||||
ToolLogItem(type="object", name="file_block_written", value=new_file_block), tool_name="FileManager"
|
||||
)
|
||||
|
||||
return f"Content written successfully to {file_path}"
|
||||
|
||||
finally:
|
||||
# Clean up: Ensure the temporary file is removed if it still exists
|
||||
if os.path.exists(temp_file_path):
|
||||
os.remove(temp_file_path)
|
||||
|
||||
def _write_content(self, file_path: str, start_line: int, end_line: int, new_block_content: str = ""):
|
||||
with open(file_path, "r") as file:
|
||||
lines = file.readlines()
|
||||
|
||||
|
|
@ -106,3 +156,16 @@ class FileManager:
|
|||
|
||||
with open(file_path, "w") as file:
|
||||
file.writelines(lines)
|
||||
|
||||
@classmethod
|
||||
def _lint_file(cls, file_path: str) -> (bool, str):
|
||||
"""Lints an entire Python file using pylint, returns True if linting passes, along with pylint's output."""
|
||||
result = subprocess.run(
|
||||
["pylint", file_path, "--disable=all", "--enable=E"],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
)
|
||||
lint_passed = result.returncode == 0
|
||||
lint_message = result.stdout
|
||||
return lint_passed, lint_message
|
||||
|
|
|
|||
|
|
@ -70,3 +70,4 @@ qianfan==0.3.2
|
|||
dashscope==1.14.1
|
||||
rank-bm25==0.2.2 # for tool recommendation
|
||||
gymnasium==0.29.1
|
||||
pylint~=3.0.3
|
||||
|
|
@ -43,8 +43,8 @@ def test_search_content(test_file):
|
|||
EXPECTED_CONTENT_AFTER_REPLACE = """
|
||||
# this is line one
|
||||
def test_function_for_fm():
|
||||
This is the new line A replacing lines 3 to 5.
|
||||
This is the new line B.
|
||||
# This is the new line A replacing lines 3 to 5.
|
||||
# This is the new line B.
|
||||
c = 3
|
||||
# this is the 7th line
|
||||
""".strip()
|
||||
|
|
@ -55,11 +55,10 @@ def test_replace_content(test_file):
|
|||
file_path=str(TEST_FILE_PATH),
|
||||
start_line=3,
|
||||
end_line=5,
|
||||
new_block_content=" This is the new line A replacing lines 3 to 5.\n This is the new line B.",
|
||||
new_block_content=" # This is the new line A replacing lines 3 to 5.\n # This is the new line B.",
|
||||
)
|
||||
with open(TEST_FILE_PATH, "r") as f:
|
||||
new_content = f.read()
|
||||
print(new_content)
|
||||
assert new_content == EXPECTED_CONTENT_AFTER_REPLACE
|
||||
|
||||
|
||||
|
|
@ -81,7 +80,7 @@ def test_delete_content(test_file):
|
|||
EXPECTED_CONTENT_AFTER_INSERT = """
|
||||
# this is line one
|
||||
def test_function_for_fm():
|
||||
This is the new line to be inserted, at line 3
|
||||
# This is the new line to be inserted, at line 3
|
||||
"some docstring"
|
||||
a = 1
|
||||
b = 2
|
||||
|
|
@ -95,8 +94,28 @@ def test_insert_content(test_file):
|
|||
file_path=str(TEST_FILE_PATH),
|
||||
start_line=3,
|
||||
end_line=-1,
|
||||
new_block_content=" This is the new line to be inserted, at line 3",
|
||||
new_block_content=" # This is the new line to be inserted, at line 3",
|
||||
)
|
||||
with open(TEST_FILE_PATH, "r") as f:
|
||||
new_content = f.read()
|
||||
assert new_content == EXPECTED_CONTENT_AFTER_INSERT
|
||||
|
||||
|
||||
def test_new_content_wrong_indentation(test_file):
|
||||
msg = FileManager().write_content(
|
||||
file_path=str(TEST_FILE_PATH),
|
||||
start_line=3,
|
||||
end_line=-1,
|
||||
new_block_content=" This is the new line to be inserted, at line 3", # omit # should throw a syntax error
|
||||
)
|
||||
assert "failed" in msg
|
||||
|
||||
|
||||
def test_new_content_format_issue(test_file):
|
||||
msg = FileManager().write_content(
|
||||
file_path=str(TEST_FILE_PATH),
|
||||
start_line=3,
|
||||
end_line=-1,
|
||||
new_block_content=" # This is the new line to be inserted, at line 3 ", # trailing spaces are format issue only, and should not throw an error
|
||||
)
|
||||
assert "failed" not in msg
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue