feat: +unit test

This commit is contained in:
莘权 马 2023-12-28 15:42:36 +08:00
parent 8844fa74f7
commit c61a3d2a99
10 changed files with 177 additions and 86 deletions

View file

@ -55,9 +55,9 @@ class BrainMemory(BaseModel):
return "\n".join(texts)
@staticmethod
async def loads(redis_key: str, redis_conf: Dict = None) -> "BrainMemory":
redis = Redis(conf=redis_conf)
if not redis.is_valid() or not redis_key:
async def loads(redis_key: str) -> "BrainMemory":
redis = Redis()
if not redis.is_valid or not redis_key:
return BrainMemory()
v = await redis.get(key=redis_key)
logger.debug(f"REDIS GET {redis_key} {v}")
@ -67,11 +67,11 @@ class BrainMemory(BaseModel):
return bm
return BrainMemory()
async def dumps(self, redis_key: str, timeout_sec: int = 30 * 60, redis_conf: Dict = None):
async def dumps(self, redis_key: str, timeout_sec: int = 30 * 60):
if not self.is_dirty:
return
redis = Redis(conf=redis_conf)
if not redis.is_valid() or not redis_key:
redis = Redis()
if not redis.is_valid or not redis_key:
return False
v = self.json(ensure_ascii=False)
if self.cacheable:
@ -86,26 +86,26 @@ class BrainMemory(BaseModel):
async def set_history_summary(self, history_summary, redis_key, redis_conf):
if self.historical_summary == history_summary:
if self.is_dirty:
await self.dumps(redis_key=redis_key, redis_conf=redis_conf)
await self.dumps(redis_key=redis_key)
self.is_dirty = False
return
self.historical_summary = history_summary
self.history = []
await self.dumps(redis_key=redis_key, redis_conf=redis_conf)
await self.dumps(redis_key=redis_key)
self.is_dirty = False
def add_history(self, msg: Message):
if msg.id:
if self.to_int(msg.id, 0) <= self.to_int(self.last_history_id, -1):
return
self.history.append(msg.dict())
self.history.append(msg)
self.last_history_id = str(msg.id)
self.is_dirty = True
def exists(self, text) -> bool:
for m in reversed(self.history):
if m.get("content") == text:
if m.content == text:
return True
return False
@ -163,7 +163,7 @@ class BrainMemory(BaseModel):
msgs.reverse()
self.history = msgs
self.is_dirty = True
await self.dumps(redis_key=CONFIG.REDIS_KEY, redis_conf=CONFIG.REDIS_CONF)
await self.dumps(redis_key=CONFIG.REDIS_KEY)
self.is_dirty = False
return BrainMemory.to_metagpt_history_format(self.history)
@ -217,7 +217,7 @@ class BrainMemory(BaseModel):
return await self._openai_rewrite(sentence=sentence, context=context, llm=llm)
@staticmethod
async def _metagpt_rewrite(sentence: str):
async def _metagpt_rewrite(sentence: str, **kwargs):
return sentence
@staticmethod

View file

@ -63,5 +63,5 @@ class Redis:
self._client = None
@property
def is_valid(self):
return bool(self._client)
def is_valid(self) -> bool:
return self._client is not None

View file

@ -0,0 +1 @@
{"design_filename": "docs/system_design/20231221155954.json", "task_filename": "docs/tasks/20231221155954.json", "codes_filenames": ["game.py", "main.py"], "reason": "```json\n{\n \"game.py\": \"Add handling for no empty cells in add_new_tile function, Update score in move function\",\n \"main.py\": \"Handle game over condition in the game loop\"\n}\n```"}

View file

@ -0,0 +1 @@
{"Implementation approach": "We will use the Pygame library to create the game interface and handle user input. The game logic will be implemented using Python classes and data structures.", "File list": ["main.py", "game.py"], "Data structures and interfaces": "classDiagram\n class Game {\n -grid: List[List[int]]\n -score: int\n -game_over: bool\n +__init__()\n +reset_game()\n +move(direction: str)\n +is_game_over() bool\n +get_empty_cells() List[Tuple[int, int]]\n +add_new_tile()\n +get_score() int\n }\n class UI {\n -game: Game\n +__init__(game: Game)\n +draw_grid()\n +draw_score()\n +draw_game_over()\n +handle_input()\n }\n Game --> UI", "Program call flow": "sequenceDiagram\n participant M as Main\n participant G as Game\n participant U as UI\n M->>G: reset_game()\n M->>U: draw_grid()\n M->>U: draw_score()\n M->>U: handle_input()\n U->>G: move(direction)\n G->>G: add_new_tile()\n G->>U: draw_grid()\n G->>U: draw_score()\n G->>U: draw_game_over()\n G->>G: is_game_over()\n G->>G: get_empty_cells()\n G->>G: get_score()", "Anything UNCLEAR": "..."}

View file

@ -0,0 +1 @@
{"Required Python packages": ["pygame==2.0.1"], "Required Other language third-party packages": ["No third-party dependencies required"], "Logic Analysis": [["game.py", "Contains Game class and related functions for game logic"], ["main.py", "Contains main function, initializes the game and UI"]], "Task list": ["game.py", "main.py"], "Full API spec": "", "Shared Knowledge": "The game logic will be implemented using Python classes and data structures. The Pygame library will be used to create the game interface and handle user input.", "Anything UNCLEAR": "..."}

View file

@ -0,0 +1 @@
{"summary": "---\n## instruction:\nThe errors are caused by both the development code and the test code. The development code needs to be fixed to ensure that the `reset_game` method resets the grid properly. The test code also needs to be fixed to ensure that the `add_new_tile` test does not raise an index out of range error.\n\n## File To Rewrite:\ngame.py\n\n## Status:\nFAIL\n\n## Send To:\nEngineer\n---", "stdout": "", "stderr": "E.......F\n======================================================================\nERROR: test_add_new_tile (__main__.TestGame)\n----------------------------------------------------------------------\nTraceback (most recent call last):\n File \"/Users/xx/tests/test_game.py\", line 104, in test_add_new_tile\n self.assertIn(self.game.grid[empty_cells[0][0]][empty_cells[0][1]], [2, 4])\nIndexError: list index out of range\n\n======================================================================\nFAIL: test_reset_game (__main__.TestGame)\n----------------------------------------------------------------------\nTraceback (most recent call last):\n File \"/Users/xx/tests/test_game.py\", line 13, in test_reset_game\n self.assertEqual(self.game.grid, [[0 for _ in range(4)] for _ in range(4)])\nAssertionError: Lists differ: [[0, 0, 0, 0], [0, 2, 0, 0], [0, 0, 0, 2], [0, 0, 0, 0]] != [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]\n\nFirst differing element 1:\n[0, 2, 0, 0]\n[0, 0, 0, 0]\n\n- [[0, 0, 0, 0], [0, 2, 0, 0], [0, 0, 0, 2], [0, 0, 0, 0]]\n? --- ^\n\n+ [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]\n? +++ ^\n\n\n----------------------------------------------------------------------\nRan 9 tests in 0.002s\n\nFAILED (failures=1, errors=1)\n"}

View file

@ -58,7 +58,29 @@ class TestSkillAction:
action = SkillAction(skill=self.skill, args=parser_action.args)
rsp = await action.run()
assert rsp
assert "image/png;base64," in rsp.content
assert "image/png;base64," in rsp.content or "http" in rsp.content
@pytest.mark.parametrize(
("skill_name", "txt", "want"),
[
("skill1", 'skill1(a="1", b="2")', {"a": "1", "b": "2"}),
("skill1", '(a="1", b="2")', None),
("skill1", 'skill1(a="1", b="2"', None),
],
)
def test_parse_arguments(self, skill_name, txt, want):
args = ArgumentsParingAction.parse_arguments(skill_name, txt)
assert args == want
@pytest.mark.asyncio
async def test_find_and_call_function_error(self):
with pytest.raises(ValueError):
await SkillAction.find_and_call_function("dummy_call", {"a": 1})
@pytest.mark.asyncio
async def test_skill_action_error(self):
action = SkillAction(skill=self.skill, args={})
await action.run()
if __name__ == "__main__":

View file

@ -6,12 +6,24 @@
@File : test_write_code.py
@Modifiled By: mashenquan, 2023-12-6. According to RFC 135
"""
from pathlib import Path
import pytest
from metagpt.actions.write_code import WriteCode
from metagpt.config import CONFIG
from metagpt.const import (
CODE_SUMMARIES_FILE_REPO,
SYSTEM_DESIGN_FILE_REPO,
TASK_FILE_REPO,
TEST_OUTPUTS_FILE_REPO,
)
from metagpt.logs import logger
from metagpt.provider.openai_api import OpenAILLM as LLM
from metagpt.schema import CodingContext, Document
from metagpt.utils.common import aread
from metagpt.utils.file_repository import FileRepository
from tests.metagpt.actions.mock_markdown import TASKS_2, WRITE_CODE_PROMPT_SAMPLE
@ -37,3 +49,47 @@ async def test_write_code_directly():
llm = LLM()
rsp = await llm.aask(prompt)
logger.info(rsp)
@pytest.mark.asyncio
async def test_write_code_deps():
# Prerequisites
CONFIG.src_workspace = CONFIG.git_repo.workdir / "snake1/snake1"
demo_path = Path(__file__).parent / "../../data/demo_project"
await FileRepository.save_file(
filename="test_game.py.json",
content=await aread(str(demo_path / "test_game.py.json")),
relative_path=TEST_OUTPUTS_FILE_REPO,
)
await FileRepository.save_file(
filename="20231221155954.json",
content=await aread(str(demo_path / "code_summaries.json")),
relative_path=CODE_SUMMARIES_FILE_REPO,
)
await FileRepository.save_file(
filename="20231221155954.json",
content=await aread(str(demo_path / "system_design.json")),
relative_path=SYSTEM_DESIGN_FILE_REPO,
)
await FileRepository.save_file(
filename="20231221155954.json", content=await aread(str(demo_path / "tasks.json")), relative_path=TASK_FILE_REPO
)
await FileRepository.save_file(
filename="main.py", content='if __name__ == "__main__":\nmain()', relative_path=CONFIG.src_workspace
)
context = CodingContext(
filename="game.py",
design_doc=await FileRepository.get_file(filename="20231221155954.json", relative_path=SYSTEM_DESIGN_FILE_REPO),
task_doc=await FileRepository.get_file(filename="20231221155954.json", relative_path=TASK_FILE_REPO),
code_doc=Document(filename="game.py", content="", root_path="snake1"),
)
coding_doc = Document(root_path="snake1", filename="game.py", content=context.json())
action = WriteCode(context=coding_doc)
rsp = await action.run()
assert rsp
assert rsp.code_doc.content
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -6,40 +6,33 @@
@File : test_text_to_speech.py
@Desc : Unit tests.
"""
import asyncio
import base64
from pydantic import BaseModel
import pytest
from metagpt.config import CONFIG
from metagpt.learn.text_to_speech import text_to_speech
async def mock_text_to_speech():
class Input(BaseModel):
input: str
@pytest.mark.asyncio
async def test_text_to_speech():
# Prerequisites
assert CONFIG.IFLYTEK_APP_ID
assert CONFIG.IFLYTEK_API_KEY
assert CONFIG.IFLYTEK_API_SECRET
assert CONFIG.AZURE_TTS_SUBSCRIPTION_KEY and CONFIG.AZURE_TTS_SUBSCRIPTION_KEY != "YOUR_API_KEY"
assert CONFIG.AZURE_TTS_REGION
inputs = [{"input": "Panda emoji"}]
# test azure
data = await text_to_speech("panda emoji")
assert "base64" in data or "http" in data
for i in inputs:
seed = Input(**i)
base64_data = await text_to_speech(seed.input)
assert base64_data != ""
print(f"{seed.input} -> {base64_data}")
flags = ";base64,"
assert flags in base64_data
ix = base64_data.find(flags) + len(flags)
declaration = base64_data[0:ix]
assert declaration
data = base64_data[ix:]
assert data
assert base64.b64decode(data, validate=True)
def test_suite():
loop = asyncio.get_event_loop()
task = loop.create_task(mock_text_to_speech())
loop.run_until_complete(task)
# test iflytek
key = CONFIG.AZURE_TTS_SUBSCRIPTION_KEY
CONFIG.AZURE_TTS_SUBSCRIPTION_KEY = ""
data = await text_to_speech("panda emoji")
assert "base64" in data or "http" in data
CONFIG.AZURE_TTS_SUBSCRIPTION_KEY = key
if __name__ == "__main__":
test_suite()
pytest.main([__file__, "-s"])

View file

@ -5,47 +5,63 @@
@Author : mashenquan
@File : test_brain_memory.py
"""
# import json
# from typing import List
#
# import pydantic
#
# from metagpt.memory.brain_memory import BrainMemory
# from metagpt.schema import Message
#
#
# def test_json():
# class Input(pydantic.BaseModel):
# history: List[str]
# solution: List[str]
# knowledge: List[str]
# stack: List[str]
#
# inputs = [{"history": ["a", "b"], "solution": ["c"], "knowledge": ["d", "e"], "stack": ["f"]}]
#
# for i in inputs:
# v = Input(**i)
# bm = BrainMemory()
# for h in v.history:
# msg = Message(content=h)
# bm.history.append(msg.dict())
# for h in v.solution:
# msg = Message(content=h)
# bm.solution.append(msg.dict())
# for h in v.knowledge:
# msg = Message(content=h)
# bm.knowledge.append(msg.dict())
# for h in v.stack:
# msg = Message(content=h)
# bm.stack.append(msg.dict())
# s = bm.json()
# m = json.loads(s)
# bm = BrainMemory(**m)
# assert bm
# for v in bm.history:
# msg = Message(**v)
# assert msg
#
#
# if __name__ == "__main__":
# test_json()
import pytest
from metagpt.config import LLMProviderEnum
from metagpt.llm import LLM
from metagpt.memory.brain_memory import BrainMemory
from metagpt.schema import Message
@pytest.mark.asyncio
async def test_memory():
memory = BrainMemory()
memory.add_talk(Message(content="talk"))
assert memory.history[0].role == "user"
memory.add_answer(Message(content="answer"))
assert memory.history[1].role == "assistant"
redis_key = BrainMemory.to_redis_key("none", "user_id", "chat_id")
await memory.dumps(redis_key=redis_key)
assert memory.exists("talk")
assert 1 == memory.to_int("1", 0)
memory.last_talk = "AAA"
assert memory.pop_last_talk() == "AAA"
assert memory.last_talk is None
assert memory.is_history_available
assert memory.history_text
memory = await BrainMemory.loads(redis_key=redis_key)
assert memory
@pytest.mark.parametrize(
("input", "tag", "val"),
[("[TALK]:Hello", "TALK", "Hello"), ("Hello", None, "Hello"), ("[TALK]Hello", None, "[TALK]Hello")],
)
def test_extract_info(input, tag, val):
t, v = BrainMemory.extract_info(input)
assert tag == t
assert val == v
@pytest.mark.asyncio
@pytest.mark.parametrize("llm", [LLM(provider=LLMProviderEnum.OPENAI), LLM(provider=LLMProviderEnum.METAGPT)])
async def test_memory_llm(llm):
memory = BrainMemory()
for i in range(500):
memory.add_talk(Message(content="Lily is a girl.\n"))
res = await memory.is_related("apple", "moon", llm)
assert not res
res = await memory.rewrite(sentence="apple Lily eating", context="", llm=llm)
assert "Lily" in res
res = await memory.get_title(llm=llm)
assert res
assert "Lily" in res
assert memory.history or memory.historical_summary
if __name__ == "__main__":
pytest.main([__file__, "-s"])