update: editor

This commit is contained in:
seeker-jie 2024-08-26 18:19:13 +08:00
parent 8e7696b8e6
commit e191ea31c4
10 changed files with 283 additions and 255 deletions

View file

@ -29,7 +29,7 @@ from metagpt.logs import logger
from metagpt.schema import CodingContext, Document, RunCodeResult
from metagpt.utils.common import CodeParser, get_markdown_code_block_type
from metagpt.utils.project_repo import ProjectRepo
from metagpt.utils.report import EditorReporter
from metagpt.utils.report import FileIOOperatorReporter
PROMPT_TEMPLATE = """
NOTICE
@ -152,7 +152,7 @@ class WriteCode(Action):
summary_log=summary_doc.content if summary_doc else "",
)
logger.info(f"Writing {coding_context.filename}..")
async with EditorReporter(enable_llm_stream=True) as reporter:
async with FileIOOperatorReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "code", "filename": coding_context.filename}, "meta")
code = await self.write_code(prompt)
if not coding_context.code_doc:

View file

@ -22,7 +22,7 @@ from metagpt.schema import CodingContext, Document
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import CodeParser, aread, awrite
from metagpt.utils.project_repo import ProjectRepo
from metagpt.utils.report import EditorReporter
from metagpt.utils.report import FileIOOperatorReporter
PROMPT_TEMPLATE = """
# System
@ -144,7 +144,7 @@ class WriteCodeReview(Action):
return result, None
# if LBTM, rewrite code
async with EditorReporter(enable_llm_stream=True) as reporter:
async with FileIOOperatorReporter(enable_llm_stream=True) as reporter:
await reporter.async_report(
{"type": "code", "filename": filename, "src_path": doc.root_relative_path}, "meta"
)

View file

@ -13,7 +13,7 @@ from metagpt.ext.cr.utils.cleaner import (
rm_patch_useless_part,
)
from metagpt.utils.common import CodeParser
from metagpt.utils.report import EditorReporter
from metagpt.utils.report import FileIOOperatorReporter
SYSTEM_MSGS_PROMPT = """
You're an adaptive software developer who excels at refining code based on user inputs. You're proficient in creating Git patches to represent code modifications.
@ -100,7 +100,7 @@ class ModifyCode(Action):
)
patch_file = output_dir / f"{patch_target_file_name}.patch"
patch_file.parent.mkdir(exist_ok=True, parents=True)
async with EditorReporter(enable_llm_stream=True) as reporter:
async with FileIOOperatorReporter(enable_llm_stream=True) as reporter:
await reporter.async_report(
{"type": "Patch", "src_path": str(patch_file), "filename": patch_file.name}, "meta"
)

View file

@ -33,7 +33,7 @@ from metagpt.utils.common import (
parse_recipient,
)
from metagpt.utils.project_repo import ProjectRepo
from metagpt.utils.report import EditorReporter
from metagpt.utils.report import FileIOOperatorReporter
class QaEngineer(Role):
@ -80,7 +80,7 @@ class QaEngineer(Role):
context = TestingContext(filename=test_doc.filename, test_doc=test_doc, code_doc=code_doc)
context = await WriteTest(i_context=context, context=self.context, llm=self.llm).run()
async with EditorReporter(enable_llm_stream=True) as reporter:
async with FileIOOperatorReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "test", "filename": test_doc.filename}, "meta")
doc = await self.repo.tests.save_doc(

View file

@ -12,7 +12,7 @@ from metagpt.ext.cr.actions.modify_code import ModifyCode
from metagpt.ext.cr.utils.schema import Point
from metagpt.tools.libs.browser import Browser
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.report import EditorReporter
from metagpt.utils.report import FileIOOperatorReporter
@register_tool(tags=["codereview"], include_functions=["review", "fix"])
@ -38,7 +38,7 @@ class CodeReview:
cr_point_content = await f.read()
cr_points = [Point(**i) for i in json.loads(cr_point_content)]
async with EditorReporter(enable_llm_stream=True) as reporter:
async with FileIOOperatorReporter(enable_llm_stream=True) as reporter:
src_path = cr_output_file
cr_output_path = Path(cr_output_file)
await reporter.async_report(
@ -87,7 +87,7 @@ class CodeReview:
else:
async with aiofiles.open(patch_path, encoding="utf-8") as f:
patch_file_content = await f.read()
await EditorReporter().async_report(patch_path)
await FileIOOperatorReporter().async_report(patch_path)
patch: PatchSet = PatchSet(patch_file_content)
return patch

View file

@ -1,33 +1,22 @@
import base64
import os
import re
import shutil
import tempfile
from pathlib import Path
from typing import List, Optional, Union
from typing import Optional
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel
from metagpt.config2 import Config
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.logs import logger
from metagpt.tools.libs.linter import Linter
from metagpt.tools.tool_registry import register_tool
from metagpt.utils import read_docx
from metagpt.utils.common import aread, aread_bin, awrite_bin, check_http_endpoint
from metagpt.utils.report import EditorReporter
# This is also used in unit tests!
MSG_FILE_UPDATED = "[File updated (edited at line {line_number}). Please review the changes and make sure they are correct (correct indentation, no duplicate lines, etc). Edit the file again if necessary.]"
LINTER_ERROR_MSG = "[Your proposed edit has introduced new syntax error(s). Please understand the errors and retry your edit command.]\n"
class FileBlock(BaseModel):
"""A block of content in a file"""
file_path: str
block_content: str
class LineNumberError(Exception):
pass
@ -35,134 +24,17 @@ class LineNumberError(Exception):
@register_tool()
class Editor(BaseModel):
"""
A state-of-state tool for reading, understanding, writing, and editing files.
All path parameters should use an absolute path.
A state-of-state tool for open, reading, and editing files.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
resource: EditorReporter = EditorReporter()
# CURRENT_FILE: Optional[str] = None
current_file: Optional[str] = None
current_file: Optional[Path] = None
current_line: int = 1
# WINDOW: int = 100
window: int = 100
# def write(self, path: str, content: str):
# """Write the whole content to a file. When used, make sure content arg contains the full content of the file."""
# if "\n" not in content and "\\n" in content:
# # A very raw rule to correct the content: If 'content' lacks actual newlines ('\n') but includes '\\n', consider
# # replacing them with '\n' to potentially correct mistaken representations of newline characters.
# content = content.replace("\\n", "\n")
# directory = os.path.dirname(path)
# if directory and not os.path.exists(directory):
# os.makedirs(directory)
# with open(path, "w", encoding="utf-8") as f:
# f.write(content)
# # self.resource.report(path, "path")
# return f"The writing/coding the of the file {os.path.basename(path)}' is now completed. The file '{os.path.basename(path)}' has been successfully created."
# async def read(self, path: str) -> FileBlock:
# """Read the whole content of a file. Using absolute paths as the argument for specifying the file location."""
# is_text, mime_type = await is_text_file(path)
# if is_text:
# lines = await self._read_text(path)
# elif mime_type == "application/pdf":
# lines = await self._read_pdf(path)
# elif mime_type in {
# "application/msword",
# "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
# "application/vnd.ms-word.document.macroEnabled.12",
# "application/vnd.openxmlformats-officedocument.wordprocessingml.template",
# "application/vnd.ms-word.template.macroEnabled.12",
# }:
# lines = await self._read_docx(path)
# else:
# return FileBlock(file_path=str(path), block_content="")
# self.resource.report(str(path), "path")
#
# lines_with_num = [f"{i + 1:03}|{line}" for i, line in enumerate(lines)]
# result = FileBlock(
# file_path=str(path),
# block_content="".join(lines_with_num),
# )
# return result
working_dir: Path = DEFAULT_WORKSPACE_ROOT
@staticmethod
async def _read_text(path: Union[str, Path]) -> List[str]:
content = await aread(path)
lines = content.split("\n")
return lines
@staticmethod
async def _read_pdf(path: Union[str, Path]) -> List[str]:
result = await Editor._omniparse_read_file(path)
if result:
return result
from llama_index.readers.file import PDFReader
reader = PDFReader()
lines = reader.load_data(file=Path(path))
return [i.text for i in lines]
@staticmethod
async def _read_docx(path: Union[str, Path]) -> List[str]:
result = await Editor._omniparse_read_file(path)
if result:
return result
return read_docx(str(path))
@staticmethod
async def _omniparse_read_file(path: Union[str, Path]) -> Optional[List[str]]:
from metagpt.tools.libs import get_env_default
from metagpt.utils.omniparse_client import OmniParseClient
base_url = await get_env_default(key="base_url", app_name="OmniParse", default_value="")
if not base_url:
base_url = await Editor._read_omniparse_config()
if not base_url:
return None
api_key = await get_env_default(key="api_key", app_name="OmniParse", default_value="")
v = await get_env_default(key="timeout", app_name="OmniParse", default_value="120")
try:
timeout = int(v) or 120
except ValueError:
timeout = 120
try:
if not await check_http_endpoint(url=base_url):
logger.warning(f"{base_url}: NOT AVAILABLE")
return None
client = OmniParseClient(api_key=api_key, base_url=base_url, max_timeout=timeout)
file_data = await aread_bin(filename=path)
ret = await client.parse_document(file_input=file_data, bytes_filename=str(path))
except (ValueError, Exception) as e:
logger.exception(f"{path}: {e}")
return None
if not ret.images:
return [ret.text] if ret.text else None
result = [ret.text]
img_dir = Path(path).parent / (Path(path).name.replace(".", "_") + "_images")
img_dir.mkdir(parents=True, exist_ok=True)
for i in ret.images:
byte_data = base64.b64decode(i.image)
filename = img_dir / i.image_name
await awrite_bin(filename=filename, data=byte_data)
result.append(f"![{i.image_name}]({str(filename)})")
return result
@staticmethod
async def _read_omniparse_config() -> str:
config = Config.default()
if config.omniparse and config.omniparse.url:
return config.omniparse.url
return ""
@staticmethod
def _is_valid_filename(file_name) -> bool:
if not file_name or not isinstance(file_name, str) or not file_name.strip():
def _is_valid_filename(file_name: str) -> bool:
if not file_name or not file_name.strip():
return False
invalid_chars = '<>:"/\\|?*'
if os.name == "nt": # Windows
@ -176,28 +48,25 @@ class Editor(BaseModel):
return True
@staticmethod
def _is_valid_path(path) -> bool:
if not path or not isinstance(path, str):
return False
def _is_valid_path(path: Path) -> bool:
try:
return os.path.exists(os.path.normpath(path))
return path.exists()
except PermissionError:
return False
@staticmethod
def _create_paths(file_name) -> bool:
def _create_paths(file_path: Path) -> bool:
try:
dirname = os.path.dirname(file_name)
if dirname:
os.makedirs(dirname, exist_ok=True)
if file_path.parent:
file_path.parent.mkdir(parents=True, exist_ok=True)
return True
except PermissionError:
return False
def _check_current_file(self, file_path: Optional[str] = None) -> bool:
if not file_path:
def _check_current_file(self, file_path: Optional[Path] = None) -> bool:
if file_path is None:
file_path = self.current_file
if not file_path or not os.path.isfile(file_path):
if not file_path or not file_path.is_file():
raise ValueError("No file open. Use the open_file function first.")
return True
@ -206,7 +75,7 @@ class Editor(BaseModel):
return max(min_value, min(value, max_value))
@staticmethod
def _lint_file(file_path: str) -> tuple[Optional[str], Optional[int]]:
def _lint_file(file_path: Path) -> tuple[Optional[str], Optional[int]]:
"""Lint the file at the given path and return a tuple with a boolean indicating if there are errors,
and the line number of the first error, if any.
@ -214,15 +83,15 @@ class Editor(BaseModel):
tuple[str | None, int | None]: (lint_error, first_error_line_number)
"""
linter = Linter(root=os.getcwd())
lint_error = linter.lint(file_path)
lint_error = linter.lint(str(file_path))
if not lint_error:
# Linting successful. No issues found.
return None, None
return "ERRORS:\n" + lint_error.text, lint_error.lines[0]
def _print_window(self, file_path, targeted_line, window, return_str=False):
def _print_window(self, file_path: Path, targeted_line: int, window: int, return_str: bool = False):
self._check_current_file(file_path)
with open(file_path) as file:
with file_path.open() as file:
content = file.read()
# Ensure the content ends with a newline character
@ -267,13 +136,23 @@ class Editor(BaseModel):
if return_str:
return output
else:
print(output)
logger.info(output)
@staticmethod
def _cur_file_header(current_file, total_lines) -> str:
def _cur_file_header(current_file: Path, total_lines: int) -> str:
if not current_file:
return ""
return f"[File: {os.path.abspath(current_file)} ({total_lines} lines total)]\n"
return f"[File: {current_file.resolve()} ({total_lines} lines total)]\n"
def set_workdir(self, path: str) -> None:
"""
Sets the working directory to the given path. eg: repo directory.
You need to set it up before operating the file.
Args:
path: str: The path to set as the working directory.
"""
self.working_dir = Path(path)
def open_file(self, path: str, line_number: Optional[int] = 1, context_lines: Optional[int] = None) -> None:
"""Opens the file at the given path in the editor. If line_number is provided, the window will be moved to include that line.
@ -288,11 +167,13 @@ class Editor(BaseModel):
if context_lines is None:
context_lines = self.window
if not os.path.isfile(path):
path = self.working_dir / Path(path)
if not path.is_file():
raise FileNotFoundError(f"File {path} not found")
CURRENT_FILE = os.path.abspath(path)
with open(CURRENT_FILE) as file:
self.current_file = path
with path.open() as file:
total_lines = max(1, sum(1 for _ in file))
if not isinstance(line_number, int) or line_number < 1 or line_number > total_lines:
@ -303,11 +184,9 @@ class Editor(BaseModel):
if context_lines is None or context_lines < 1:
context_lines = self.window
output = self._cur_file_header(CURRENT_FILE, total_lines)
output += self._print_window(
CURRENT_FILE, self.current_line, self._clamp(context_lines, 1, 2000), return_str=True
)
print(output)
output = self._cur_file_header(path, total_lines)
output += self._print_window(path, self.current_line, self._clamp(context_lines, 1, 2000), return_str=True)
logger.info(output)
def goto_line(self, line_number: int) -> None:
"""Moves the window to show the specified line number.
@ -317,7 +196,7 @@ class Editor(BaseModel):
"""
self._check_current_file()
with open(str(self.current_file)) as file:
with self.current_file.open() as file:
total_lines = max(1, sum(1 for _ in file))
if not isinstance(line_number, int) or line_number < 1 or line_number > total_lines:
raise ValueError(f"Line number must be between 1 and {total_lines}")
@ -326,45 +205,46 @@ class Editor(BaseModel):
output = self._cur_file_header(self.current_file, total_lines)
output += self._print_window(self.current_file, self.current_line, self.window, return_str=True)
print(output)
logger.info(output)
def scroll_down(self) -> None:
"""Moves the window down by 100 lines."""
self._check_current_file()
with open(str(self.current_file)) as file:
with self.current_file.open() as file:
total_lines = max(1, sum(1 for _ in file))
self.current_line = self._clamp(self.current_line + self.window, 1, total_lines)
output = self._cur_file_header(self.current_file, total_lines)
output += self._print_window(self.current_file, self.current_line, self.window, return_str=True)
print(output)
logger.info(output)
def scroll_up(self) -> None:
"""Moves the window up by 100 lines."""
self._check_current_file()
with open(str(self.current_file)) as file:
with self.current_file.open() as file:
total_lines = max(1, sum(1 for _ in file))
self.current_line = self._clamp(self.current_line - self.window, 1, total_lines)
output = self._cur_file_header(self.current_file, total_lines)
output += self._print_window(self.current_file, self.current_line, self.window, return_str=True)
print(output)
logger.info(output)
@classmethod
def create_file(cls, filename: str) -> None:
def create_file(self, filename: str) -> None:
"""Creates and opens a new file with the given name.
Args:
filename: str: The name of the file to create.
"""
if os.path.exists(filename):
filename = self.working_dir / Path(filename)
if filename.exists():
raise FileExistsError(f"File '{filename}' already exists.")
with open(filename, "w") as file:
with filename.open("w") as file:
file.write("\n")
cls.open_file(filename)
print(f"[File {filename} created.]")
self.open_file(filename)
logger.info(f"[File {filename} created.]")
@staticmethod
def _append_impl(lines, content):
@ -475,7 +355,7 @@ class Editor(BaseModel):
def _edit_file_impl(
self,
file_name: str,
file_name: Path,
start: Optional[int] = None,
end: Optional[int] = None,
content: str = "",
@ -485,7 +365,7 @@ class Editor(BaseModel):
"""Internal method to handle common logic for edit_/append_file methods.
Args:
file_name: str: The name of the file to edit or append to.
file_name: Path: The name of the file to edit or append to.
start: int | None = None: The start line number for editing. Ignored if is_append is True.
end: int | None = None: The end line number for editing. Ignored if is_append is True.
content: str: The content to replace the lines with or to append.
@ -501,7 +381,7 @@ class Editor(BaseModel):
"DO NOT re-run the same failed edit command. Running it again will lead to the same error."
)
if not self._is_valid_filename(file_name):
if not self._is_valid_filename(file_name.name):
raise FileNotFoundError("Invalid file name.")
if not self._is_valid_path(file_name):
@ -510,7 +390,7 @@ class Editor(BaseModel):
if not self._create_paths(file_name):
raise PermissionError("Could not access or create directories.")
if not os.path.isfile(file_name):
if not file_name.is_file():
raise FileNotFoundError(f"File {file_name} not found.")
if is_insert and is_append:
@ -519,7 +399,7 @@ class Editor(BaseModel):
# Use a temporary file to write changes
content = str(content or "")
temp_file_path = ""
src_abs_path = os.path.abspath(file_name)
src_abs_path = file_name.resolve()
first_error_line = None
try:
@ -535,7 +415,7 @@ class Editor(BaseModel):
temp_file_path = temp_file.name
# Read the original file and check if empty and for a trailing newline
with open(file_name) as original_file:
with file_name.open() as original_file:
lines = original_file.readlines()
if is_append:
@ -567,11 +447,8 @@ class Editor(BaseModel):
# because the env var will be set AFTER the agentskills is imported
if enable_auto_lint:
# BACKUP the original file
original_file_backup_path = os.path.join(
os.path.dirname(file_name),
f".backup.{os.path.basename(file_name)}",
)
with open(original_file_backup_path, "w") as f:
original_file_backup_path = file_name.parent / f".backup.{file_name.name}"
with original_file_backup_path.open("w") as f:
f.writelines(lines)
lint_error, first_error_line = self._lint_file(file_name)
@ -642,9 +519,9 @@ class Editor(BaseModel):
)
# recover the original file
with open(original_file_backup_path) as fin, open(file_name, "w") as fout:
with original_file_backup_path.open() as fin, file_name.open("w") as fout:
fout.write(fin.read())
os.remove(original_file_backup_path)
original_file_backup_path.unlink()
return ret_str
except FileNotFoundError as e:
@ -655,13 +532,13 @@ class Editor(BaseModel):
ret_str += f"Invalid input: {e}\n"
except Exception as e:
# Clean up the temporary file if an error occurs
if temp_file_path and os.path.exists(temp_file_path):
os.remove(temp_file_path)
print(f"An unexpected error occurred: {e}")
if temp_file_path and Path(temp_file_path).exists():
Path(temp_file_path).unlink()
logger.warning(f"An unexpected error occurred: {e}")
raise e
# Update the file information and print the updated content
with open(file_name, "r", encoding="utf-8") as file:
with file_name.open("r", encoding="utf-8") as file:
n_total_lines = max(1, len(file.readlines()))
if first_error_line is not None and int(first_error_line) > 0:
self.current_line = first_error_line
@ -670,14 +547,13 @@ class Editor(BaseModel):
self.current_line = max(1, len(lines)) # end of original file
else:
self.current_line = start or n_total_lines or 1
ret_str += f"[File: {os.path.abspath(file_name)} ({n_total_lines} lines total after edit)]\n"
ret_str += f"[File: {file_name.resolve()} ({n_total_lines} lines total after edit)]\n"
CURRENT_FILE = file_name
ret_str += self._print_window(CURRENT_FILE, self.current_line, self.window, return_str=True) + "\n"
ret_str += MSG_FILE_UPDATED.format(line_number=self.current_line)
return ret_str
@classmethod
def edit_file_by_replace(cls, file_name: str, to_replace: str, new_content: str) -> None:
def edit_file_by_replace(self, file_name: str, to_replace: str, new_content: str) -> None:
"""Edit a file. This will search for `to_replace` in the given file and replace it with `new_content`.
Every *to_replace* must *EXACTLY MATCH* the existing source code, character for character, including all comments, docstrings, etc.
@ -733,7 +609,8 @@ class Editor(BaseModel):
# search for `to_replace` in the file
# if found, replace it with `new_content`
# if not found, perform a fuzzy search to find the closest match and replace it with `new_content`
with open(file_name, "r") as file:
file_name = self.working_dir / Path(file_name)
with file_name.open("r") as file:
file_content = file.read()
if file_content.count(to_replace) > 1:
@ -758,13 +635,13 @@ class Editor(BaseModel):
# find the closest match
start = file_content_fuzzy.find(to_replace_fuzzy)
if start == -1:
print(f"[No exact match found in {file_name} for\n```\n{to_replace}\n```\n]")
logger.info(f"[No exact match found in {file_name} for\n```\n{to_replace}\n```\n]")
return
# Convert start from index to line number for fuzzy match
start_line_number = file_content_fuzzy[:start].count("\n") + 1
end_line_number = start_line_number + len(to_replace.splitlines()) - 1
ret_str = cls._edit_file_impl(
ret_str = self._edit_file_impl(
file_name,
start=start_line_number,
end=end_line_number,
@ -773,10 +650,9 @@ class Editor(BaseModel):
)
# lint_error = bool(LINTER_ERROR_MSG in ret_str)
# TODO: automatically tries to fix linter error (maybe involve some static analysis tools on the location near the edit to figure out indentation)
print(ret_str)
logger.info(ret_str)
@classmethod
def insert_content_at_line(cls, file_name: str, line_number: int, content: str) -> None:
def insert_content_at_line(self, file_name: str, line_number: int, content: str) -> None:
"""Insert content at the given line number in a file.
This will NOT modify the content of the lines before OR after the given line number.
@ -799,7 +675,8 @@ class Editor(BaseModel):
line_number: int: The line number (starting from 1) to insert the content after.
content: str: The content to insert.
"""
ret_str = cls._edit_file_impl(
file_name = self.working_dir / Path(file_name)
ret_str = self._edit_file_impl(
file_name,
start=line_number,
end=line_number,
@ -807,19 +684,18 @@ class Editor(BaseModel):
is_insert=True,
is_append=False,
)
print(ret_str)
logger.info(ret_str)
@classmethod
def append_file(cls, file_name: str, content: str) -> None:
def append_file(self, file_name: str, content: str) -> None:
"""Append content to the given file.
It appends text `content` to the end of the specified file.
Args:
file_name: str: The name of the file to edit.
line_number: int: The line number (starting from 1) to insert the content after.
content: str: The content to insert.
"""
ret_str = cls._edit_file_impl(
file_name = self.working_dir / Path(file_name)
ret_str = self._edit_file_impl(
file_name,
start=None,
end=None,
@ -827,44 +703,46 @@ class Editor(BaseModel):
is_insert=False,
is_append=True,
)
print(ret_str)
logger.info(ret_str)
@classmethod
def search_dir(cls, search_term: str, dir_path: str = "./") -> None:
def search_dir(self, search_term: str, dir_path: str = "./") -> None:
"""Searches for search_term in all files in dir. If dir is not provided, searches in the current directory.
Args:
search_term: str: The term to search for.
dir_path: str: The path to the directory to search.
"""
if not os.path.isdir(dir_path):
dir_path = self.working_dir / Path(dir_path)
if not dir_path.is_dir():
raise FileNotFoundError(f"Directory {dir_path} not found")
matches = []
for root, _, files in os.walk(dir_path):
for file in files:
if file.startswith("."):
continue
file_path = os.path.join(root, file)
with open(file_path, "r", errors="ignore") as f:
file_path = Path(root) / file
with file_path.open("r", errors="ignore") as f:
for line_num, line in enumerate(f, 1):
if search_term in line:
matches.append((file_path, line_num, line.strip()))
if not matches:
print(f'No matches found for "{search_term}" in {dir_path}')
logger.info(f'No matches found for "{search_term}" in {dir_path}')
return
num_matches = len(matches)
num_files = len(set(match[0] for match in matches))
if num_files > 100:
print(f'More than {num_files} files matched for "{search_term}" in {dir_path}. Please narrow your search.')
logger.info(
f'More than {num_files} files matched for "{search_term}" in {dir_path}. Please narrow your search.'
)
return
print(f'[Found {num_matches} matches for "{search_term}" in {dir_path}]')
logger.info(f'[Found {num_matches} matches for "{search_term}" in {dir_path}]')
for file_path, line_num, line in matches:
print(f"{file_path} (Line {line_num}): {line}")
print(f'[End of matches for "{search_term}" in {dir_path}]')
logger.info(f"{file_path} (Line {line_num}): {line}")
logger.info(f'[End of matches for "{search_term}" in {dir_path}]')
def search_file(self, search_term: str, file_path: Optional[str] = None) -> None:
"""Searches for search_term in file. If file is not provided, searches in the current open file.
@ -875,46 +753,49 @@ class Editor(BaseModel):
"""
if file_path is None:
file_path = self.current_file
else:
file_path = self.working_dir / Path(file_path)
if file_path is None:
raise FileNotFoundError("No file specified or open. Use the open_file function first.")
if not os.path.isfile(file_path):
if not file_path.is_file():
raise FileNotFoundError(f"File {file_path} not found")
matches = []
with open(file_path) as file:
with file_path.open() as file:
for i, line in enumerate(file, 1):
if search_term in line:
matches.append((i, line.strip()))
if matches:
print(f'[Found {len(matches)} matches for "{search_term}" in {file_path}]')
logger.info(f'[Found {len(matches)} matches for "{search_term}" in {file_path}]')
for match in matches:
print(f"Line {match[0]}: {match[1]}")
print(f'[End of matches for "{search_term}" in {file_path}]')
logger.info(f"Line {match[0]}: {match[1]}")
logger.info(f'[End of matches for "{search_term}" in {file_path}]')
else:
print(f'[No matches found for "{search_term}" in {file_path}]')
logger.info(f'[No matches found for "{search_term}" in {file_path}]')
@staticmethod
def find_file(file_name: str, dir_path: str = "./") -> None:
def find_file(self, file_name: str, dir_path: str = "./") -> None:
"""Finds all files with the given name in the specified directory.
Args:
file_name: str: The name of the file to find.
dir_path: str: The path to the directory to search.
"""
if not os.path.isdir(dir_path):
file_name = self.working_dir / Path(file_name)
dir_path = self.working_dir / Path(dir_path)
if not dir_path.is_dir():
raise FileNotFoundError(f"Directory {dir_path} not found")
matches = []
for root, _, files in os.walk(dir_path):
for file in files:
if file_name in file:
matches.append(os.path.join(root, file))
matches.append(Path(root) / file)
if matches:
print(f'[Found {len(matches)} matches for "{file_name}" in {dir_path}]')
logger.info(f'[Found {len(matches)} matches for "{file_name}" in {dir_path}]')
for match in matches:
print(f"{match}")
print(f'[End of matches for "{file_name}" in {dir_path}]')
logger.info(f"{match}")
logger.info(f'[End of matches for "{file_name}" in {dir_path}]')
else:
print(f'[No matches found for "{file_name}" in {dir_path}]')
logger.info(f'[No matches found for "{file_name}" in {dir_path}]')

View file

@ -0,0 +1,147 @@
import base64
import os
from pathlib import Path
from typing import List, Optional, Union
from pydantic import BaseModel, ConfigDict
from metagpt.config2 import Config
from metagpt.logs import logger
from metagpt.tools.tool_registry import register_tool
from metagpt.utils import read_docx
from metagpt.utils.common import aread, aread_bin, awrite_bin, check_http_endpoint
from metagpt.utils.repo_to_markdown import is_text_file
from metagpt.utils.report import FileIOOperatorReporter
class FileBlock(BaseModel):
"""A block of content in a file"""
file_path: str
block_content: str
class LineNumberError(Exception):
pass
@register_tool()
class FileIOOperator(BaseModel):
"""
A state-of-state tool for reading, understanding, and writing files.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
resource: FileIOOperatorReporter = FileIOOperatorReporter()
def write(self, path: str, content: str):
"""Write the whole content to a file. When used, make sure content arg contains the full content of the file."""
if "\n" not in content and "\\n" in content:
# A very raw rule to correct the content: If 'content' lacks actual newlines ('\n') but includes '\\n', consider
# replacing them with '\n' to potentially correct mistaken representations of newline characters.
content = content.replace("\\n", "\n")
directory = os.path.dirname(path)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
# self.resource.report(path, "path")
return f"The writing/coding the of the file {os.path.basename(path)}' is now completed. The file '{os.path.basename(path)}' has been successfully created."
async def read(self, path: str) -> FileBlock:
"""Read the whole content of a file. Using absolute paths as the argument for specifying the file location."""
is_text, mime_type = await is_text_file(path)
if is_text:
lines = await self._read_text(path)
elif mime_type == "application/pdf":
lines = await self._read_pdf(path)
elif mime_type in {
"application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.ms-word.document.macroEnabled.12",
"application/vnd.openxmlformats-officedocument.wordprocessingml.template",
"application/vnd.ms-word.template.macroEnabled.12",
}:
lines = await self._read_docx(path)
else:
return FileBlock(file_path=str(path), block_content="")
self.resource.report(str(path), "path")
lines_with_num = [f"{i + 1:03}|{line}" for i, line in enumerate(lines)]
result = FileBlock(
file_path=str(path),
block_content="".join(lines_with_num),
)
return result
@staticmethod
async def _read_text(path: Union[str, Path]) -> List[str]:
content = await aread(path)
lines = content.split("\n")
return lines
@staticmethod
async def _read_pdf(path: Union[str, Path]) -> List[str]:
result = await FileIOOperator._omniparse_read_file(path)
if result:
return result
from llama_index.readers.file import PDFReader
reader = PDFReader()
lines = reader.load_data(file=Path(path))
return [i.text for i in lines]
@staticmethod
async def _read_docx(path: Union[str, Path]) -> List[str]:
result = await FileIOOperator._omniparse_read_file(path)
if result:
return result
return read_docx(str(path))
@staticmethod
async def _omniparse_read_file(path: Union[str, Path]) -> Optional[List[str]]:
from metagpt.tools.libs import get_env_default
from metagpt.utils.omniparse_client import OmniParseClient
base_url = await get_env_default(key="base_url", app_name="OmniParse", default_value="")
if not base_url:
base_url = await FileIOOperator._read_omniparse_config()
if not base_url:
return None
api_key = await get_env_default(key="api_key", app_name="OmniParse", default_value="")
v = await get_env_default(key="timeout", app_name="OmniParse", default_value="120")
try:
timeout = int(v) or 120
except ValueError:
timeout = 120
try:
if not await check_http_endpoint(url=base_url):
logger.warning(f"{base_url}: NOT AVAILABLE")
return None
client = OmniParseClient(api_key=api_key, base_url=base_url, max_timeout=timeout)
file_data = await aread_bin(filename=path)
ret = await client.parse_document(file_input=file_data, bytes_filename=str(path))
except (ValueError, Exception) as e:
logger.exception(f"{path}: {e}")
return None
if not ret.images:
return [ret.text] if ret.text else None
result = [ret.text]
img_dir = Path(path).parent / (Path(path).name.replace(".", "_") + "_images")
img_dir.mkdir(parents=True, exist_ok=True)
for i in ret.images:
byte_data = base64.b64decode(i.image)
filename = img_dir / i.image_name
await awrite_bin(filename=filename, data=byte_data)
result.append(f"![{i.image_name}]({str(filename)})")
return result
@staticmethod
async def _read_omniparse_config() -> str:
config = Config.default()
if config.omniparse and config.omniparse.url:
return config.omniparse.url
return ""

View file

@ -30,12 +30,12 @@ class Linter:
)
self.all_lint_cmd = None
# def set_linter(self, lang, cmd):
# if lang:
# self.languages[lang] = cmd
# return
#
# self.all_lint_cmd = cmd
def set_linter(self, lang, cmd):
if lang:
self.languages[lang] = cmd
return
self.all_lint_cmd = cmd
def get_rel_fname(self, fname):
if self.root:

View file

@ -35,7 +35,7 @@ class BlockType(str, Enum):
TASK = "Task"
BROWSER = "Browser"
BROWSER_RT = "Browser-RT"
EDITOR = "Editor"
FILE_IO_OPERATOR = "FileIOOperator"
GALLERY = "Gallery"
NOTEBOOK = "Notebook"
DOCS = "Docs"
@ -305,10 +305,10 @@ class DocsReporter(FileReporter):
block: Literal[BlockType.DOCS] = BlockType.DOCS
class EditorReporter(FileReporter):
"""Equivalent to FileReporter(block=BlockType.Editor)."""
class FileIOOperatorReporter(FileReporter):
"""Equivalent to FileReporter(block=BlockType.FileIOOperator)."""
block: Literal[BlockType.EDITOR] = BlockType.EDITOR
block: Literal[BlockType.FILE_IO_OPERATOR] = BlockType.FILE_IO_OPERATOR
class GalleryReporter(FileReporter):

View file

@ -10,7 +10,7 @@ from metagpt.utils.report import (
BlockType,
BrowserReporter,
DocsReporter,
EditorReporter,
FileIOOperatorReporter,
NotebookReporter,
ServerReporter,
TaskReporter,
@ -148,8 +148,8 @@ async def test_notebook_reporter(http_server):
"#!/usr/bin/env python\n# -*- coding: utf-8 -*-\n\nprint('Hello World')\n",
"/data/main.py",
{"type": "write_code"},
BlockType.EDITOR,
EditorReporter,
BlockType.FILE_IO_OPERATOR,
FileIOOperatorReporter,
),
),
ids=["test_docs_reporter", "test_editor_reporter"],