fix conflict

This commit is contained in:
seehi 2024-09-06 10:30:11 +08:00
commit 93d4c8c318
34 changed files with 2310 additions and 260 deletions

View file

View file

@ -0,0 +1,55 @@
import pytest
from metagpt.config2 import Config
from metagpt.const import TEST_DATA_PATH
from metagpt.rag.engines import SimpleEngine
from metagpt.rag.factories.embedding import RAGEmbeddingFactory
from metagpt.utils.common import aread
@pytest.mark.skip
@pytest.mark.parametrize(
("knowledge_filename", "query_filename", "answer_filename"),
[
(
TEST_DATA_PATH / "embedding/2.knowledge.md",
TEST_DATA_PATH / "embedding/2.query.md",
TEST_DATA_PATH / "embedding/2.answer.md",
),
(
TEST_DATA_PATH / "embedding/3.knowledge.md",
TEST_DATA_PATH / "embedding/3.query.md",
TEST_DATA_PATH / "embedding/3.answer.md",
),
],
)
@pytest.mark.asyncio
async def test_large_pdf(knowledge_filename, query_filename, answer_filename):
Config.default(reload=True) # `config.embedding.model = "text-embedding-ada-002"` changes the cache.
engine = SimpleEngine.from_docs(
input_files=[knowledge_filename],
)
query = await aread(filename=query_filename)
rsp = await engine.aretrieve(query)
assert rsp
config = Config.default()
config.embedding.model = "text-embedding-ada-002"
factory = RAGEmbeddingFactory(config)
embedding = factory.get_rag_embedding()
answer = await aread(filename=answer_filename)
answer_embedding = await embedding.aget_text_embedding(answer)
similarity = 0
for i in rsp:
rsp_embedding = await embedding.aget_query_embedding(i.text)
v = embedding.similarity(answer_embedding, rsp_embedding)
similarity = max(similarity, v)
print(similarity)
assert similarity > 0.9
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -1,7 +1,7 @@
import pytest
from metagpt.const import TEST_DATA_PATH
from metagpt.tools.libs.editor import Editor, FileBlock
from metagpt.tools.libs.editor import Editor
TEST_FILE_CONTENT = """
# this is line one
@ -13,31 +13,24 @@ def test_function_for_fm():
# this is the 7th line
""".strip()
TEST_FILE_PATH = TEST_DATA_PATH / "tools/test_script_for_editor.py"
WINDOW = 100
@pytest.fixture
def test_file():
with open(TEST_FILE_PATH, "w") as f:
f.write(TEST_FILE_CONTENT)
yield
with open(TEST_FILE_PATH, "w") as f:
f.write("")
def temp_file_path(tmp_path):
assert tmp_path is not None
temp_file_path = tmp_path / "a.txt"
yield temp_file_path
temp_file_path.unlink()
EXPECTED_SEARCHED_BLOCK = FileBlock(
file_path=str(TEST_FILE_PATH),
block_content='001|# this is line one\n002|def test_function_for_fm():\n003| "some docstring"\n004| a = 1\n005| b = 2\n',
block_start_line=1,
block_end_line=5,
symbol="def test_function_for_fm",
symbol_line=2,
)
def test_search_content(test_file):
block = Editor().search_content("def test_function_for_fm", root_path=TEST_DATA_PATH, window=3)
assert block == EXPECTED_SEARCHED_BLOCK
@pytest.fixture
def temp_py_file(tmp_path):
assert tmp_path is not None
temp_file_path = tmp_path / "test_script_for_editor.py"
temp_file_path.write_text(TEST_FILE_CONTENT)
yield temp_file_path
temp_file_path.unlink()
EXPECTED_CONTENT_AFTER_REPLACE = """
@ -50,32 +43,43 @@ def test_function_for_fm():
""".strip()
@pytest.mark.skip
def test_replace_content(test_file):
Editor().write_content(
file_path=str(TEST_FILE_PATH),
start_line=3,
end_line=5,
new_block_content=" # This is the new line A replacing lines 3 to 5.\n # This is the new line B.",
def test_replace_content(temp_py_file):
editor = Editor()
editor._edit_file_impl(
file_name=temp_py_file,
start=3,
end=5,
content=" # This is the new line A replacing lines 3 to 5.\n # This is the new line B.",
is_insert=False,
is_append=False,
)
with open(TEST_FILE_PATH, "r") as f:
with open(temp_py_file, "r") as f:
new_content = f.read()
assert new_content == EXPECTED_CONTENT_AFTER_REPLACE
assert new_content.strip() == EXPECTED_CONTENT_AFTER_REPLACE.strip()
EXPECTED_CONTENT_AFTER_DELETE = """
# this is line one
def test_function_for_fm():
c = 3
# this is the 7th line
""".strip()
def test_delete_content(test_file):
Editor().write_content(file_path=str(TEST_FILE_PATH), start_line=3, end_line=5)
with open(TEST_FILE_PATH, "r") as f:
def test_delete_content(temp_py_file):
editor = Editor()
editor._edit_file_impl(
file_name=temp_py_file,
start=3,
end=5,
content="",
is_insert=False,
is_append=False,
)
with open(temp_py_file, "r") as f:
new_content = f.read()
assert new_content == EXPECTED_CONTENT_AFTER_DELETE
assert new_content.strip() == EXPECTED_CONTENT_AFTER_DELETE.strip()
EXPECTED_CONTENT_AFTER_INSERT = """
@ -90,39 +94,16 @@ def test_function_for_fm():
""".strip()
@pytest.mark.skip
def test_insert_content(test_file):
Editor().write_content(
file_path=str(TEST_FILE_PATH),
start_line=3,
end_line=-1,
new_block_content=" # This is the new line to be inserted, at line 3",
def test_insert_content(temp_py_file):
editor = Editor(enable_auto_lint=True)
editor.insert_content_at_line(
file_name=temp_py_file,
line_number=3,
content=" # This is the new line to be inserted, at line 3",
)
with open(TEST_FILE_PATH, "r") as f:
with open(temp_py_file, "r") as f:
new_content = f.read()
assert new_content == EXPECTED_CONTENT_AFTER_INSERT
@pytest.mark.skip
def test_new_content_wrong_indentation(test_file):
msg = Editor().write_content(
file_path=str(TEST_FILE_PATH),
start_line=3,
end_line=-1,
new_block_content=" This is the new line to be inserted, at line 3", # omit # should throw a syntax error
)
assert "failed" in msg
@pytest.mark.skip
def test_new_content_format_issue(test_file):
msg = Editor().write_content(
file_path=str(TEST_FILE_PATH),
start_line=3,
end_line=-1,
new_block_content=" # This is the new line to be inserted, at line 3 ", # trailing spaces are format issue only, and should not throw an error
)
assert "failed" not in msg
assert new_content.strip() == EXPECTED_CONTENT_AFTER_INSERT.strip()
@pytest.mark.parametrize(
@ -151,5 +132,518 @@ async def test_read_files(filename):
assert file_block.block_content
def _numbered_test_lines(start, end) -> str:
return ("\n".join(f"{i}|" for i in range(start, end + 1))) + "\n"
def _generate_test_file_with_lines(temp_path, num_lines) -> str:
file_path = temp_path / "test_file.py"
file_path.write_text("\n" * num_lines)
return file_path
def _generate_ruby_test_file_with_lines(temp_path, num_lines) -> str:
file_path = temp_path / "test_file.rb"
file_path.write_text("\n" * num_lines)
return file_path
def _calculate_window_bounds(current_line, total_lines, window_size):
half_window = window_size // 2
if current_line - half_window < 0:
start = 1
end = window_size
else:
start = current_line - half_window
end = current_line + half_window
return start, end
def test_open_file_unexist_path():
editor = Editor()
with pytest.raises(FileNotFoundError):
editor.open_file("/unexist/path/a.txt")
def test_open_file(temp_file_path):
editor = Editor()
temp_file_path.write_text("Line 1\nLine 2\nLine 3\nLine 4\nLine 5")
result = editor.open_file(str(temp_file_path))
assert result is not None
expected = (
f"[File: {temp_file_path} (5 lines total)]\n"
"(this is the beginning of the file)\n"
"1|Line 1\n"
"2|Line 2\n"
"3|Line 3\n"
"4|Line 4\n"
"5|Line 5\n"
"(this is the end of the file)"
)
assert result.split("\n") == expected.split("\n")
def test_open_file_with_indentation(temp_file_path):
editor = Editor()
temp_file_path.write_text("Line 1\n Line 2\nLine 3\nLine 4\nLine 5")
result = editor.open_file(str(temp_file_path))
assert result is not None
expected = (
f"[File: {temp_file_path} (5 lines total)]\n"
"(this is the beginning of the file)\n"
"1|Line 1\n"
"2| Line 2\n"
"3|Line 3\n"
"4|Line 4\n"
"5|Line 5\n"
"(this is the end of the file)"
)
assert result.split("\n") == expected.split("\n")
def test_open_file_long(temp_file_path):
editor = Editor()
content = "\n".join([f"Line {i}" for i in range(1, 1001)])
temp_file_path.write_text(content)
result = editor.open_file(str(temp_file_path), 1, 50)
assert result is not None
expected = f"[File: {temp_file_path} (1000 lines total)]\n"
expected += "(this is the beginning of the file)\n"
for i in range(1, 51):
expected += f"{i}|Line {i}\n"
expected += "(950 more lines below)"
assert result.split("\n") == expected.split("\n")
def test_open_file_long_with_lineno(temp_file_path):
editor = Editor()
content = "\n".join([f"Line {i}" for i in range(1, 1001)])
temp_file_path.write_text(content)
cur_line = 100
result = editor.open_file(str(temp_file_path), cur_line)
assert result is not None
expected = f"[File: {temp_file_path} (1000 lines total)]\n"
start, end = _calculate_window_bounds(cur_line, 1000, WINDOW)
if start == 1:
expected += "(this is the beginning of the file)\n"
else:
expected += f"({start - 1} more lines above)\n"
for i in range(start, end + 1):
expected += f"{i}|Line {i}\n"
if end == 1000:
expected += "(this is the end of the file)\n"
else:
expected += f"({1000 - end} more lines below)"
assert result.split("\n") == expected.split("\n")
def test_create_file_unexist_path():
editor = Editor()
with pytest.raises(FileNotFoundError):
editor.create_file("/unexist/path/a.txt")
def test_create_file(temp_file_path):
editor = Editor()
result = editor.create_file(str(temp_file_path))
expected = f"[File {temp_file_path} created.]"
assert result.split("\n") == expected.split("\n")
def test_goto_line(temp_file_path):
editor = Editor()
total_lines = 1000
content = "\n".join([f"Line {i}" for i in range(1, total_lines + 1)])
temp_file_path.write_text(content)
result = editor.open_file(str(temp_file_path))
assert result is not None
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
expected += "(this is the beginning of the file)\n"
for i in range(1, WINDOW + 1):
expected += f"{i}|Line {i}\n"
expected += f"({total_lines - WINDOW} more lines below)"
assert result.split("\n") == expected.split("\n")
result = editor.goto_line(500)
assert result is not None
cur_line = 500
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
start, end = _calculate_window_bounds(cur_line, total_lines, WINDOW)
if start == 1:
expected += "(this is the beginning of the file)\n"
else:
expected += f"({start - 1} more lines above)\n"
for i in range(start, end + 1):
expected += f"{i}|Line {i}\n"
if end == total_lines:
expected += "(this is the end of the file)\n"
else:
expected += f"({total_lines - end} more lines below)"
assert result.split("\n") == expected.split("\n")
def test_goto_line_negative(temp_file_path):
editor = Editor()
content = "\n".join([f"Line {i}" for i in range(1, 5)])
temp_file_path.write_text(content)
editor.open_file(str(temp_file_path))
with pytest.raises(ValueError):
editor.goto_line(-1)
def test_goto_line_out_of_bound(temp_file_path):
editor = Editor()
content = "\n".join([f"Line {i}" for i in range(1, 5)])
temp_file_path.write_text(content)
editor.open_file(str(temp_file_path))
with pytest.raises(ValueError):
editor.goto_line(100)
def test_scroll_down(temp_file_path):
editor = Editor()
total_lines = 1000
content = "\n".join([f"Line {i}" for i in range(1, total_lines + 1)])
temp_file_path.write_text(content)
result = editor.open_file(str(temp_file_path))
assert result is not None
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
start, end = _calculate_window_bounds(1, total_lines, WINDOW)
if start == 1:
expected += "(this is the beginning of the file)\n"
else:
expected += f"({start - 1} more lines above)\n"
for i in range(start, end + 1):
expected += f"{i}|Line {i}\n"
if end == total_lines:
expected += "(this is the end of the file)"
else:
expected += f"({total_lines - end} more lines below)"
assert result.split("\n") == expected.split("\n")
result = editor.scroll_down()
assert result is not None
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
start, end = _calculate_window_bounds(WINDOW + 1, total_lines, WINDOW)
if start == 1:
expected += "(this is the beginning of the file)\n"
else:
expected += f"({start - 1} more lines above)\n"
for i in range(start, end + 1):
expected += f"{i}|Line {i}\n"
if end == total_lines:
expected += "(this is the end of the file)\n"
else:
expected += f"({total_lines - end} more lines below)"
assert result.split("\n") == expected.split("\n")
def test_scroll_up(temp_file_path):
editor = Editor()
total_lines = 1000
content = "\n".join([f"Line {i}" for i in range(1, total_lines + 1)])
temp_file_path.write_text(content)
cur_line = 300
result = editor.open_file(str(temp_file_path), cur_line)
assert result is not None
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
start, end = _calculate_window_bounds(cur_line, total_lines, WINDOW)
if start == 1:
expected += "(this is the beginning of the file)\n"
else:
expected += f"({start - 1} more lines above)\n"
for i in range(start, end + 1):
expected += f"{i}|Line {i}\n"
if end == total_lines:
expected += "(this is the end of the file)\n"
else:
expected += f"({total_lines - end} more lines below)"
assert result.split("\n") == expected.split("\n")
result = editor.scroll_up()
assert result is not None
cur_line = cur_line - WINDOW
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
start, end = _calculate_window_bounds(cur_line, total_lines, WINDOW)
if start == 1:
expected += "(this is the beginning of the file)\n"
else:
expected += f"({start - 1} more lines above)\n"
for i in range(start, end + 1):
expected += f"{i}|Line {i}\n"
if end == total_lines:
expected += "(this is the end of the file)\n"
else:
expected += f"({total_lines - end} more lines below)"
assert result.split("\n") == expected.split("\n")
def test_scroll_down_edge(temp_file_path):
editor = Editor()
content = "\n".join([f"Line {i}" for i in range(1, 10)])
temp_file_path.write_text(content)
result = editor.open_file(str(temp_file_path))
assert result is not None
expected = f"[File: {temp_file_path} (9 lines total)]\n"
expected += "(this is the beginning of the file)\n"
for i in range(1, 10):
expected += f"{i}|Line {i}\n"
expected += "(this is the end of the file)"
result = editor.scroll_down()
assert result is not None
assert result.split("\n") == expected.split("\n")
def test_print_window_internal(temp_file_path):
editor = Editor()
editor.create_file(str(temp_file_path))
with open(temp_file_path, "w") as file:
for i in range(1, 101):
file.write(f"Line `{i}`\n")
current_line = 50
window = 2
result = editor._print_window(temp_file_path, current_line, window)
expected = "(48 more lines above)\n" "49|Line `49`\n" "50|Line `50`\n" "51|Line `51`\n" "(49 more lines below)"
assert result == expected
def test_open_file_large_line_number(temp_file_path):
editor = Editor()
editor.create_file(str(temp_file_path))
with open(temp_file_path, "w") as file:
for i in range(1, 1000):
file.write(f"Line `{i}`\n")
current_line = 800
window = 100
result = editor.open_file(str(temp_file_path), current_line, window)
expected = f"[File: {temp_file_path} (999 lines total)]\n"
expected += "(749 more lines above)\n"
for i in range(750, 850 + 1):
expected += f"{i}|Line `{i}`\n"
expected += "(149 more lines below)"
assert result == expected
def test_open_file_large_line_number_consecutive_diff_window(temp_file_path):
editor = Editor()
editor.create_file(str(temp_file_path))
total_lines = 1000
with open(temp_file_path, "w") as file:
for i in range(1, total_lines + 1):
file.write(f"Line `{i}`\n")
current_line = 800
cur_window = 300
result = editor.open_file(str(temp_file_path), current_line, cur_window)
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
start, end = _calculate_window_bounds(current_line, total_lines, cur_window)
if start == 1:
expected += "(this is the beginning of the file)\n"
else:
expected += f"({start - 1} more lines above)\n"
for i in range(current_line - cur_window // 2, current_line + cur_window // 2 + 1):
expected += f"{i}|Line `{i}`\n"
if end == total_lines:
expected += "(this is the end of the file)\n"
else:
expected += f"({total_lines - end} more lines below)"
assert result == expected
current_line = current_line - WINDOW
result = editor.scroll_up()
expected = f"[File: {temp_file_path} ({total_lines} lines total)]\n"
start, end = _calculate_window_bounds(current_line, total_lines, WINDOW)
if start == 1:
expected += "(this is the beginning of the file)\n"
else:
expected += f"({start - 1} more lines above)\n"
for i in range(start, end + 1):
expected += f"{i}|Line `{i}`\n"
if end == total_lines:
expected += "(this is the end of the file)\n"
else:
expected += f"({total_lines - end} more lines below)"
assert result.split("\n") == expected.split("\n")
EXPECTED_CONTENT_AFTER_REPLACE_TEXT = """
# this is line one
def test_function_for_fm():
"some docstring"
a = 1
b = 9
c = 3
# this is the 7th line
""".strip()
def test_edit_file_by_replace(temp_py_file):
editor = Editor()
editor.edit_file_by_replace(file_name=str(temp_py_file), to_replace=" b = 2", new_content=" b = 9")
with open(temp_py_file, "r") as f:
new_content = f.read()
assert new_content.strip() == EXPECTED_CONTENT_AFTER_REPLACE_TEXT.strip()
def test_append_file(temp_file_path):
editor = Editor()
# 写入初始内容
initial_content = "Line 1\nLine 2\nLine 3\n"
temp_file_path.write_text(initial_content)
# 追加内容到文件
append_content = "Line 4\nLine 5\n"
result = editor.append_file(str(temp_file_path), append_content)
# 预期内容
expected_content = initial_content + append_content
# 读取文件并断言内容与预期一致
with open(temp_file_path, "r") as f:
new_content = f.read()
assert new_content == expected_content
# 输出的预期结果
expected_output = (
f"[File: {temp_file_path.resolve()} (5 lines total after edit)]\n"
"(this is the beginning of the file)\n"
"1|Line 1\n"
"2|Line 2\n"
"3|Line 3\n"
"4|Line 4\n"
"5|Line 5\n"
"(this is the end of the file)\n"
"[File updated (edited at line 3). Please review the changes and make sure they are correct (correct indentation, no duplicate lines, etc). Edit the file again if necessary.]"
)
assert result.split("\n") == expected_output.split("\n")
def test_search_dir(tmp_path):
editor = Editor()
dir_path = tmp_path / "test_dir"
dir_path.mkdir()
# Create some files with specific content
(dir_path / "file1.txt").write_text("This is a test file with some content.")
(dir_path / "file2.txt").write_text("Another file with different content.")
sub_dir = dir_path / "sub_dir"
sub_dir.mkdir()
(sub_dir / "file3.txt").write_text("This file is inside a sub directory with some content.")
search_term = "some content"
result = editor.search_dir(search_term, str(dir_path))
assert "file1.txt" in result
assert "file3.txt" in result
assert "Another file with different content." not in result
def test_search_file(temp_file_path):
editor = Editor()
file_path = temp_file_path
file_path.write_text("This is a test file with some content.\nAnother line with more content.")
search_term = "some content"
result = editor.search_file(search_term, str(file_path))
assert "Line 1: This is a test file with some content." in result
assert "Line 2: Another line with more content." not in result
def test_find_file(tmp_path):
editor = Editor()
dir_path = tmp_path / "test_dir"
dir_path.mkdir()
# Create some files with specific names
(dir_path / "file1.txt").write_text("Content of file 1.")
(dir_path / "file2.txt").write_text("Content of file 2.")
sub_dir = dir_path / "sub_dir"
sub_dir.mkdir()
(sub_dir / "file3.txt").write_text("Content of file 3.")
file_name = "file1.txt"
result = editor.find_file(file_name, str(dir_path))
assert "file1.txt" in result
assert "file2.txt" not in result
assert "file3.txt" not in result
# Test data for _append_impl method
TEST_LINES = ["First line\n", "Second line\n", "Third line\n"]
NEW_CONTENT = "Appended line\n"
EXPECTED_APPEND_NON_EMPTY_FILE = ["First line\n", "Second line\n", "Third line\n", "Appended line\n"]
EXPECTED_APPEND_EMPTY_FILE = ["Appended line\n"]
def test_append_non_empty_file():
editor = Editor()
lines = TEST_LINES.copy()
content, n_added_lines = editor._append_impl(lines, NEW_CONTENT)
assert content.splitlines(keepends=True) == EXPECTED_APPEND_NON_EMPTY_FILE
assert n_added_lines == 1
def test_append_empty_file():
editor = Editor()
lines = []
content, n_added_lines = editor._append_impl(lines, NEW_CONTENT)
assert content.splitlines(keepends=True) == EXPECTED_APPEND_EMPTY_FILE
assert n_added_lines == 1
def test_append_to_single_empty_line_file():
editor = Editor()
lines = [""]
content, n_added_lines = editor._append_impl(lines, NEW_CONTENT)
assert content.splitlines(keepends=True) == EXPECTED_APPEND_EMPTY_FILE
assert n_added_lines == 1
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -0,0 +1,32 @@
import shutil
import pytest
from metagpt.const import DEFAULT_WORKSPACE_ROOT, TEST_DATA_PATH
from metagpt.tools.libs.index_repo import IndexRepo
@pytest.mark.asyncio
@pytest.mark.parametrize(("path", "query"), [(TEST_DATA_PATH / "requirements", "业务线")])
async def test_index_repo(path, query):
index_path = DEFAULT_WORKSPACE_ROOT / ".index"
repo = IndexRepo(persist_path=str(index_path), root_path=str(path), min_token_count=0)
await repo.add([path])
await repo.add([path])
assert index_path.exists()
rsp = await repo.search(query)
assert rsp
repo2 = IndexRepo(persist_path=str(index_path), root_path=str(path), min_token_count=0)
rsp2 = await repo2.search(query)
assert rsp2
merged_rsp = await repo.merge(query=query, indices_list=[rsp, rsp2])
assert merged_rsp
shutil.rmtree(index_path)
if __name__ == "__main__":
pytest.main([__file__, "-s"])