mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-05-15 11:02:36 +02:00
Merge remote-tracking branch 'origin/mgx_ops' into feat_zhg
# Conflicts: # metagpt/roles/di/engineer2.py # metagpt/tools/libs/cr.py
This commit is contained in:
commit
9f4dcf0ad0
30 changed files with 2288 additions and 261 deletions
|
|
@ -1,20 +1,32 @@
|
|||
"""
|
||||
This file is borrowed from OpenDevin
|
||||
You can find the original repository here:
|
||||
https://github.com/All-Hands-AI/OpenHands/blob/main/openhands/runtime/plugins/agent_skills/file_ops/file_ops.py
|
||||
"""
|
||||
import base64
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Tuple, Union
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from metagpt.config2 import Config
|
||||
from metagpt.const import DEFAULT_WORKSPACE_ROOT
|
||||
from metagpt.logs import logger
|
||||
from metagpt.tools.libs.linter import Linter
|
||||
from metagpt.tools.tool_registry import register_tool
|
||||
from metagpt.utils import read_docx
|
||||
from metagpt.utils.common import aread, aread_bin, awrite_bin, check_http_endpoint
|
||||
from metagpt.utils.repo_to_markdown import is_text_file
|
||||
from metagpt.utils.report import EditorReporter
|
||||
|
||||
# This is also used in unit tests!
|
||||
MSG_FILE_UPDATED = "[File updated (edited at line {line_number}). Please review the changes and make sure they are correct (correct indentation, no duplicate lines, etc). Edit the file again if necessary.]"
|
||||
LINTER_ERROR_MSG = "[Your proposed edit has introduced new syntax error(s). Please understand the errors and retry your edit command.]\n"
|
||||
|
||||
|
||||
class FileBlock(BaseModel):
|
||||
"""A block of content in a file"""
|
||||
|
|
@ -23,6 +35,10 @@ class FileBlock(BaseModel):
|
|||
block_content: str
|
||||
|
||||
|
||||
class LineNumberError(Exception):
    """Raised when a requested line number or line range is not valid for the file being edited."""
|
||||
|
||||
|
||||
@register_tool()
|
||||
class Editor(BaseModel):
|
||||
"""
|
||||
|
|
@ -31,8 +47,12 @@ class Editor(BaseModel):
|
|||
"""
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
resource: EditorReporter = EditorReporter()
|
||||
current_file: Optional[Path] = None
|
||||
current_line: int = 1
|
||||
window: int = 100
|
||||
enable_auto_lint: bool = False
|
||||
working_dir: Path = DEFAULT_WORKSPACE_ROOT
|
||||
|
||||
def write(self, path: str, content: str):
|
||||
"""Write the whole content to a file. When used, make sure content arg contains the full content of the file."""
|
||||
|
|
@ -74,153 +94,6 @@ class Editor(BaseModel):
|
|||
)
|
||||
return result
|
||||
|
||||
def search_content(self, symbol: str, root_path: str = ".", window: int = 50) -> Optional[FileBlock]:
    """
    Search symbol in all files under root_path, return the context of symbol with window size
    Useful for locating class or function in a large codebase. Example symbol can be "def some_function", "class SomeClass", etc.
    In searching, attempt different symbols of different granularities, e.g. "def some_function", "class SomeClass", a certain line of code, etc.

    Args:
        symbol (str): The symbol to search.
        root_path (str, optional): The root path to search in, the path can be a folder or a file. If not provided, search in the current directory. Defaults to ".".
        window (int, optional): The number of context lines to return before and after the matching line. Defaults to 50.

    Returns:
        Optional[FileBlock]: The block containing the first occurrence of the symbol, or None when root_path does not exist or the symbol is not found. FileBlock is a pydantic BaseModel with the schema below.
        class FileBlock(BaseModel):
            file_path: str
            block_content: str
    """
    if not os.path.exists(root_path):
        print(f"Currently at {os.getcwd()} containing: {os.listdir()}. Path {root_path} does not exist.")
        return None
    not_found_msg = (
        "symbol not found, you may try searching another one, or break down your search term to search a part of it"
    )
    # A single file is searched directly instead of being walked.
    if os.path.isfile(root_path):
        result = self._search_content_in_file(symbol, root_path, window)
        if not result:
            print(not_found_msg)
        return result
    for root, _, files in os.walk(root_path or "."):
        for file in files:
            file_path = os.path.join(root, file)
            result = self._search_content_in_file(symbol, file_path, window)
            if result:
                # FIXME: This returns the first found result, not all results.
                return result
    print(not_found_msg)
    return None
|
||||
|
||||
def _search_content_in_file(self, symbol: str, file_path: str, window: int = 50) -> Optional[FileBlock]:
    """Return the context around the first line of a .py file that contains symbol.

    Args:
        symbol (str): Substring to look for in each line.
        file_path (str): File to inspect. Paths that do not end with ".py" are skipped.
        window (int): Number of lines to include before and after the matching line.

    Returns:
        Optional[FileBlock]: The matching block, each row prefixed with its 1-based, zero-padded
        row number and "|"; None when the file is skipped, cannot be read, or has no match.
    """
    print("search in", file_path)
    if not file_path.endswith(".py"):
        return None
    # Opening is inside the try as well: a dangling symlink or an unreadable file must only
    # skip this file, not abort the whole directory search.
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
    except (OSError, UnicodeError):
        return None
    for i, line in enumerate(lines):
        if symbol in line:
            start = max(i - window, 0)
            end = min(i + window, len(lines) - 1)
            for row_num in range(start, end + 1):
                lines[row_num] = f"{(row_num + 1):03}|{lines[row_num]}"
            block_content = "".join(lines[start : end + 1])
            result = FileBlock(
                file_path=file_path,
                block_content=block_content,
            )
            # NOTE(review): "line" is the 0-based index while the displayed rows are 1-based — confirm intent.
            self.resource.report(result.file_path, "path", extra={"type": "search", "line": i, "symbol": symbol})
            return result
    return None
|
||||
|
||||
def write_content(self, file_path: str, start_line: int, end_line: int, new_block_content: str = "") -> str:
    """
    Write a new block of content into a file. Use this method to update a block of code in a file. There are three cases:
    1. If the new block content is empty, the original block will be deleted.
    2. If the new block content is not empty and end_line < start_line (e.g. set end_line = -1) the new block content will be inserted at start_line.
    3. If the new block content is not empty and end_line >= start_line, the original block from start_line to end_line (both inclusively) will be replaced by the new block content.
    This function can sometimes be used given a FileBlock upstream. You should carefully review its row number. Determine the start_line and end_line based on the row number of the FileBlock.
    The file content from start_line to end_line will be replaced by your new_block_content. DON'T replace more than you intend to.

    Args:
        file_path (str): The file path to write the new block content.
        start_line (int): start line of the original block to be updated (inclusive).
        end_line (int): end line of the original block to be updated (inclusive).
        new_block_content (str): The new block content to write. Don't include row number in the content.

    Returns:
        str: A message indicating the status of the write operation.
    """
    # All changes are made on a sibling scratch copy; the real file is only replaced at the end.
    scratch_path = file_path + ".temp"
    shutil.copy(file_path, scratch_path)

    try:
        self._write_content(scratch_path, start_line, end_line, new_block_content)

        # The scratch copy is linted, but the outcome is not acted upon here.
        _lint_ok, _lint_output = self._lint_file(scratch_path)

        # Promote the scratch copy over the original file.
        shutil.move(scratch_path, file_path)

        written_block = FileBlock(file_path=file_path, block_content=new_block_content)
        self.resource.report(written_block.file_path, "path")

        return f"Content written successfully to {file_path}"
    finally:
        # If anything failed before the move, do not leave the scratch copy behind.
        if os.path.exists(scratch_path):
            os.remove(scratch_path)
|
||||
|
||||
def _write_content(self, file_path: str, start_line: int, end_line: int, new_block_content: str = ""):
|
||||
"""start_line and end_line are both 1-based indices and inclusive."""
|
||||
with open(file_path, "r") as file:
|
||||
lines = file.readlines()
|
||||
|
||||
start_line_index = start_line - 1 # Adjusting because list indices start at 0
|
||||
end_line_index = end_line
|
||||
|
||||
if new_block_content:
|
||||
# Split the new_block_content by newline and ensure each line ends with a newline character
|
||||
new_content_lines = new_block_content.splitlines(
|
||||
keepends=True
|
||||
) # FIXME: This will split \n within a line, such as ab\ncd
|
||||
if end_line >= start_line:
|
||||
# This replaces the block between start_line and end_line with new_block_content
|
||||
# irrespective of the length difference between the original and new content.
|
||||
lines[start_line_index:end_line_index] = new_content_lines
|
||||
else:
|
||||
lines.insert(start_line_index, "".join(new_content_lines))
|
||||
else:
|
||||
del lines[start_line_index:end_line_index]
|
||||
|
||||
with open(file_path, "w") as file:
|
||||
file.writelines(lines)
|
||||
|
||||
@classmethod
def _lint_file(cls, file_path: str) -> Tuple[bool, str]:
    """Run pylint on a file with only error-level checks enabled.

    Args:
        file_path (str): Path of the file to lint.

    Returns:
        Tuple[bool, str]: True when pylint exits with code 0, together with pylint's combined
        stdout/stderr output.
    """
    result = subprocess.run(
        ["pylint", file_path, "--disable=all", "--enable=E"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # fold stderr into stdout so a single message is returned
        text=True,
    )
    lint_passed = result.returncode == 0
    lint_message = result.stdout
    return lint_passed, lint_message
|
||||
|
||||
@staticmethod
|
||||
async def _read_text(path: Union[str, Path]) -> List[str]:
|
||||
content = await aread(path)
|
||||
|
|
@ -294,3 +167,787 @@ class Editor(BaseModel):
|
|||
if config.omniparse and config.omniparse.url:
|
||||
return config.omniparse.url, config.omniparse.timeout
|
||||
return "", 0
|
||||
|
||||
@staticmethod
|
||||
def _is_valid_filename(file_name: str) -> bool:
|
||||
if not file_name or not file_name.strip():
|
||||
return False
|
||||
invalid_chars = '<>:"/\\|?*'
|
||||
if os.name == "nt": # Windows
|
||||
invalid_chars = '<>:"/\\|?*'
|
||||
elif os.name == "posix": # Unix-like systems
|
||||
invalid_chars = "\0"
|
||||
|
||||
for char in invalid_chars:
|
||||
if char in file_name:
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _is_valid_path(path: Path) -> bool:
|
||||
try:
|
||||
return path.exists()
|
||||
except PermissionError:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _create_paths(file_path: Path) -> bool:
|
||||
try:
|
||||
if file_path.parent:
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
return True
|
||||
except PermissionError:
|
||||
return False
|
||||
|
||||
def _check_current_file(self, file_path: Optional[Path] = None) -> bool:
|
||||
if file_path is None:
|
||||
file_path = self.current_file
|
||||
if not file_path or not file_path.is_file():
|
||||
raise ValueError("No file open. Use the open_file function first.")
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _clamp(value, min_value, max_value):
|
||||
return max(min_value, min(value, max_value))
|
||||
|
||||
def _lint_file(self, file_path: Path) -> tuple[Optional[str], Optional[int]]:
    """Lint the file at the given path with a Linter rooted at the working directory.

    Returns:
        tuple[str | None, int | None]: (lint_error, first_error_line_number). Both are None
        when the linter reports nothing; otherwise lint_error is the linter text prefixed
        with an "ERRORS:" header line and the number is the first entry of the linter's lines.
    """
    report = Linter(root=self.working_dir).lint(str(file_path))
    if report:
        return "ERRORS:\n" + report.text, report.lines[0]
    # Linting successful. No issues found.
    return None, None
|
||||
|
||||
def _print_window(self, file_path: Path, targeted_line: int, window: int):
|
||||
self._check_current_file(file_path)
|
||||
with file_path.open() as file:
|
||||
content = file.read()
|
||||
|
||||
# Ensure the content ends with a newline character
|
||||
if not content.endswith("\n"):
|
||||
content += "\n"
|
||||
|
||||
lines = content.splitlines(True) # Keep all line ending characters
|
||||
total_lines = len(lines)
|
||||
|
||||
# cover edge cases
|
||||
self.current_line = self._clamp(targeted_line, 1, total_lines)
|
||||
half_window = max(1, window // 2)
|
||||
|
||||
# Ensure at least one line above and below the targeted line
|
||||
start = max(1, self.current_line - half_window)
|
||||
end = min(total_lines, self.current_line + half_window)
|
||||
|
||||
# Adjust start and end to ensure at least one line above and below
|
||||
if start == 1:
|
||||
end = min(total_lines, start + window - 1)
|
||||
if end == total_lines:
|
||||
start = max(1, end - window + 1)
|
||||
|
||||
output = ""
|
||||
|
||||
# only display this when there's at least one line above
|
||||
if start > 1:
|
||||
output += f"({start - 1} more lines above)\n"
|
||||
else:
|
||||
output += "(this is the beginning of the file)\n"
|
||||
for i in range(start, end + 1):
|
||||
_new_line = f"{i}|{lines[i - 1]}"
|
||||
if not _new_line.endswith("\n"):
|
||||
_new_line += "\n"
|
||||
output += _new_line
|
||||
if end < total_lines:
|
||||
output += f"({total_lines - end} more lines below)\n"
|
||||
else:
|
||||
output += "(this is the end of the file)\n"
|
||||
output = output.rstrip()
|
||||
|
||||
return output
|
||||
|
||||
@staticmethod
|
||||
def _cur_file_header(current_file: Path, total_lines: int) -> str:
|
||||
if not current_file:
|
||||
return ""
|
||||
return f"[File: {current_file.resolve()} ({total_lines} lines total)]\n"
|
||||
|
||||
def set_workdir(self, path: str) -> None:
|
||||
"""
|
||||
Sets the working directory to the given path. eg: repo directory.
|
||||
You MUST to set it up before open the file.
|
||||
|
||||
Args:
|
||||
path: str: The path to set as the working directory.
|
||||
"""
|
||||
self.working_dir = Path(path)
|
||||
|
||||
def open_file(
    self, path: Union[Path, str], line_number: Optional[int] = 1, context_lines: Optional[int] = None
) -> str:
    """Opens the file at the given path in the editor. If line_number is provided, the window will be moved to include that line.
    It only shows the first 100 lines by default! Max `context_lines` supported is 2000, use `scroll up/down`
    to view the file if you want to see more.

    Args:
        path: str: The path to the file to open, preferred absolute path.
        line_number: int | None = 1: The line number to move to. Defaults to 1.
        context_lines: int | None = 100: Only shows this number of lines in the context window (usually from line 1), with line_number as the center (if possible). Defaults to 100.
    """
    path = self._try_fix_path(path)
    if not path.is_file():
        raise FileNotFoundError(f"File {path} not found")

    # The file becomes the current file before the line number is validated.
    self.current_file = path
    with path.open() as handle:
        total_lines = max(1, len(handle.readlines()))

    line_in_range = isinstance(line_number, int) and 1 <= line_number <= total_lines
    if not line_in_range:
        raise ValueError(f"Line number must be between 1 and {total_lines}")
    self.current_line = line_number

    # A missing or non-positive context size falls back to the editor's window size.
    if context_lines is None or context_lines < 1:
        context_lines = self.window

    header = self._cur_file_header(path, total_lines)
    body = self._print_window(path, self.current_line, self._clamp(context_lines, 1, 2000))
    return header + body
|
||||
|
||||
def goto_line(self, line_number: int) -> str:
|
||||
"""Moves the window to show the specified line number.
|
||||
|
||||
Args:
|
||||
line_number: int: The line number to move to.
|
||||
"""
|
||||
self._check_current_file()
|
||||
|
||||
with self.current_file.open() as file:
|
||||
total_lines = max(1, sum(1 for _ in file))
|
||||
if not isinstance(line_number, int) or line_number < 1 or line_number > total_lines:
|
||||
raise ValueError(f"Line number must be between 1 and {total_lines}")
|
||||
|
||||
self.current_line = self._clamp(line_number, 1, total_lines)
|
||||
|
||||
output = self._cur_file_header(self.current_file, total_lines)
|
||||
output += self._print_window(self.current_file, self.current_line, self.window)
|
||||
return output
|
||||
|
||||
def scroll_down(self) -> str:
|
||||
"""Moves the window down by 100 lines."""
|
||||
self._check_current_file()
|
||||
|
||||
with self.current_file.open() as file:
|
||||
total_lines = max(1, sum(1 for _ in file))
|
||||
self.current_line = self._clamp(self.current_line + self.window, 1, total_lines)
|
||||
output = self._cur_file_header(self.current_file, total_lines)
|
||||
output += self._print_window(self.current_file, self.current_line, self.window)
|
||||
return output
|
||||
|
||||
def scroll_up(self) -> str:
|
||||
"""Moves the window up by 100 lines."""
|
||||
self._check_current_file()
|
||||
|
||||
with self.current_file.open() as file:
|
||||
total_lines = max(1, sum(1 for _ in file))
|
||||
self.current_line = self._clamp(self.current_line - self.window, 1, total_lines)
|
||||
output = self._cur_file_header(self.current_file, total_lines)
|
||||
output += self._print_window(self.current_file, self.current_line, self.window)
|
||||
return output
|
||||
|
||||
def create_file(self, filename: str) -> str:
    """Creates and opens a new file with the given name.

    The new file initially contains a single newline character.

    Args:
        filename: str: The name of the file to create.
    """
    target = self._try_fix_path(filename)
    if target.exists():
        raise FileExistsError(f"File '{target}' already exists.")

    with target.open("w") as handle:
        handle.write("\n")

    # Make the new file the editor's current file; the rendered window is not returned.
    self.open_file(target)
    return f"[File {target} created.]"
|
||||
|
||||
@staticmethod
|
||||
def _append_impl(lines, content):
|
||||
"""Internal method to handle appending to a file.
|
||||
|
||||
Args:
|
||||
lines: list[str]: The lines in the original file.
|
||||
content: str: The content to append to the file.
|
||||
|
||||
Returns:
|
||||
content: str: The new content of the file.
|
||||
n_added_lines: int: The number of lines added to the file.
|
||||
"""
|
||||
content_lines = content.splitlines(keepends=True)
|
||||
n_added_lines = len(content_lines)
|
||||
if lines and not (len(lines) == 1 and lines[0].strip() == ""):
|
||||
# file is not empty
|
||||
if not lines[-1].endswith("\n"):
|
||||
lines[-1] += "\n"
|
||||
new_lines = lines + content_lines
|
||||
content = "".join(new_lines)
|
||||
else:
|
||||
# file is empty
|
||||
content = "".join(content_lines)
|
||||
|
||||
return content, n_added_lines
|
||||
|
||||
@staticmethod
|
||||
def _insert_impl(lines, start, content):
|
||||
"""Internal method to handle inserting to a file.
|
||||
|
||||
Args:
|
||||
lines: list[str]: The lines in the original file.
|
||||
start: int: The start line number for inserting.
|
||||
content: str: The content to insert to the file.
|
||||
|
||||
Returns:
|
||||
content: str: The new content of the file.
|
||||
n_added_lines: int: The number of lines added to the file.
|
||||
|
||||
Raises:
|
||||
LineNumberError: If the start line number is invalid.
|
||||
"""
|
||||
inserted_lines = [content + "\n" if not content.endswith("\n") else content]
|
||||
if len(lines) == 0:
|
||||
new_lines = inserted_lines
|
||||
elif start is not None:
|
||||
if len(lines) == 1 and lines[0].strip() == "":
|
||||
# if the file with only 1 line and that line is empty
|
||||
lines = []
|
||||
|
||||
if len(lines) == 0:
|
||||
new_lines = inserted_lines
|
||||
else:
|
||||
new_lines = lines[: start - 1] + inserted_lines + lines[start - 1 :]
|
||||
else:
|
||||
raise LineNumberError(
|
||||
f"Invalid line number: {start}. Line numbers must be between 1 and {len(lines)} (inclusive)."
|
||||
)
|
||||
|
||||
content = "".join(new_lines)
|
||||
n_added_lines = len(inserted_lines)
|
||||
return content, n_added_lines
|
||||
|
||||
@staticmethod
|
||||
def _edit_impl(lines, start, end, content):
|
||||
"""Internal method to handle editing a file.
|
||||
|
||||
REQUIRES (should be checked by caller):
|
||||
start <= end
|
||||
start and end are between 1 and len(lines) (inclusive)
|
||||
content ends with a newline
|
||||
|
||||
Args:
|
||||
lines: list[str]: The lines in the original file.
|
||||
start: int: The start line number for editing.
|
||||
end: int: The end line number for editing.
|
||||
content: str: The content to replace the lines with.
|
||||
|
||||
Returns:
|
||||
content: str: The new content of the file.
|
||||
n_added_lines: int: The number of lines added to the file.
|
||||
"""
|
||||
# Handle cases where start or end are None
|
||||
if start is None:
|
||||
start = 1 # Default to the beginning
|
||||
if end is None:
|
||||
end = len(lines) # Default to the end
|
||||
# Check arguments
|
||||
if not (1 <= start <= len(lines)):
|
||||
raise LineNumberError(
|
||||
f"Invalid start line number: {start}. Line numbers must be between 1 and {len(lines)} (inclusive)."
|
||||
)
|
||||
if not (1 <= end <= len(lines)):
|
||||
raise LineNumberError(
|
||||
f"Invalid end line number: {end}. Line numbers must be between 1 and {len(lines)} (inclusive)."
|
||||
)
|
||||
if start > end:
|
||||
raise LineNumberError(f"Invalid line range: {start}-{end}. Start must be less than or equal to end.")
|
||||
|
||||
# Split content into lines and ensure it ends with a newline
|
||||
if not content.endswith("\n"):
|
||||
content += "\n"
|
||||
content_lines = content.splitlines(True)
|
||||
|
||||
# Calculate the number of lines to be added
|
||||
n_added_lines = len(content_lines)
|
||||
|
||||
# Remove the specified range of lines and insert the new content
|
||||
new_lines = lines[: start - 1] + content_lines + lines[end:]
|
||||
|
||||
# Handle the case where the original lines are empty
|
||||
if len(lines) == 0:
|
||||
new_lines = content_lines
|
||||
|
||||
# Join the lines to create the new content
|
||||
content = "".join(new_lines)
|
||||
return content, n_added_lines
|
||||
|
||||
def _edit_file_impl(
    self,
    file_name: Path,
    start: Optional[int] = None,
    end: Optional[int] = None,
    content: str = "",
    is_insert: bool = False,
    is_append: bool = False,
) -> str:
    """Internal method to handle common logic for edit_/append_file methods.

    The new file content is computed in memory, written to a temporary file and moved over
    the original. When auto-lint is enabled and the edit introduces new lint errors, the
    original content is restored and a report describing the failure is returned.

    Args:
        file_name: Path: The name of the file to edit or append to.
        start: int | None = None: The start line number for editing. Ignored if is_append is True.
        end: int | None = None: The end line number for editing. Ignored if is_append is True.
        content: str: The content to replace the lines with or to append.
        is_insert: bool = False: Whether to insert content at the given line number instead of editing.
        is_append: bool = False: Whether to append content to the file instead of editing.

    Returns:
        str: A message describing the result, including a window of the file around the edit.

    Raises:
        FileNotFoundError: If the file name or path is invalid, or the file does not exist.
        PermissionError: If the parent directories cannot be accessed or created.
        ValueError: If both is_insert and is_append are set.
    """
    ret_str = ""

    ERROR_MSG = f"[Error editing file {file_name}. Please confirm the file is correct.]"
    ERROR_MSG_SUFFIX = (
        "Your changes have NOT been applied. Please fix your edit command and try again.\n"
        "You either need to 1) Open the correct file and try again or 2) Specify the correct line number arguments.\n"
        "DO NOT re-run the same failed edit command. Running it again will lead to the same error."
    )

    if not self._is_valid_filename(file_name.name):
        raise FileNotFoundError("Invalid file name.")

    if not self._is_valid_path(file_name):
        raise FileNotFoundError("Invalid path or file name.")

    if not self._create_paths(file_name):
        raise PermissionError("Could not access or create directories.")

    if not file_name.is_file():
        raise FileNotFoundError(f"File {file_name} not found.")

    if is_insert and is_append:
        raise ValueError("Cannot insert and append at the same time.")

    # Use a temporary file to write changes
    content = str(content or "")
    temp_file_path = ""
    src_abs_path = file_name.resolve()
    first_error_line = None

    try:
        # lint the original file so that pre-existing errors can be discounted later
        if self.enable_auto_lint:
            original_lint_error, _ = self._lint_file(file_name)

        # Read the original file
        with file_name.open() as original_file:
            lines = original_file.readlines()

        # Compute the new content BEFORE creating the temporary file, so that an invalid
        # line number returns without leaving a stray temporary file behind.
        try:
            if is_append:
                content, n_added_lines = self._append_impl(lines, content)
            elif is_insert:
                content, n_added_lines = self._insert_impl(lines, start, content)
            else:
                content, n_added_lines = self._edit_impl(lines, start, end, content)
        except LineNumberError as e:
            ret_str += (f"{ERROR_MSG}\n" f"{e}\n" f"{ERROR_MSG_SUFFIX}") + "\n"
            return ret_str

        if not content.endswith("\n"):
            content += "\n"

        # Write the new content to a temporary file
        with tempfile.NamedTemporaryFile("w", delete=False) as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(content)

        # Replace the original file with the temporary file
        shutil.move(temp_file_path, src_abs_path)

        # Handle linting
        if self.enable_auto_lint:
            # BACKUP the original file
            original_file_backup_path = file_name.parent / f".backup.{file_name.name}"
            with original_file_backup_path.open("w") as f:
                f.writelines(lines)

            lint_error, first_error_line = self._lint_file(file_name)

            # Select the errors caused by the modification
            def extract_last_part(line):
                parts = line.split(":")
                if len(parts) > 1:
                    return parts[-1].strip()
                return line.strip()

            def subtract_strings(str1, str2) -> str:
                lines1 = str1.splitlines()
                lines2 = str2.splitlines()

                last_parts1 = [extract_last_part(line) for line in lines1]

                remaining_lines = [line for line in lines2 if extract_last_part(line) not in last_parts1]

                result = "\n".join(remaining_lines)
                return result

            if original_lint_error and lint_error:
                lint_error = subtract_strings(original_lint_error, lint_error)
                if lint_error == "":
                    lint_error = None
                    first_error_line = None

            if lint_error is not None:
                if first_error_line is not None:
                    show_line = int(first_error_line)
                elif is_append:
                    # original end-of-file
                    show_line = len(lines)
                # insert OR edit WILL provide meaningful line numbers
                elif start is not None and end is not None:
                    show_line = int((start + end) / 2)
                else:
                    raise ValueError("Invalid state. This should never happen.")

                ret_str += LINTER_ERROR_MSG
                ret_str += lint_error + "\n"

                editor_lines = n_added_lines + 20

                ret_str += "[This is how your edit would have looked if applied]\n"
                ret_str += "-------------------------------------------------\n"
                # _print_window always returns its text and takes no `return_str` argument.
                ret_str += self._print_window(file_name, show_line, editor_lines) + "\n"
                ret_str += "-------------------------------------------------\n\n"

                ret_str += "[This is the original code before your edit]\n"
                ret_str += "-------------------------------------------------\n"
                ret_str += (
                    self._print_window(
                        original_file_backup_path,
                        show_line,
                        editor_lines,
                    )
                    + "\n"
                )
                ret_str += "-------------------------------------------------\n"

                ret_str += (
                    "Your changes have NOT been applied. Please fix your edit command and try again.\n"
                    "You either need to 1) Specify the correct start/end line arguments or 2) Correct your edit code.\n"
                    "DO NOT re-run the same failed edit command. Running it again will lead to the same error."
                )

                # recover the original file
                with original_file_backup_path.open() as fin, file_name.open("w") as fout:
                    fout.write(fin.read())
                original_file_backup_path.unlink()
                return ret_str

            # The edit passed linting: the backup is no longer needed.
            original_file_backup_path.unlink()

    except FileNotFoundError as e:
        ret_str += f"File not found: {e}\n"
    except IOError as e:
        ret_str += f"An error occurred while handling the file: {e}\n"
    except ValueError as e:
        ret_str += f"Invalid input: {e}\n"
    except Exception as e:
        logger.warning(f"An unexpected error occurred: {e}")
        raise
    finally:
        # Clean up the temporary file if it was created but never moved into place.
        if temp_file_path and Path(temp_file_path).exists():
            Path(temp_file_path).unlink()

    # Update the file information and print the updated content
    with file_name.open("r", encoding="utf-8") as file:
        n_total_lines = max(1, len(file.readlines()))
    if first_error_line is not None and int(first_error_line) > 0:
        self.current_line = first_error_line
    else:
        if is_append:
            self.current_line = max(1, len(lines))  # end of original file
        else:
            self.current_line = start or n_total_lines or 1
    ret_str += f"[File: {file_name.resolve()} ({n_total_lines} lines total after edit)]\n"
    CURRENT_FILE = file_name
    ret_str += self._print_window(CURRENT_FILE, self.current_line, self.window) + "\n"
    ret_str += MSG_FILE_UPDATED.format(line_number=self.current_line)
    return ret_str
|
||||
|
||||
def edit_file_by_replace(self, file_name: str, to_replace: str, new_content: str) -> str:
    """Edit a file. This will search for `to_replace` in the given file and replace it with `new_content`.

    Every *to_replace* must *EXACTLY MATCH* the existing source code, character for character, including all comments, docstrings, etc.

    Include enough lines to make code in `to_replace` unique. `to_replace` should NOT be empty.

    For example, given a file "/workspace/example.txt" with the following content:
    ```
    line 1
    line 2
    line 2
    line 3
    ```

    EDITING: If you want to replace the second occurrence of "line 2", you can make `to_replace` unique:

    edit_file_by_replace(
        '/workspace/example.txt',
        to_replace='line 2\nline 3',
        new_content='new line\nline 3',
    )

    This will replace only the second "line 2" with "new line". The first "line 2" will remain unchanged.

    The resulting file will be:
    ```
    line 1
    line 2
    new line
    line 3
    ```

    REMOVAL: If you want to remove "line 2" and "line 3", you can set `new_content` to an empty string:

    edit_file_by_replace(
        '/workspace/example.txt',
        to_replace='line 2\nline 3',
        new_content='',
    )

    Args:
        file_name: str: The name of the file to edit.
        to_replace: str: The content to search for and replace.
        new_content: str: The new content to replace the old content with.
    """
    # FIXME: support replacing *all* occurrences
    if not to_replace.strip():
        raise ValueError("`to_replace` must not be empty.")

    if to_replace == new_content:
        raise ValueError("`to_replace` and `new_content` must be different.")

    file_name = self._try_fix_path(file_name)
    with file_name.open("r") as handle:
        file_content = handle.read()

    if file_content.count(to_replace) > 1:
        raise ValueError(
            "`to_replace` appears more than once, please include enough lines to make code in `to_replace` unique."
        )

    # The edit is applied to whole lines: locate the first line of the match, then
    # cover as many lines as `to_replace` spans.
    span_in_lines = len(to_replace.splitlines())

    exact_pos = file_content.find(to_replace)
    if exact_pos != -1:
        start_line_number = file_content[:exact_pos].count("\n") + 1
    else:
        # No exact match: compare again with all whitespace except newlines removed,
        # so line positions stay comparable between both texts.
        def _fuzzy_transform(s: str) -> str:
            return re.sub(r"[^\S\n]+", "", s)

        to_replace_fuzzy = _fuzzy_transform(to_replace)
        file_content_fuzzy = _fuzzy_transform(file_content)
        fuzzy_pos = file_content_fuzzy.find(to_replace_fuzzy)
        if fuzzy_pos == -1:
            return f"[No exact match found in {file_name} for\n```\n{to_replace}\n```\n]"
        start_line_number = file_content_fuzzy[:fuzzy_pos].count("\n") + 1

    end_line_number = start_line_number + span_in_lines - 1

    # TODO: automatically tries to fix linter error (maybe involve some static analysis tools on the location near the edit to figure out indentation)
    return self._edit_file_impl(
        file_name,
        start=start_line_number,
        end=end_line_number,
        content=new_content,
        is_insert=False,
    )
|
||||
|
||||
def insert_content_at_line(self, file_name: str, line_number: int, content: str) -> str:
    """Insert content at the given line number in a file.

    The lines before and after the insertion point are left unmodified.

    For example, if the file has the following content:
    ```
    line 1
    line 2
    line 3
    ```
    and you call `insert_content_at_line('file.txt', 2, 'new line')`, the file will be updated to:
    ```
    line 1
    new line
    line 2
    line 3
    ```

    Args:
        file_name: str: The name of the file to edit.
        line_number: int: The line number (starting from 1) at which the content is inserted, as in the example above.
        content: str: The content to insert.
    """
    # Relative names are resolved by the path helper before editing.
    target = self._try_fix_path(file_name)
    # start == end: the edit touches a single position and is flagged as an insertion.
    return self._edit_file_impl(
        target,
        start=line_number,
        end=line_number,
        content=content,
        is_insert=True,
        is_append=False,
    )
|
||||
|
||||
def append_file(self, file_name: str, content: str) -> str:
    """Append content to the given file.

    The text `content` is added at the end of the specified file.

    Args:
        file_name: str: The name of the file to edit.
        content: str: The content to insert.
    """
    target = self._try_fix_path(file_name)
    # No line range is given; the append flag tells the edit helper what to do.
    return self._edit_file_impl(
        target,
        start=None,
        end=None,
        content=content,
        is_insert=False,
        is_append=True,
    )
|
||||
|
||||
def search_dir(self, search_term: str, dir_path: str = "./") -> str:
    """Searches for search_term in all files in dir. If dir is not provided, searches in the current directory.

    Files whose name starts with "." are skipped. Undecodable bytes are ignored while reading.

    Args:
        search_term: str: The term to search for.
        dir_path: str: The path to the directory to search.
    """
    dir_path = self._try_fix_path(dir_path)
    if not dir_path.is_dir():
        raise FileNotFoundError(f"Directory {dir_path} not found")

    hits = []
    for folder, _subdirs, names in os.walk(dir_path):
        for name in names:
            if name.startswith("."):
                continue
            candidate = Path(folder) / name
            with candidate.open("r", errors="ignore") as handle:
                hits.extend(
                    (candidate, number, text.strip())
                    for number, text in enumerate(handle, 1)
                    if search_term in text
                )

    if not hits:
        return f'No matches found for "{search_term}" in {dir_path}'

    num_matches = len(hits)
    num_files = len({hit[0] for hit in hits})
    # Too many distinct files: ask for a narrower query instead of dumping everything.
    if num_files > 100:
        return f'More than {num_files} files matched for "{search_term}" in {dir_path}. Please narrow your search.'

    body = [f"{path} (Line {number}): {text}" for path, number, text in hits]
    header = f'[Found {num_matches} matches for "{search_term}" in {dir_path}]'
    footer = f'[End of matches for "{search_term}" in {dir_path}]'
    return "\n".join([header, *body, footer])
|
||||
|
||||
def search_file(self, search_term: str, file_path: Optional[str] = None) -> str:
    """Searches for search_term in file. If file is not provided, searches in the current open file.

    Args:
        search_term: str: The term to search for.
        file_path: str | None: The path to the file to search.

    Returns:
        A bracketed report listing every matching line with its 1-based line number,
        or a bracketed "no matches" message.

    Raises:
        FileNotFoundError: If no file is given and none is open, or the path is not a file.
    """
    if file_path is None:
        file_path = self.current_file
    else:
        file_path = self._try_fix_path(file_path)
    if file_path is None:
        raise FileNotFoundError("No file specified or open. Use the open_file function first.")
    if not file_path.is_file():
        raise FileNotFoundError(f"File {file_path} not found")

    # Ignore undecodable bytes (same policy as search_dir) so that a file with a
    # few invalid bytes does not abort the whole search with UnicodeDecodeError.
    with file_path.open("r", errors="ignore") as file:
        matches = [(i, line.strip()) for i, line in enumerate(file, 1) if search_term in line]

    if not matches:
        return f'[No matches found for "{search_term}" in {file_path}]'

    res_list = [f'[Found {len(matches)} matches for "{search_term}" in {file_path}]']
    res_list.extend(f"Line {line_no}: {text}" for line_no, text in matches)
    res_list.append(f'[End of matches for "{search_term}" in {file_path}]')
    return "\n".join(res_list)
|
||||
|
||||
def find_file(self, file_name: str, dir_path: str = "./") -> str:
    """Finds all files with the given name in the specified directory.

    The directory is walked recursively and every file whose name contains
    `file_name` as a substring is reported.

    Args:
        file_name: str: The name of the file to find.
        dir_path: str: The path to the directory to search.

    Returns:
        A bracketed report listing the full path of every match, or a bracketed
        "no matches" message.

    Raises:
        FileNotFoundError: If `dir_path` is not a directory.
    """
    # Only the directory is resolved. The file name must stay exactly as given:
    # turning it into an absolute path would make the substring test against
    # bare file names below impossible to satisfy.
    needle = str(file_name)
    dir_path = self._try_fix_path(dir_path)
    if not dir_path.is_dir():
        raise FileNotFoundError(f"Directory {dir_path} not found")

    matches = [Path(root) / file for root, _, files in os.walk(dir_path) for file in files if needle in file]

    if not matches:
        return f'[No matches found for "{needle}" in {dir_path}]'

    res_list = [f'[Found {len(matches)} matches for "{needle}" in {dir_path}]']
    res_list.extend(f"{match}" for match in matches)
    res_list.append(f'[End of matches for "{needle}" in {dir_path}]')
    return "\n".join(res_list)
|
||||
|
||||
def _try_fix_path(self, path: Union[Path, str]) -> Path:
    """Return `path` as a `Path`, anchoring relative paths at `self.working_dir`."""
    candidate = path if isinstance(path, Path) else Path(path)
    if candidate.is_absolute():
        return candidate
    return self.working_dir / candidate
|
||||
|
|
|
|||
264
metagpt/tools/libs/index_repo.py
Normal file
264
metagpt/tools/libs/index_repo.py
Normal file
|
|
@ -0,0 +1,264 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Set, Union
|
||||
|
||||
import tiktoken
|
||||
from llama_index.core.base.embeddings.base import BaseEmbedding
|
||||
from llama_index.core.schema import NodeWithScore
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
|
||||
from metagpt.config2 import Config
|
||||
from metagpt.logs import logger
|
||||
from metagpt.rag.engines import SimpleEngine
|
||||
from metagpt.rag.factories.embedding import RAGEmbeddingFactory
|
||||
from metagpt.rag.schema import FAISSIndexConfig, FAISSRetrieverConfig, LLMRankerConfig
|
||||
from metagpt.utils.common import aread, awrite, generate_fingerprint, list_files
|
||||
from metagpt.utils.repo_to_markdown import is_text_file
|
||||
|
||||
|
||||
class TextScore(BaseModel):
    """Raw text of a file, returned as a search result alongside index nodes."""

    filename: str  # path of the file the text was read from
    text: str  # text content carried by this result
    score: Optional[float] = None  # optional relevance score; defaults to None when not provided
|
||||
|
||||
|
||||
class IndexRepo(BaseModel):
    """A persisted FAISS index over the text files found under a root path.

    Files whose token count lies inside [min_token_count, max_token_count] are indexed;
    other text files are returned verbatim by `search`. A fingerprint per indexed file
    is kept in a JSON file next to the index so that changed files can be detected.
    """

    persist_path: str  # The persist path of the index repo, {DEFAULT_WORKSPACE_ROOT}/.index/{chat_id or 'uploads'}/
    root_path: str  # `/data/uploads` or r`/data/chats/\d+`, the root path of files indexed by the index repo.
    fingerprint_filename: str = "fingerprint.json"  # file name, inside persist_path, of the fingerprint map
    model: Optional[str] = None  # embedding model name; overrides the configured one in `merge` when set
    min_token_count: int = 10000  # files with fewer tokens are not indexed
    max_token_count: int = 100000000  # files with more tokens are not indexed
    recall_count: int = 5  # maximum number of results returned by `merge`
    embedding: Optional[BaseEmbedding] = Field(default=None, exclude=True)  # lazily created in `merge`
    fingerprints: Dict[str, str] = Field(default_factory=dict)  # str(path) -> fingerprint of the indexed content

    @model_validator(mode="after")
    def _update_fingerprints(self) -> "IndexRepo":
        """Load fingerprints from the fingerprint file if not already loaded.

        Returns:
            IndexRepo: The updated IndexRepo instance.
        """
        if not self.fingerprints:
            filename = Path(self.persist_path) / self.fingerprint_filename
            if not filename.exists():
                return self
            with open(str(filename), "r", encoding="utf-8") as reader:
                self.fingerprints = json.load(reader)
        return self

    async def search(
        self, query: str, filenames: Optional[List[Path]] = None
    ) -> Optional[List[Union[NodeWithScore, TextScore]]]:
        """Search for documents related to the given query.

        Files outside the buildable token range are returned whole as `TextScore`.
        Buildable files whose fingerprint matches the stored one are searched through
        the index; buildable files that changed since indexing are skipped and logged.

        Args:
            query (str): The search query.
            filenames (Optional[List[Path]]): A list of filenames to filter the search.

        Returns:
            Optional[List[Union[NodeWithScore, TextScore]]]: A list of search results containing NodeWithScore or TextScore.
        """
        encoding = tiktoken.get_encoding("cl100k_base")
        result: List[Union[NodeWithScore, TextScore]] = []
        filenames, _ = await self._filter(filenames)
        filter_filenames = set()
        for i in filenames:
            content = await aread(filename=i)
            token_count = len(encoding.encode(content))
            if not self._is_buildable(token_count):
                result.append(TextScore(filename=str(i), text=content))
                continue
            file_fingerprint = generate_fingerprint(content)
            if self.fingerprints.get(str(i)) != file_fingerprint:
                logger.error(f'file: "{i}" changed but not indexed')
                continue
            filter_filenames.add(str(i))
        if not filter_filenames:
            # Every retrieved node would be filtered out anyway, so skip loading
            # the index and running the retrieval.
            return result
        nodes = await self._search(query=query, filters=filter_filenames)
        return result + nodes

    async def merge(
        self, query: str, indices_list: List[List[Union[NodeWithScore, TextScore]]]
    ) -> List[Union[NodeWithScore, TextScore]]:
        """Merge results from multiple indices based on the query.

        Every result text is embedded and compared with the query embedding; the
        `recall_count` most similar results are returned.

        Args:
            query (str): The search query.
            indices_list (List[List[Union[NodeWithScore, TextScore]]]): A list of result lists from different indices.

        Returns:
            List[Union[NodeWithScore, TextScore]]: A list of merged results sorted by similarity.
        """
        if not self.embedding:
            config = Config.default()
            if self.model:
                config.embedding.model = self.model
            factory = RAGEmbeddingFactory(config)
            self.embedding = factory.get_rag_embedding()

        scores = []
        query_embedding = await self.embedding.aget_text_embedding(query)
        flat_nodes = [node for indices in indices_list for node in indices]
        for i in flat_nodes:
            text_embedding = await self.embedding.aget_text_embedding(i.text)
            similarity = self.embedding.similarity(query_embedding, text_embedding)
            scores.append((similarity, i))
        scores.sort(key=lambda x: x[0], reverse=True)
        return [i[1] for i in scores][: self.recall_count]

    async def add(self, paths: List[Path]):
        """Add new documents to the index.

        Only files whose fingerprint changed are considered. Changed files inside the
        buildable token range are (re)indexed; changed files outside it are scheduled
        for removal from the index.

        Args:
            paths (List[Path]): A list of paths to the documents to be added.
        """
        encoding = tiktoken.get_encoding("cl100k_base")
        filenames, _ = await self._filter(paths)
        filter_filenames = []
        delete_filenames = []
        for i in filenames:
            content = await aread(filename=i)
            if not self._is_fingerprint_changed(filename=i, content=content):
                continue
            token_count = len(encoding.encode(content))
            if self._is_buildable(token_count):
                filter_filenames.append(i)
                logger.debug(f"{i} is_buildable: {token_count}, {self.min_token_count}~{self.max_token_count}")
            else:
                delete_filenames.append(i)
                logger.debug(f"{i} not is_buildable: {token_count}, {self.min_token_count}~{self.max_token_count}")
        await self._add_batch(filenames=filter_filenames, delete_filenames=delete_filenames)

    async def _add_batch(self, filenames: List[Union[str, Path]], delete_filenames: List[Union[str, Path]]):
        """Add and remove documents in a batch operation.

        If the loaded engine cannot delete documents (NotImplementedError), the whole
        index is rebuilt from the requested files plus every previously fingerprinted
        file, excluding the files that were meant to be deleted.

        Args:
            filenames (List[Union[str, Path]]): List of filenames to add.
            delete_filenames (List[Union[str, Path]]): List of filenames to delete.
        """
        if not filenames:
            return
        logger.info(f"update index repo, add {filenames}, remove {delete_filenames}")
        removed = {str(i) for i in delete_filenames}
        engine = None
        if Path(self.persist_path).exists():
            logger.debug(f"load index from {self.persist_path}")
            engine = SimpleEngine.from_index(
                index_config=FAISSIndexConfig(persist_path=self.persist_path),
                retriever_configs=[FAISSRetrieverConfig()],
            )
            try:
                engine.delete_docs(filenames + delete_filenames)
                logger.debug(f"delete docs {filenames + delete_filenames}")
                engine.add_docs(input_files=filenames)
                logger.debug(f"add docs {filenames}")
            except NotImplementedError as e:
                logger.debug(f"{e}")
                # Rebuild from everything known, but do not resurrect the files that
                # were supposed to be removed from the index.
                filenames = list({str(i) for i in filenames} | (set(self.fingerprints) - removed))
                engine = None
                logger.info(f"{e}. Rebuild all.")
        if not engine:
            engine = SimpleEngine.from_docs(
                input_files=[str(i) for i in filenames],
                retriever_configs=[FAISSRetrieverConfig()],
                ranker_configs=[LLMRankerConfig()],
            )
            logger.debug(f"add docs {filenames}")
        engine.persist(persist_dir=self.persist_path)
        # Forget fingerprints of removed files so that a later rebuild does not pick them up.
        for name in removed:
            self.fingerprints.pop(name, None)
        for i in filenames:
            content = await aread(i)
            fp = generate_fingerprint(content)
            self.fingerprints[str(i)] = fp
        await awrite(filename=Path(self.persist_path) / self.fingerprint_filename, data=json.dumps(self.fingerprints))

    def __str__(self):
        """Return a string representation of the IndexRepo.

        Returns:
            str: The persist path of the index repository.
        """
        return f"{self.persist_path}"

    def _is_buildable(self, token_count: int) -> bool:
        """Check if the token count is within the buildable range.

        Args:
            token_count (int): The number of tokens in the content.

        Returns:
            bool: True if buildable, False otherwise.
        """
        if token_count < self.min_token_count or token_count > self.max_token_count:
            return False
        return True

    async def _filter(self, filenames: Optional[List[Union[str, Path]]] = None) -> tuple[List[Path], List[Path]]:
        """Filter the provided filenames to only include valid text files.

        Directories are expanded to the files they contain. When no filenames are
        given, the whole root path is scanned.

        Args:
            filenames (Optional[List[Union[str, Path]]]): List of filenames to filter.

        Returns:
            Tuple[List[Path], List[Path]]: A tuple containing a list of valid pathnames and a list of excluded paths.
        """
        root_path = Path(self.root_path).absolute()
        if not filenames:
            filenames = [root_path]
        pathnames = []
        excludes = []
        for i in filenames:
            path = Path(i).absolute()
            # Anything outside the repo root is excluded rather than indexed.
            if not path.is_relative_to(root_path):
                excludes.append(path)
                logger.debug(f"{path} not is_relative_to {root_path})")
                continue
            if not path.is_dir():
                is_text, _ = await is_text_file(path)
                if is_text:
                    pathnames.append(path)
                continue
            subfiles = list_files(path)
            for j in subfiles:
                is_text, _ = await is_text_file(j)
                if is_text:
                    pathnames.append(j)

        logger.debug(f"{pathnames}, excludes:{excludes})")
        return pathnames, excludes

    async def _search(self, query: str, filters: Set[str]) -> List[NodeWithScore]:
        """Perform a search for the given query using the index.

        Args:
            query (str): The search query.
            filters (Set[str]): A set of filenames to filter the search results.

        Returns:
            List[NodeWithScore]: A list of nodes with scores matching the query.
        """
        if not Path(self.persist_path).exists():
            return []
        engine = SimpleEngine.from_index(
            index_config=FAISSIndexConfig(persist_path=self.persist_path), retriever_configs=[FAISSRetrieverConfig()]
        )
        rsp = await engine.aretrieve(query)
        # Keep only nodes that come from one of the requested files.
        return [i for i in rsp if i.metadata.get("file_path") in filters]

    def _is_fingerprint_changed(self, filename: Union[str, Path], content: str) -> bool:
        """Check if the fingerprint of the given document content has changed.

        A file without a stored fingerprint counts as changed.

        Args:
            filename (Union[str, Path]): The filename of the document.
            content (str): The content of the document.

        Returns:
            bool: True if the fingerprint has changed, False otherwise.
        """
        old_fp = self.fingerprints.get(str(filename))
        if not old_fp:
            return True
        fp = generate_fingerprint(content)
        return old_fp != fp
|
||||
226
metagpt/tools/libs/linter.py
Normal file
226
metagpt/tools/libs/linter.py
Normal file
|
|
@ -0,0 +1,226 @@
|
|||
"""
|
||||
This file is borrowed from OpenDevin
|
||||
You can find the original repository here:
|
||||
https://github.com/All-Hands-AI/OpenHands/blob/main/openhands/runtime/plugins/agent_skills/utils/aider/linter.py
|
||||
"""
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import traceback
|
||||
import warnings
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from grep_ast import TreeContext, filename_to_lang
|
||||
from tree_sitter_languages import get_parser # noqa: E402
|
||||
|
||||
# tree_sitter is throwing a FutureWarning
|
||||
warnings.simplefilter("ignore", category=FutureWarning)
|
||||
|
||||
|
||||
@dataclass
class LintResult:
    """Problems reported by a lint pass."""

    text: str  # message text describing the reported problem(s)
    lines: list  # line numbers associated with the reported problem(s)
|
||||
|
||||
|
||||
class Linter:
    """Dispatch lint requests to a per-language linter, an external command or the basic tree-sitter check."""

    def __init__(self, encoding="utf-8", root=None):
        """Create a linter.

        Args:
            encoding: Encoding used to read the files to lint.
            root: Optional root directory; external commands run with it as the working directory.
        """
        self.encoding = encoding
        self.root = root

        # Maps a language name to either a callable linter or a command string.
        self.languages = dict(
            python=self.py_lint,
        )
        self.all_lint_cmd = None

    def set_linter(self, lang, cmd):
        """Register `cmd` for `lang`, or as the linter for every language when `lang` is falsy."""
        if lang:
            self.languages[lang] = cmd
            return

        self.all_lint_cmd = cmd

    def get_rel_fname(self, fname):
        """Return `fname` relative to the root, or unchanged when no root is set."""
        if self.root:
            return os.path.relpath(fname, self.root)
        else:
            return fname

    def run_cmd(self, cmd, rel_fname, code):
        """Run an external lint command on a file.

        Args:
            cmd: Command line without the file name; split on whitespace.
            rel_fname: File to lint; passed to the command as one single argument.
            code: Unused; kept for interface compatibility with the other linters.

        Returns:
            None when the command exits with status 0, otherwise a LintResult holding
            the combined stdout/stderr and the first error line extracted from it.
        """
        # Append the file name as its own argv entry so that paths containing
        # whitespace are not torn apart by the split (and Path objects are accepted).
        argv = cmd.split() + [str(rel_fname)]
        process = subprocess.Popen(argv, cwd=self.root, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, _ = process.communicate()
        errors = stdout.decode().strip()
        self.returncode = process.returncode
        if self.returncode == 0:
            return  # zero exit status

        line_num = extract_error_line_from(errors)
        return LintResult(text=errors, lines=[line_num])

    def get_abs_fname(self, fname):
        """Return an absolute name for existing files; other names are returned relative to the root."""
        if os.path.isabs(fname):
            return fname
        elif os.path.isfile(fname):
            rel_fname = self.get_rel_fname(fname)
            return os.path.abspath(rel_fname)
        else:  # if a temp file
            return self.get_rel_fname(fname)

    def lint(self, fname, cmd=None) -> Optional["LintResult"]:
        """Lint a file.

        Args:
            fname: Path of the file to lint.
            cmd: Optional command string overriding the configured linter.

        Returns:
            A LintResult when problems are found, otherwise None. None is also
            returned when no `cmd` is given and the language cannot be determined.
        """
        code = Path(fname).read_text(self.encoding)
        absolute_fname = self.get_abs_fname(fname)
        if cmd:
            cmd = cmd.strip()
        if not cmd:
            lang = filename_to_lang(fname)
            if not lang:
                return None
            if self.all_lint_cmd:
                cmd = self.all_lint_cmd
            else:
                cmd = self.languages.get(lang)
        if callable(cmd):
            lint_result = cmd(fname, absolute_fname, code)
        elif cmd:
            lint_result = self.run_cmd(cmd, absolute_fname, code)
        else:
            lint_result = basic_lint(absolute_fname, code)
        return lint_result

    def flake_lint(self, rel_fname, code):
        """Run flake8 restricted to a set of fatal error codes; return None when flake8 cannot be started."""
        fatal = "F821,F822,F831,E112,E113,E999,E902"
        flake8 = f"flake8 --select={fatal} --isolated"

        try:
            flake_res = self.run_cmd(flake8, rel_fname, code)
        except FileNotFoundError:
            # The flake8 executable is not available.
            flake_res = None
        return flake_res

    def py_lint(self, fname, rel_fname, code):
        """Lint Python code: flake8 first, then compile(), then the tree-sitter check; first finding wins."""
        error = self.flake_lint(rel_fname, code)
        if not error:
            error = lint_python_compile(fname, code)
        if not error:
            error = basic_lint(rel_fname, code)
        return error
|
||||
|
||||
|
||||
def lint_python_compile(fname, code):
    """Compile `code` and report an indentation error, if any.

    Returns None when the code compiles. On IndentationError a LintResult is returned
    whose text is the formatted traceback with the frames of this module trimmed away.
    Other exceptions raised by compile() are not handled here.
    """
    try:
        compile(code, fname, "exec")  # USE TRACEBACK BELOW HERE
    except IndentationError as err:
        last_line = getattr(err, "end_lineno", err.lineno)
        line_numbers = [last_line - 1] if isinstance(last_line, int) else []

        formatted = traceback.format_exception(type(err), err, err.__traceback__)
        # The marker is assembled from two pieces so that only the compile() line
        # above carries the complete text.
        marker = "# USE TRACEBACK" + " BELOW HERE"
        cut = next((idx for idx, entry in enumerate(formatted) if marker in entry), 0)
        # Keep the traceback header and everything after the marked frame.
        trimmed = formatted[:1] + formatted[cut + 1 :]
        return LintResult(text="".join(trimmed), lines=line_numbers)
    return None
|
||||
|
||||
|
||||
def basic_lint(fname, code):
    """
    Parse the code with tree-sitter and report syntax errors.

    Returns None when the language of `fname` is unknown or no error node is found,
    otherwise a LintResult listing the error lines.
    """
    language = filename_to_lang(fname)
    if language:
        syntax_tree = get_parser(language).parse(bytes(code, "utf-8"))
        error_lines = traverse_tree(syntax_tree.root_node)
        if error_lines:
            # The text names the file together with the first offending line.
            return LintResult(text=f"{fname}:{error_lines[0]}", lines=error_lines)
    return None
|
||||
|
||||
|
||||
def extract_error_line_from(lint_error):
    """Return the line number of the first parsable error in linter output.

    Each non-blank line is expected to look like
    `<filename>:<line>:<column>: <error code> <error message>`.

    Args:
        lint_error: Raw text printed by the linter.

    Returns:
        The line number of the first line whose second colon-separated field is an
        integer, or None when no such line exists.
    """
    # moved from openhands.agentskills#_lint_file
    for line in lint_error.splitlines():
        if not line.strip():
            continue
        parts = line.split(":")
        if len(parts) < 2:
            continue
        try:
            return int(parts[1])
        except ValueError:
            # Not a "<file>:<line>:..." record; try the next line.
            continue
    # Nothing parsable: report "unknown" instead of failing on an unbound local.
    return None
|
||||
|
||||
|
||||
def tree_context(fname, code, line_nums):
    """Render `code` with the given lines marked as lines of interest, using grep_ast's TreeContext."""
    options = dict(
        color=False,
        line_number=True,
        child_context=False,
        last_line=False,
        margin=0,
        mark_lois=True,
        loi_pad=3,
        # header_max=30,
        show_top_of_file_parent_scope=False,
    )
    renderer = TreeContext(fname, code, **options)
    # Duplicated line numbers collapse into a set before being registered.
    renderer.add_lines_of_interest(set(line_nums))
    renderer.add_context()
    return renderer.format()
|
||||
|
||||
|
||||
# Traverse the tree to find errors
|
||||
def traverse_tree(node):
    """Collect the 1-based line numbers of all error nodes in a parse tree.

    A node counts as an error when its type is "ERROR" or it is marked as missing.
    The tree is walked in pre-order (a node before its children, children left to
    right) with an explicit stack, so deeply nested trees cannot exhaust the
    interpreter's recursion limit.

    Args:
        node: Root node exposing `type`, `is_missing`, `start_point` and `children`.

    Returns:
        list: Line numbers of the error nodes in visiting order.
    """
    errors = []
    pending = [node]
    while pending:
        current = pending.pop()
        if current.type == "ERROR" or current.is_missing:
            # start_point holds a zero-based row first.
            errors.append(current.start_point[0] + 1)
        # Push children reversed so the leftmost child is processed next.
        pending.extend(reversed(current.children))
    return errors
|
||||
|
||||
|
||||
def main():
    """
    Lint every file named on the command line and print the findings.
    """
    if len(sys.argv) < 2:
        print("Usage: python linter.py <file1> <file2> ...")
        sys.exit(1)

    # Lint relative to the directory the script was started from.
    linter = Linter(root=os.getcwd())
    for target in sys.argv[1:]:
        report = linter.lint(target)
        if report:
            print(report)


if __name__ == "__main__":
    main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue