mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-05-10 16:22:37 +02:00
update: editor
This commit is contained in:
parent
fedc771478
commit
8e7696b8e6
7 changed files with 1516 additions and 232 deletions
|
|
@ -183,7 +183,9 @@ IMPORTANT_TIPS = """
|
|||
|
||||
15. When the edit fails, try to enlarge the starting line.
|
||||
|
||||
16. Once again, and this is critical: YOU CAN ONLY ENTER ONE COMMAND AT A TIME.
|
||||
16. Use an absolute path instead of a relative path.
|
||||
|
||||
17. Once again, and this is critical: YOU CAN ONLY ENTER ONE COMMAND AT A TIME.
|
||||
"""
|
||||
|
||||
NEXT_STEP_TEMPLATE = f"""
|
||||
|
|
|
|||
|
|
@ -109,9 +109,6 @@ class RoleZero(Role):
|
|||
"Plan.append_task": self.planner.plan.append_task,
|
||||
"Plan.reset_task": self.planner.plan.reset_task,
|
||||
"Plan.replace_task": self.planner.plan.replace_task,
|
||||
"Editor.write": self.editor.write,
|
||||
"Editor.write_content": self.editor.write_content,
|
||||
"Editor.read": self.editor.read,
|
||||
"RoleZero.ask_human": self.ask_human,
|
||||
"RoleZero.reply_to_human": self.reply_to_human,
|
||||
}
|
||||
|
|
@ -132,6 +129,26 @@ class RoleZero(Role):
|
|||
]
|
||||
}
|
||||
)
|
||||
self.tool_execution_map.update(
|
||||
{
|
||||
f"Editor.{i}": getattr(self.editor, i)
|
||||
for i in [
|
||||
"append_file",
|
||||
"create_file",
|
||||
"edit_file_by_replace",
|
||||
"find_file",
|
||||
"goto_line",
|
||||
"insert_content_at_line",
|
||||
"open_file",
|
||||
# "read",
|
||||
"scroll_down",
|
||||
"scroll_up",
|
||||
"search_dir",
|
||||
"search_file",
|
||||
# "write",
|
||||
]
|
||||
}
|
||||
)
|
||||
# can be updated by subclass
|
||||
self._update_tool_execution()
|
||||
return self
|
||||
|
|
|
|||
|
|
@ -19,10 +19,11 @@ class SWEAgent(RoleZero):
|
|||
goal: str = "Resolve GitHub issue or bug in any existing codebase"
|
||||
_instruction: str = NEXT_STEP_TEMPLATE
|
||||
tools: list[str] = [
|
||||
"Bash",
|
||||
# "Bash",
|
||||
"Browser:goto,scroll",
|
||||
"RoleZero",
|
||||
"git_create_pull",
|
||||
"Editor",
|
||||
]
|
||||
terminal: Bash = Field(default_factory=Bash, exclude=True)
|
||||
output_diff: str = ""
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
222
metagpt/tools/libs/linter.py
Normal file
222
metagpt/tools/libs/linter.py
Normal file
|
|
@ -0,0 +1,222 @@
|
|||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import traceback
|
||||
import warnings
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from grep_ast import TreeContext, filename_to_lang
|
||||
from tree_sitter_languages import get_parser # noqa: E402
|
||||
|
||||
# tree_sitter is throwing a FutureWarning
|
||||
warnings.simplefilter("ignore", category=FutureWarning)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LintResult:
|
||||
text: str
|
||||
lines: list
|
||||
|
||||
|
||||
class Linter:
|
||||
def __init__(self, encoding="utf-8", root=None):
|
||||
self.encoding = encoding
|
||||
self.root = root
|
||||
|
||||
self.languages = dict(
|
||||
python=self.py_lint,
|
||||
)
|
||||
self.all_lint_cmd = None
|
||||
|
||||
# def set_linter(self, lang, cmd):
|
||||
# if lang:
|
||||
# self.languages[lang] = cmd
|
||||
# return
|
||||
#
|
||||
# self.all_lint_cmd = cmd
|
||||
|
||||
def get_rel_fname(self, fname):
|
||||
if self.root:
|
||||
return os.path.relpath(fname, self.root)
|
||||
else:
|
||||
return fname
|
||||
|
||||
def run_cmd(self, cmd, rel_fname, code):
|
||||
cmd += " " + rel_fname
|
||||
cmd = cmd.split()
|
||||
|
||||
process = subprocess.Popen(cmd, cwd=self.root, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
stdout, _ = process.communicate()
|
||||
errors = stdout.decode().strip()
|
||||
self.returncode = process.returncode
|
||||
if self.returncode == 0:
|
||||
return # zero exit status
|
||||
|
||||
cmd = " ".join(cmd)
|
||||
res = ""
|
||||
res += errors
|
||||
line_num = extract_error_line_from(res)
|
||||
return LintResult(text=res, lines=[line_num])
|
||||
|
||||
def get_abs_fname(self, fname):
|
||||
if os.path.isabs(fname):
|
||||
return fname
|
||||
elif os.path.isfile(fname):
|
||||
rel_fname = self.get_rel_fname(fname)
|
||||
return os.path.abspath(rel_fname)
|
||||
else: # if a temp file
|
||||
return self.get_rel_fname(fname)
|
||||
|
||||
def lint(self, fname, cmd=None) -> Optional[LintResult]:
|
||||
code = Path(fname).read_text(self.encoding)
|
||||
absolute_fname = self.get_abs_fname(fname)
|
||||
if cmd:
|
||||
cmd = cmd.strip()
|
||||
if not cmd:
|
||||
lang = filename_to_lang(fname)
|
||||
if not lang:
|
||||
return None
|
||||
if self.all_lint_cmd:
|
||||
cmd = self.all_lint_cmd
|
||||
else:
|
||||
cmd = self.languages.get(lang)
|
||||
if callable(cmd):
|
||||
linkres = cmd(fname, absolute_fname, code)
|
||||
elif cmd:
|
||||
linkres = self.run_cmd(cmd, absolute_fname, code)
|
||||
else:
|
||||
linkres = basic_lint(absolute_fname, code)
|
||||
return linkres
|
||||
|
||||
def flake_lint(self, rel_fname, code):
|
||||
fatal = "F821,F822,F831,E112,E113,E999,E902"
|
||||
flake8 = f"flake8 --select={fatal} --isolated"
|
||||
|
||||
try:
|
||||
flake_res = self.run_cmd(flake8, rel_fname, code)
|
||||
except FileNotFoundError:
|
||||
flake_res = None
|
||||
return flake_res
|
||||
|
||||
def py_lint(self, fname, rel_fname, code):
|
||||
error = self.flake_lint(rel_fname, code)
|
||||
if not error:
|
||||
error = lint_python_compile(fname, code)
|
||||
if not error:
|
||||
error = basic_lint(rel_fname, code)
|
||||
return error
|
||||
|
||||
|
||||
def lint_python_compile(fname, code):
|
||||
try:
|
||||
compile(code, fname, "exec") # USE TRACEBACK BELOW HERE
|
||||
return
|
||||
except IndentationError as err:
|
||||
end_lineno = getattr(err, "end_lineno", err.lineno)
|
||||
if isinstance(end_lineno, int):
|
||||
line_numbers = list(range(end_lineno - 1, end_lineno))
|
||||
else:
|
||||
line_numbers = []
|
||||
|
||||
tb_lines = traceback.format_exception(type(err), err, err.__traceback__)
|
||||
last_file_i = 0
|
||||
|
||||
target = "# USE TRACEBACK"
|
||||
target += " BELOW HERE"
|
||||
for i in range(len(tb_lines)):
|
||||
if target in tb_lines[i]:
|
||||
last_file_i = i
|
||||
break
|
||||
tb_lines = tb_lines[:1] + tb_lines[last_file_i + 1 :]
|
||||
|
||||
res = "".join(tb_lines)
|
||||
return LintResult(text=res, lines=line_numbers)
|
||||
|
||||
|
||||
def basic_lint(fname, code):
|
||||
"""
|
||||
Use tree-sitter to look for syntax errors, display them with tree context.
|
||||
"""
|
||||
|
||||
lang = filename_to_lang(fname)
|
||||
if not lang:
|
||||
return
|
||||
|
||||
parser = get_parser(lang)
|
||||
tree = parser.parse(bytes(code, "utf-8"))
|
||||
|
||||
errors = traverse_tree(tree.root_node)
|
||||
if not errors:
|
||||
return
|
||||
return LintResult(text=f"{fname}:{errors[0]}", lines=errors)
|
||||
|
||||
|
||||
def extract_error_line_from(lint_error):
|
||||
# moved from openhands.agentskills#_lint_file
|
||||
for line in lint_error.splitlines(True):
|
||||
if line.strip():
|
||||
# The format of the error message is: <filename>:<line>:<column>: <error code> <error message>
|
||||
parts = line.split(":")
|
||||
if len(parts) >= 2:
|
||||
try:
|
||||
first_error_line = int(parts[1])
|
||||
break
|
||||
except ValueError:
|
||||
continue
|
||||
return first_error_line
|
||||
|
||||
|
||||
def tree_context(fname, code, line_nums):
|
||||
context = TreeContext(
|
||||
fname,
|
||||
code,
|
||||
color=False,
|
||||
line_number=True,
|
||||
child_context=False,
|
||||
last_line=False,
|
||||
margin=0,
|
||||
mark_lois=True,
|
||||
loi_pad=3,
|
||||
# header_max=30,
|
||||
show_top_of_file_parent_scope=False,
|
||||
)
|
||||
line_nums = set(line_nums)
|
||||
context.add_lines_of_interest(line_nums)
|
||||
context.add_context()
|
||||
output = context.format()
|
||||
|
||||
return output
|
||||
|
||||
|
||||
# Traverse the tree to find errors
|
||||
def traverse_tree(node):
|
||||
errors = []
|
||||
if node.type == "ERROR" or node.is_missing:
|
||||
line_no = node.start_point[0] + 1
|
||||
errors.append(line_no)
|
||||
|
||||
for child in node.children:
|
||||
errors += traverse_tree(child)
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main function to parse files provided as command line arguments.
|
||||
"""
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: python linter.py <file1> <file2> ...")
|
||||
sys.exit(1)
|
||||
|
||||
linter = Linter(root=os.getcwd())
|
||||
for file_path in sys.argv[1:]:
|
||||
errors = linter.lint(file_path)
|
||||
if errors:
|
||||
print(errors)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -74,3 +74,5 @@ pylint~=3.0.3
|
|||
pygithub~=2.3
|
||||
htmlmin
|
||||
fsspec
|
||||
grep-ast~=0.3.3
|
||||
tree-sitter~=0.21.3
|
||||
|
|
@ -1,7 +1,10 @@
|
|||
import contextlib
|
||||
import io
|
||||
|
||||
import pytest
|
||||
|
||||
from metagpt.const import TEST_DATA_PATH
|
||||
from metagpt.tools.libs.editor import Editor, FileBlock
|
||||
from metagpt.tools.libs.editor import WINDOW, Editor
|
||||
|
||||
TEST_FILE_CONTENT = """
|
||||
# this is line one
|
||||
|
|
@ -25,21 +28,6 @@ def test_file():
|
|||
f.write("")
|
||||
|
||||
|
||||
EXPECTED_SEARCHED_BLOCK = FileBlock(
|
||||
file_path=str(TEST_FILE_PATH),
|
||||
block_content='001|# this is line one\n002|def test_function_for_fm():\n003| "some docstring"\n004| a = 1\n005| b = 2\n',
|
||||
block_start_line=1,
|
||||
block_end_line=5,
|
||||
symbol="def test_function_for_fm",
|
||||
symbol_line=2,
|
||||
)
|
||||
|
||||
|
||||
def test_search_content(test_file):
|
||||
block = Editor().search_content("def test_function_for_fm", root_path=TEST_DATA_PATH, window=3)
|
||||
assert block == EXPECTED_SEARCHED_BLOCK
|
||||
|
||||
|
||||
EXPECTED_CONTENT_AFTER_REPLACE = """
|
||||
# this is line one
|
||||
def test_function_for_fm():
|
||||
|
|
@ -103,28 +91,6 @@ def test_insert_content(test_file):
|
|||
assert new_content == EXPECTED_CONTENT_AFTER_INSERT
|
||||
|
||||
|
||||
@pytest.mark.skip
|
||||
def test_new_content_wrong_indentation(test_file):
|
||||
msg = Editor().write_content(
|
||||
file_path=str(TEST_FILE_PATH),
|
||||
start_line=3,
|
||||
end_line=-1,
|
||||
new_block_content=" This is the new line to be inserted, at line 3", # omit # should throw a syntax error
|
||||
)
|
||||
assert "failed" in msg
|
||||
|
||||
|
||||
@pytest.mark.skip
|
||||
def test_new_content_format_issue(test_file):
|
||||
msg = Editor().write_content(
|
||||
file_path=str(TEST_FILE_PATH),
|
||||
start_line=3,
|
||||
end_line=-1,
|
||||
new_block_content=" # This is the new line to be inserted, at line 3 ", # trailing spaces are format issue only, and should not throw an error
|
||||
)
|
||||
assert "failed" not in msg
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"filename",
|
||||
[
|
||||
|
|
@ -151,5 +117,453 @@ async def test_read_files(filename):
|
|||
assert file_block.block_content
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_current_file():
|
||||
global CURRENT_FILE
|
||||
CURRENT_FILE = None
|
||||
|
||||
|
||||
def _numbered_test_lines(start, end) -> str:
|
||||
return ("\n".join(f"{i}|" for i in range(start, end + 1))) + "\n"
|
||||
|
||||
|
||||
def _generate_test_file_with_lines(temp_path, num_lines) -> str:
|
||||
file_path = temp_path / "test_file.py"
|
||||
file_path.write_text("\n" * num_lines)
|
||||
return file_path
|
||||
|
||||
|
||||
def _generate_ruby_test_file_with_lines(temp_path, num_lines) -> str:
|
||||
file_path = temp_path / "test_file.rb"
|
||||
file_path.write_text("\n" * num_lines)
|
||||
return file_path
|
||||
|
||||
|
||||
def _calculate_window_bounds(current_line, total_lines, window_size):
|
||||
half_window = window_size // 2
|
||||
if current_line - half_window < 0:
|
||||
start = 1
|
||||
end = window_size
|
||||
else:
|
||||
start = current_line - half_window
|
||||
end = current_line + half_window
|
||||
return start, end
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_open_file_unexist_path():
|
||||
editor = Editor()
|
||||
with pytest.raises(FileNotFoundError):
|
||||
editor.open_file("/unexist/path/a.txt")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_open_file(tmp_path):
|
||||
editor = Editor()
|
||||
assert tmp_path is not None
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
temp_file_path.write_text("Line 1\nLine 2\nLine 3\nLine 4\nLine 5")
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(temp_file_path))
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
expected = (
|
||||
f"[File: {temp_file_path} (5 lines total)]\n"
|
||||
"(this is the beginning of the file)\n"
|
||||
"1|Line 1\n"
|
||||
"2|Line 2\n"
|
||||
"3|Line 3\n"
|
||||
"4|Line 4\n"
|
||||
"5|Line 5\n"
|
||||
"(this is the end of the file)\n"
|
||||
)
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_open_file_with_indentation(tmp_path):
|
||||
editor = Editor()
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
temp_file_path.write_text("Line 1\n Line 2\nLine 3\nLine 4\nLine 5")
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(temp_file_path))
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
expected = (
|
||||
f"[File: {temp_file_path} (5 lines total)]\n"
|
||||
"(this is the beginning of the file)\n"
|
||||
"1|Line 1\n"
|
||||
"2| Line 2\n"
|
||||
"3|Line 3\n"
|
||||
"4|Line 4\n"
|
||||
"5|Line 5\n"
|
||||
"(this is the end of the file)\n"
|
||||
)
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_open_file_long(tmp_path):
|
||||
editor = Editor()
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
content = "\n".join([f"Line {i}" for i in range(1, 1001)])
|
||||
temp_file_path.write_text(content)
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(temp_file_path), 1, 50)
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
expected = f"[File: {temp_file_path} (1000 lines total)]\n"
|
||||
expected += "(this is the beginning of the file)\n"
|
||||
for i in range(1, 51):
|
||||
expected += f"{i}|Line {i}\n"
|
||||
expected += "(950 more lines below)\n"
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_open_file_long_with_lineno(tmp_path):
|
||||
editor = Editor()
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
content = "\n".join([f"Line {i}" for i in range(1, 1001)])
|
||||
temp_file_path.write_text(content)
|
||||
|
||||
cur_line = 100
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(temp_file_path), cur_line)
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
expected = f"[File: {temp_file_path} (1000 lines total)]\n"
|
||||
start, end = _calculate_window_bounds(cur_line, 1000, WINDOW)
|
||||
if start == 1:
|
||||
expected += "(this is the beginning of the file)\n"
|
||||
else:
|
||||
expected += f"({start - 1} more lines above)\n"
|
||||
for i in range(start, end + 1):
|
||||
expected += f"{i}|Line {i}\n"
|
||||
if end == 1000:
|
||||
expected += "(this is the end of the file)\n"
|
||||
else:
|
||||
expected += f"({1000 - end} more lines below)\n"
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_file_unexist_path():
|
||||
editor = Editor()
|
||||
with pytest.raises(FileNotFoundError):
|
||||
editor.create_file("/unexist/path/a.txt")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_file(tmp_path):
|
||||
editor = Editor()
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.create_file(str(temp_file_path))
|
||||
result = buf.getvalue()
|
||||
|
||||
expected = (
|
||||
f"[File: {temp_file_path} (1 lines total)]\n"
|
||||
"(this is the beginning of the file)\n"
|
||||
"1|\n"
|
||||
"(this is the end of the file)\n"
|
||||
f"[File {temp_file_path} created.]\n"
|
||||
)
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_goto_line(tmp_path):
|
||||
editor = Editor()
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
total_lines = 1000
|
||||
content = "\n".join([f"Line {i}" for i in range(1, total_lines + 1)])
|
||||
temp_file_path.write_text(content)
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(temp_file_path))
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
|
||||
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
|
||||
expected += "(this is the beginning of the file)\n"
|
||||
for i in range(1, WINDOW + 1):
|
||||
expected += f"{i}|Line {i}\n"
|
||||
expected += f"({total_lines - WINDOW} more lines below)\n"
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.goto_line(500)
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
|
||||
cur_line = 500
|
||||
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
|
||||
start, end = _calculate_window_bounds(cur_line, total_lines, WINDOW)
|
||||
if start == 1:
|
||||
expected += "(this is the beginning of the file)\n"
|
||||
else:
|
||||
expected += f"({start - 1} more lines above)\n"
|
||||
for i in range(start, end + 1):
|
||||
expected += f"{i}|Line {i}\n"
|
||||
if end == total_lines:
|
||||
expected += "(this is the end of the file)\n"
|
||||
else:
|
||||
expected += f"({total_lines - end} more lines below)\n"
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_goto_line_negative(tmp_path):
|
||||
editor = Editor()
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
content = "\n".join([f"Line {i}" for i in range(1, 5)])
|
||||
temp_file_path.write_text(content)
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(temp_file_path))
|
||||
with pytest.raises(ValueError):
|
||||
editor.goto_line(-1)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_goto_line_out_of_bound(tmp_path):
|
||||
editor = Editor()
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
content = "\n".join([f"Line {i}" for i in range(1, 5)])
|
||||
temp_file_path.write_text(content)
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(temp_file_path))
|
||||
with pytest.raises(ValueError):
|
||||
editor.goto_line(100)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scroll_down(tmp_path):
|
||||
editor = Editor()
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
total_lines = 1000
|
||||
content = "\n".join([f"Line {i}" for i in range(1, total_lines + 1)])
|
||||
temp_file_path.write_text(content)
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(temp_file_path))
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
|
||||
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
|
||||
start, end = _calculate_window_bounds(1, total_lines, WINDOW)
|
||||
if start == 1:
|
||||
expected += "(this is the beginning of the file)\n"
|
||||
else:
|
||||
expected += f"({start - 1} more lines above)\n"
|
||||
for i in range(start, end + 1):
|
||||
expected += f"{i}|Line {i}\n"
|
||||
if end == total_lines:
|
||||
expected += "(this is the end of the file)\n"
|
||||
else:
|
||||
expected += f"({total_lines - end} more lines below)\n"
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.scroll_down()
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
|
||||
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
|
||||
start, end = _calculate_window_bounds(WINDOW + 1, total_lines, WINDOW)
|
||||
if start == 1:
|
||||
expected += "(this is the beginning of the file)\n"
|
||||
else:
|
||||
expected += f"({start - 1} more lines above)\n"
|
||||
for i in range(start, end + 1):
|
||||
expected += f"{i}|Line {i}\n"
|
||||
if end == total_lines:
|
||||
expected += "(this is the end of the file)\n"
|
||||
else:
|
||||
expected += f"({total_lines - end} more lines below)\n"
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scroll_up(tmp_path):
|
||||
editor = Editor()
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
total_lines = 1000
|
||||
content = "\n".join([f"Line {i}" for i in range(1, total_lines + 1)])
|
||||
temp_file_path.write_text(content)
|
||||
|
||||
cur_line = 300
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(temp_file_path), cur_line)
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
|
||||
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
|
||||
start, end = _calculate_window_bounds(cur_line, total_lines, WINDOW)
|
||||
if start == 1:
|
||||
expected += "(this is the beginning of the file)\n"
|
||||
else:
|
||||
expected += f"({start - 1} more lines above)\n"
|
||||
for i in range(start, end + 1):
|
||||
expected += f"{i}|Line {i}\n"
|
||||
if end == total_lines:
|
||||
expected += "(this is the end of the file)\n"
|
||||
else:
|
||||
expected += f"({total_lines - end} more lines below)\n"
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.scroll_up()
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
|
||||
cur_line = cur_line - WINDOW
|
||||
|
||||
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
|
||||
start, end = _calculate_window_bounds(cur_line, total_lines, WINDOW)
|
||||
if start == 1:
|
||||
expected += "(this is the beginning of the file)\n"
|
||||
else:
|
||||
expected += f"({start - 1} more lines above)\n"
|
||||
for i in range(start, end + 1):
|
||||
expected += f"{i}|Line {i}\n"
|
||||
if end == total_lines:
|
||||
expected += "(this is the end of the file)\n"
|
||||
else:
|
||||
expected += f"({total_lines - end} more lines below)\n"
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scroll_down_edge(tmp_path):
|
||||
editor = Editor()
|
||||
temp_file_path = tmp_path / "a.txt"
|
||||
content = "\n".join([f"Line {i}" for i in range(1, 10)])
|
||||
temp_file_path.write_text(content)
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(temp_file_path))
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
|
||||
expected = f"[File: {temp_file_path} (9 lines total)]\n"
|
||||
expected += "(this is the beginning of the file)\n"
|
||||
for i in range(1, 10):
|
||||
expected += f"{i}|Line {i}\n"
|
||||
expected += "(this is the end of the file)\n"
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.scroll_down()
|
||||
result = buf.getvalue()
|
||||
assert result is not None
|
||||
|
||||
assert result.split("\n") == expected.split("\n")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_print_window_internal(tmp_path):
|
||||
editor = Editor()
|
||||
test_file_path = tmp_path / "a.txt"
|
||||
await editor.create_file(str(test_file_path))
|
||||
editor.open_file(str(test_file_path))
|
||||
with open(test_file_path, "w") as file:
|
||||
for i in range(1, 101):
|
||||
file.write(f"Line `{i}`\n")
|
||||
|
||||
current_line = 50
|
||||
window = 2
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor._print_window(str(test_file_path), current_line, window, return_str=False)
|
||||
result = buf.getvalue()
|
||||
expected = "(48 more lines above)\n" "49|Line `49`\n" "50|Line `50`\n" "51|Line `51`\n" "(49 more lines below)\n"
|
||||
assert result == expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_open_file_large_line_number(tmp_path):
|
||||
editor = Editor()
|
||||
test_file_path = tmp_path / "a.txt"
|
||||
editor.create_file(str(test_file_path))
|
||||
editor.open_file(str(test_file_path))
|
||||
with open(test_file_path, "w") as file:
|
||||
for i in range(1, 1000):
|
||||
file.write(f"Line `{i}`\n")
|
||||
|
||||
current_line = 800
|
||||
window = 100
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(test_file_path), current_line, window)
|
||||
result = buf.getvalue()
|
||||
expected = f"[File: {test_file_path} (999 lines total)]\n"
|
||||
expected += "(749 more lines above)\n"
|
||||
for i in range(750, 850 + 1):
|
||||
expected += f"{i}|Line `{i}`\n"
|
||||
expected += "(149 more lines below)\n"
|
||||
assert result == expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_open_file_large_line_number_consecutive_diff_window(tmp_path):
|
||||
editor = Editor()
|
||||
test_file_path = tmp_path / "a.txt"
|
||||
editor.create_file(str(test_file_path))
|
||||
editor.open_file(str(test_file_path))
|
||||
total_lines = 1000
|
||||
with open(test_file_path, "w") as file:
|
||||
for i in range(1, total_lines + 1):
|
||||
file.write(f"Line `{i}`\n")
|
||||
|
||||
current_line = 800
|
||||
cur_window = 300
|
||||
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.open_file(str(test_file_path), current_line, cur_window)
|
||||
result = buf.getvalue()
|
||||
expected = f"[File: {test_file_path} ({total_lines} lines total)]\n"
|
||||
start, end = _calculate_window_bounds(current_line, total_lines, cur_window)
|
||||
if start == 1:
|
||||
expected += "(this is the beginning of the file)\n"
|
||||
else:
|
||||
expected += f"({start - 1} more lines above)\n"
|
||||
for i in range(current_line - cur_window // 2, current_line + cur_window // 2 + 1):
|
||||
expected += f"{i}|Line `{i}`\n"
|
||||
if end == total_lines:
|
||||
expected += "(this is the end of the file)\n"
|
||||
else:
|
||||
expected += f"({total_lines - end} more lines below)\n"
|
||||
assert result == expected
|
||||
|
||||
current_line = current_line - WINDOW
|
||||
with io.StringIO() as buf:
|
||||
with contextlib.redirect_stdout(buf):
|
||||
editor.scroll_up()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-s"])
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue