feat: +unit test

fixbug: PYTHONPATH

fixbug: unit test
This commit is contained in:
莘权 马 2023-12-27 11:24:22 +08:00
parent 641c71bf18
commit 0adabfe53f
33 changed files with 1561 additions and 297 deletions

View file

@ -36,7 +36,8 @@ class PrepareDocuments(Action):
if not path:
name = CONFIG.project_name or FileRepository.new_filename()
path = Path(CONFIG.workspace_path) / name
else:
path = Path(CONFIG.project_path)
if path.exists() and not CONFIG.inc:
shutil.rmtree(path)
CONFIG.git_repo = GitRepository(local_path=path, auto_init=True)

View file

@ -26,6 +26,7 @@ from metagpt.actions.write_prd_an import (
WP_IS_RELATIVE_NODE,
WP_ISSUE_TYPE_NODE,
WRITE_PRD_NODE,
WRITE_PRD_NODE_NO_NAME,
)
from metagpt.config import CONFIG
from metagpt.const import (
@ -123,7 +124,8 @@ class WritePRD(Action):
# logger.info(rsp)
project_name = CONFIG.project_name if CONFIG.project_name else ""
context = CONTEXT_TEMPLATE.format(requirements=requirements, project_name=project_name)
node = await WRITE_PRD_NODE.fill(context=context, llm=self.llm) # schema=schema
write_prd_node = WRITE_PRD_NODE if not project_name else WRITE_PRD_NODE_NO_NAME
node = await write_prd_node.fill(context=context, llm=self.llm) # schema=schema
await self._rename_workspace(node)
return node

View file

@ -34,7 +34,7 @@ ORIGINAL_REQUIREMENTS = ActionNode(
PROJECT_NAME = ActionNode(
key="Project Name",
expected_type=str,
instruction="Name the project using snake case style, like 'game_2048' or 'simple_crm'.",
instruction="According to the content of \"Original Requirements,\" name the project using snake case style , like 'game_2048' or 'simple_crm.",
example="game_2048",
)
@ -141,7 +141,6 @@ NODES = [
LANGUAGE,
PROGRAMMING_LANGUAGE,
ORIGINAL_REQUIREMENTS,
PROJECT_NAME,
PRODUCT_GOALS,
USER_STORIES,
COMPETITIVE_ANALYSIS,
@ -152,7 +151,8 @@ NODES = [
ANYTHING_UNCLEAR,
]
WRITE_PRD_NODE = ActionNode.from_children("WritePRD", NODES)
WRITE_PRD_NODE = ActionNode.from_children("WritePRD", NODES + [PROJECT_NAME])
WRITE_PRD_NODE_NO_NAME = ActionNode.from_children("WritePRD", NODES)
WP_ISSUE_TYPE_NODE = ActionNode.from_children("WP_ISSUE_TYPE", [ISSUE_TYPE, REASON])
WP_IS_RELATIVE_NODE = ActionNode.from_children("WP_IS_RELATIVE", [IS_RELATIVE, REASON])

View file

@ -8,5 +8,5 @@ async def google_search(query: str, max_results: int = 6, **kwargs):
:param max_results: The number of search results to retrieve
:return: The web search results in markdown format.
"""
resluts = await SearchEngine().run(query, max_results=max_results, as_string=False)
return "\n".join(f"{i}. [{j['title']}]({j['link']}): {j['snippet']}" for i, j in enumerate(resluts, 1))
results = await SearchEngine().run(query, max_results=max_results, as_string=False)
return "\n".join(f"{i}. [{j['title']}]({j['link']}): {j['snippet']}" for i, j in enumerate(results, 1))

View file

@ -4,6 +4,8 @@
import json
from pathlib import Path
import aiofiles
from metagpt.provider.openai_api import OpenAILLM as GPTAPI
ICL_SAMPLE = """Interface definition:
@ -174,6 +176,9 @@ class UTGenerator:
return doc
for name, prop in node.items():
if not isinstance(prop, dict):
doc += f'{" " * level}{self._para_to_str(node)}\n'
break
doc += f'{" " * level}{self.para_to_str(name, prop, prop_object_required)}\n'
doc += dive_into_object(prop)
if prop["type"] == "array":
@ -202,12 +207,12 @@ class UTGenerator:
return tags
def generate_ut(self, include_tags) -> bool:
async def generate_ut(self, include_tags) -> bool:
"""Generate test case files"""
tags = self.get_tags_mapping()
for tag, paths in tags.items():
if include_tags is None or tag in include_tags:
self._generate_ut(tag, paths)
await self._generate_ut(tag, paths)
return True
def build_api_doc(self, node: dict, path: str, method: str) -> str:
@ -250,21 +255,22 @@ class UTGenerator:
return doc
def _store(self, data, base, folder, fname):
async def _store(self, data, base, folder, fname):
"""Store data in a file."""
file_path = self.get_file_path(Path(base) / folder, fname)
with open(file_path, "w", encoding="utf-8") as file:
file.write(data)
async with aiofiles.open(file_path, mode="w", encoding="utf-8") as file:
await file.write(data)
def ask_gpt_and_save(self, question: str, tag: str, fname: str):
async def ask_gpt_and_save(self, question: str, tag: str, fname: str):
"""Generate questions and store both questions and answers"""
messages = [self.icl_sample, question]
result = self.gpt_msgs_to_code(messages=messages)
result = await self.gpt_msgs_to_code(messages=messages)
self._store(question, self.questions_path, tag, f"{fname}.txt")
self._store(result, self.ut_py_path, tag, f"{fname}.py")
await self._store(question, self.questions_path, tag, f"{fname}.txt")
data = result.get("code", "") if result else ""
await self._store(data, self.ut_py_path, tag, f"{fname}.py")
def _generate_ut(self, tag, paths):
async def _generate_ut(self, tag, paths):
"""Process the structure under a data path
Args:
@ -276,13 +282,13 @@ class UTGenerator:
summary = node["summary"]
question = self.template_prefix
question += self.build_api_doc(node, path, method)
self.ask_gpt_and_save(question, tag, summary)
await self.ask_gpt_and_save(question, tag, summary)
async def gpt_msgs_to_code(self, messages: list) -> str:
"""Choose based on different calling methods"""
result = ""
if self.chatgpt_method == "API":
result = await GPTAPI().aask_code(msgs=messages)
result = await GPTAPI().aask_code(messages=messages)
return result

View file

@ -6,7 +6,7 @@
from __future__ import annotations
import importlib
from typing import Any, Callable, Coroutine, Literal, overload
from typing import Any, Callable, Coroutine, overload
from metagpt.config import CONFIG
from metagpt.tools import WebBrowserEngineType
@ -46,12 +46,3 @@ class WebBrowserEngine:
async def run(self, url: str, *urls: str) -> WebPage | list[WebPage]:
return await self.run_func(url, *urls)
if __name__ == "__main__":
import fire
async def main(url: str, *urls: str, engine_type: Literal["playwright", "selenium"] = "playwright", **kwargs):
return await WebBrowserEngine(engine=WebBrowserEngineType(engine_type), **kwargs).run(url, *urls)
fire.Fire(main)

View file

@ -142,12 +142,3 @@ async def _log_stream(sr, log_func):
_install_lock: asyncio.Lock = None
_install_cache = set()
if __name__ == "__main__":
import fire
async def main(url: str, *urls: str, browser_type: str = "chromium", **kwargs):
return await PlaywrightWrapper(browser_type=browser_type, **kwargs).run(url, *urls)
fire.Fire(main)

View file

@ -118,12 +118,3 @@ def _gen_get_driver_func(browser_type, *args, executable_path=None):
return WebDriver(options=deepcopy(options), service=Service(executable_path=executable_path))
return _get_driver
if __name__ == "__main__":
import fire
async def main(url: str, *urls: str, browser_type: str = "chrome", **kwargs):
return await SeleniumWrapper(browser_type=browser_type, **kwargs).run(url, *urls)
fire.Fire(main)

View file

@ -51,7 +51,7 @@ def check_cmd_exists(command) -> int:
def require_python_version(req_version: Tuple) -> bool:
if not (2 <= len(req_version) <= 3):
raise ValueError("req_version should be (3, 9) or (3, 10, 13)")
return True if sys.version_info > req_version else False
return bool(sys.version_info > req_version)
class OutputParser:
@ -454,7 +454,7 @@ def general_after_log(i: "loguru.Logger", sec_format: str = "%0.3f") -> typing.C
return log_it
def read_json_file(json_file: str, encoding=None) -> list[Any]:
def read_json_file(json_file: str, encoding="utf-8") -> list[Any]:
if not Path(json_file).exists():
raise FileNotFoundError(f"json_file: {json_file} not exist, return []")

View file

@ -14,7 +14,6 @@ from typing import Set
import aiofiles
from metagpt.config import CONFIG
from metagpt.utils.common import aread
from metagpt.utils.exceptions import handle_exception
@ -86,7 +85,7 @@ class DependencyFile:
if persist:
await self.load()
root = CONFIG.git_repo.workdir
root = self._filename.parent
try:
key = Path(filename).relative_to(root)
except ValueError:

View file

@ -81,10 +81,11 @@ class FileRepository:
:return: List of changed dependency filenames or paths.
"""
dependencies = await self.get_dependency(filename=filename)
changed_files = self.changed_files
changed_files = set(self.changed_files.keys())
changed_dependent_files = set()
for df in dependencies:
if df in changed_files.keys():
rdf = Path(df).relative_to(self._relative_path)
if str(rdf) in changed_files:
changed_dependent_files.add(df)
return changed_dependent_files

View file

@ -17,7 +17,6 @@ from git.repo import Repo
from git.repo.fun import is_git_dir
from gitignore_parser import parse_gitignore
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.logs import logger
from metagpt.utils.dependency_file import DependencyFile
from metagpt.utils.file_repository import FileRepository
@ -271,20 +270,3 @@ class GitRepository:
continue
files.append(filename)
return files
if __name__ == "__main__":
path = DEFAULT_WORKSPACE_ROOT / "git"
path.mkdir(exist_ok=True, parents=True)
repo = GitRepository()
repo.open(path, auto_init=True)
repo.filter_gitignore(filenames=["snake_game/snake_game/__pycache__", "snake_game/snake_game/game.py"])
changes = repo.changed_files
print(changes)
repo.add_change(changes)
print(repo.status)
repo.commit("test")
print(repo.status)
repo.delete_repository()

View file

@ -13,7 +13,6 @@ from pathlib import Path
import aiofiles
from metagpt.config import CONFIG
from metagpt.const import METAGPT_ROOT
from metagpt.logs import logger
from metagpt.utils.common import check_cmd_exists
@ -146,9 +145,3 @@ sequenceDiagram
S-->>SE: return summary
SE-->>M: return summary
"""
if __name__ == "__main__":
loop = asyncio.new_event_loop()
result = loop.run_until_complete(mermaid_to_file(MMC1, METAGPT_ROOT / f"{CONFIG.mermaid_engine}/1"))
result = loop.run_until_complete(mermaid_to_file(MMC2, METAGPT_ROOT / f"{CONFIG.mermaid_engine}/2"))
loop.close()

View file

@ -1,219 +1,67 @@
# !/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { redis client }
# @Date: 2022/11/28 10:12
import json
"""
@Time : 2023/12/27
@Author : mashenquan
@File : redis.py
"""
import traceback
from datetime import timedelta
from enum import Enum
from typing import Awaitable, Callable, Dict, Optional, Union
from redis import asyncio as aioredis
import aioredis # https://aioredis.readthedocs.io/en/latest/getting-started/
from metagpt.config import CONFIG
from metagpt.logs import logger
class RedisTypeEnum(Enum):
"""Redis 数据类型"""
String = "String"
List = "List"
Hash = "Hash"
Set = "Set"
ZSet = "ZSet"
def make_url(
dialect: str,
*,
user: Optional[str] = None,
password: Optional[str] = None,
host: Optional[str] = None,
port: Optional[Union[str, int]] = None,
name: Optional[Union[str, int]] = None,
) -> str:
url_parts = [f"{dialect}://"]
if user or password:
if user:
url_parts.append(user)
if password:
url_parts.append(f":{password}")
url_parts.append("@")
if not host and not dialect.startswith("sqlite"):
host = "127.0.0.1"
if host:
url_parts.append(f"{host}")
if port:
url_parts.append(f":{port}")
# 比如redis可能传入0
if name is not None:
url_parts.append(f"/{name}")
return "".join(url_parts)
class RedisAsyncClient(aioredis.Redis):
"""异步的客户端
例子::
rdb = RedisAsyncClient()
print(rdb.url)
Args:
host: 服务器地址
port: 服务器端口
user: 用户名
db: 数据库
password: 密码
decode_responses: 字符串输入被编码成utf8存储在Redis里了而取出来的时候还是被编码后的bytes需要显示的decode才能变成字符串
health_check_interval: 定时检测连接防止出现ConnectionErrors (104, Connection reset by peer)
"""
def __init__(
self,
host: str = "localhost",
port: int = 6379,
db: int = 0,
password: str = None,
decode_responses=True,
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True,
**kwargs,
):
super().__init__(
host=host,
port=port,
db=db,
password=password,
decode_responses=decode_responses,
health_check_interval=health_check_interval,
socket_connect_timeout=socket_connect_timeout,
retry_on_timeout=retry_on_timeout,
socket_keepalive=socket_keepalive,
**kwargs,
)
self.url = make_url("redis", host=host, port=port, name=db, password=password)
class RedisCacheInfo(object):
"""统一缓存信息类"""
def __init__(self, key, timeout: Union[int, timedelta] = timedelta(seconds=60), data_type=RedisTypeEnum.String):
"""
缓存信息类初始化
Args:
key: 缓存的key
timeout: 缓存过期时间, 单位秒
data_type: 缓存采用的数据结构 (不传并不影响用于标记业务采用的是什么数据结构)
"""
self.key = key
self.timeout = timeout
self.data_type = data_type
def __str__(self):
return f"cache key {self.key} timeout {self.timeout}s"
class RedisManager:
client: RedisAsyncClient = None
@classmethod
def init_redis_conn(cls, host, port, password, db):
"""初始化redis 连接"""
if cls.client is None:
cls.client = RedisAsyncClient(host=host, port=port, password=password, db=db)
@classmethod
async def set_with_cache_info(cls, redis_cache_info: RedisCacheInfo, value):
"""
根据 RedisCacheInfo 设置 Redis 缓存
:param redis_cache_info: RedisCacheInfo缓存信息对象
:param value: 缓存的值
:return:
"""
await cls.client.setex(redis_cache_info.key, redis_cache_info.timeout, value)
@classmethod
async def get_with_cache_info(cls, redis_cache_info: RedisCacheInfo):
"""
根据 RedisCacheInfo 获取 Redis 缓存
:param redis_cache_info: RedisCacheInfo 缓存信息对象
:return:
"""
cache_info = await cls.client.get(redis_cache_info.key)
return cache_info
@classmethod
async def del_with_cache_info(cls, redis_cache_info: RedisCacheInfo):
"""
根据 RedisCacheInfo 删除 Redis 缓存
:param redis_cache_info: RedisCacheInfo缓存信息对象
:return:
"""
await cls.client.delete(redis_cache_info.key)
@staticmethod
async def get_or_set_cache(cache_info: RedisCacheInfo, fetch_data_func: Callable[[], Awaitable[dict]]) -> dict:
"""
获取缓存数据如果缓存不存在则从提供的函数中获取并设置缓存
当前版本仅支持 json 形式的 string 格式数据
"""
serialized_data = await RedisManager.get_with_cache_info(cache_info)
if serialized_data:
return json.loads(serialized_data)
data = await fetch_data_func()
try:
serialized_data = json.dumps(data)
await RedisManager.set_with_cache_info(cache_info, serialized_data)
except Exception as e:
logger.warning(f"数据 {data} 通过 json 进行序列化缓存失败:{e}")
return data
@classmethod
def is_valid(cls):
return cls.client is not None
class Redis:
def __init__(self, conf: Dict = None):
def __init__(self):
self._client = None
async def _connect(self, force=False):
if self._client and not force:
return True
if not CONFIG.REDIS_HOST or not CONFIG.REDIS_PORT or CONFIG.REDIS_DB is None or CONFIG.REDIS_PASSWORD is None:
return False
try:
host = CONFIG.REDIS_HOST
port = int(CONFIG.REDIS_PORT)
pwd = CONFIG.REDIS_PASSWORD
db = CONFIG.REDIS_DB
RedisManager.init_redis_conn(host=host, port=port, password=pwd, db=db)
self._client = await aioredis.from_url(
f"redis://{CONFIG.REDIS_HOST}:{CONFIG.REDIS_PORT}",
username=CONFIG.REDIS_USER,
password=CONFIG.REDIS_PASSWORD,
db=CONFIG.REDIS_DB,
)
return True
except Exception as e:
logger.warning(f"Redis initialization has failed:{e}")
return False
def is_valid(self):
return RedisManager.is_valid()
async def get(self, key: str) -> str:
if not self.is_valid() or not key:
async def get(self, key: str) -> bytes:
if not await self._connect() or not key:
return None
try:
v = await RedisManager.get_with_cache_info(redis_cache_info=RedisCacheInfo(key=key))
v = await self._client.get(key)
return v
except Exception as e:
logger.exception(f"{e}, stack:{traceback.format_exc()}")
return None
async def set(self, key: str, data: str, timeout_sec: int):
if not self.is_valid() or not key:
async def set(self, key: str, data: str, timeout_sec: int = None):
if not await self._connect() or not key:
return
try:
await RedisManager.set_with_cache_info(
redis_cache_info=RedisCacheInfo(key=key, timeout=timeout_sec), value=data
)
ex = None if not timeout_sec else timedelta(seconds=timeout_sec)
await self._client.set(key, data, ex=ex)
except Exception as e:
logger.exception(f"{e}, stack:{traceback.format_exc()}")
async def close(self):
if not self._client:
return
await self._client.close()
self._client = None
@property
def is_valid(self):
return bool(self._client)

View file

@ -136,8 +136,7 @@ class S3:
pathname = path / object_name
try:
async with aiofiles.open(str(pathname), mode="wb") as file:
if format == BASE64_FORMAT:
data = base64.b64decode(data)
data = base64.b64decode(data) if format == BASE64_FORMAT else data.encode(encoding="utf-8")
await file.write(data)
bucket = CONFIG.S3_BUCKET

View file

@ -9,4 +9,7 @@ google
httplib2
google_api_python_client
selenium
webdriver_manager
webdriver_manager
pyppeteer
#aioboto3~=11.3.0 # Used by metagpt/utils/s3.py
aioredis~=2.0.1 # Used by metagpt/utils/redis.py

View file

@ -46,8 +46,8 @@ semantic-kernel==0.4.0.dev0
wrapt==1.15.0
#aiohttp_jinja2
# azure-cognitiveservices-speech~=1.31.0 # Used by metagpt/tools/azure_tts.py
#aioboto3~=11.3.0
#redis==4.3.5
#aioboto3~=11.3.0 # Used by metagpt/utils/s3.py
aioredis~=2.0.1 # Used by metagpt/utils/redis.py
websocket-client==1.6.2
aiofiles==23.2.1
gitpython==3.1.40

View file

@ -0,0 +1,57 @@
## Implementation approach
We will use the Pygame library to create the game interface and handle user input. The game logic will be implemented using Python classes and data structures.
## File list
- main.py
- game.py
## Data structures and interfaces
classDiagram
class Game {
-grid: List[List[int]]
-score: int
-game_over: bool
+__init__()
+reset_game()
+move(direction: str)
+is_game_over() bool
+get_empty_cells() List[Tuple[int, int]]
+add_new_tile()
+get_score() int
}
class UI {
-game: Game
+__init__(game: Game)
+draw_grid()
+draw_score()
+draw_game_over()
+handle_input()
}
Game --> UI
## Program call flow
sequenceDiagram
participant M as Main
participant G as Game
participant U as UI
M->>G: reset_game()
M->>U: draw_grid()
M->>U: draw_score()
M->>U: handle_input()
U->>G: move(direction)
G->>G: add_new_tile()
G->>U: draw_grid()
G->>U: draw_score()
G->>U: draw_game_over()
G->>G: is_game_over()
G->>G: get_empty_cells()
G->>G: get_score()
## Anything UNCLEAR
...

View file

@ -0,0 +1,63 @@
## Language
en_us
## Programming Language
Python
## Original Requirements
write a 2048 game
## Project Name
game_2048
## Product Goals
- Create an addictive and engaging gaming experience
- Ensure smooth performance and responsiveness
- Offer customizable game settings and features
## User Stories
- As a player, I want to be able to play the game on different devices and screen sizes
- As a gamer, I want to be challenged with increasing difficulty levels as I progress
- As a user, I want to be able to undo my last move in the game
## Competitive Analysis
- 2048 Game by Gabriele Cirulli: Popular and addictive, lacks advanced customization options
## Competitive Quadrant Chart
quadrantChart
title "Engagement and Customization of 2048 Games"
x-axis "Low Customization" --> "High Customization"
y-axis "Low Engagement" --> "High Engagement"
quadrant-1 "Enhance Customization"
quadrant-2 "Improve Engagement"
quadrant-3 "Maintain Customization, Enhance Engagement"
quadrant-4 "Highly Engaging and Customizable"
"2048 Game by Gabriele Cirulli": [0.4, 0.7]
"Our Target Product": [0.6, 0.8]
## Requirement Analysis
The product should provide an intuitive and seamless gaming experience with customizable features to enhance user engagement.
## Requirement Pool
- ['P0', 'Implement game logic and user interface']
- ['P1', 'Incorporate multiple difficulty levels and scoring system']
- ['P2', 'Integrate customizable game settings and undo feature']
## UI Design draft
The UI should have a clean and modern design with intuitive game controls and customizable settings for difficulty levels and game themes.
## Anything UNCLEAR
...

View file

@ -0,0 +1,39 @@
### Code Review All
#### game.py
- The `add_new_tile` function should handle the case when there are no empty cells left.
- The `move` function should update the score when tiles are merged.
#### main.py
- The game loop does not handle the game over condition properly. It should break the loop when the game is over.
### Call flow
```mermaid
sequenceDiagram
participant M as Main
participant G as Game
participant U as UI
M->>G: reset_game()
M->>U: draw_grid()
M->>U: draw_score()
M->>U: handle_input()
U->>G: move(direction)
G->>G: add_new_tile()
G->>U: draw_grid()
G->>U: draw_score()
G->>U: draw_game_over()
G->>G: is_game_over()
G->>G: get_empty_cells()
G->>G: get_score()
```
### Summary
The code implements the 2048 game using Python classes and data structures. The Pygame library is used for the game interface and user input handling. The `game.py` file contains the `Game` class and related functions for game logic, while the `main.py` file initializes the game and UI.
### TODOs
```python
{
"game.py": "Add handling for no empty cells in add_new_tile function, Update score in move function",
"main.py": "Handle game over condition in the game loop"
}
```

File diff suppressed because it is too large Load diff

View file

@ -41,7 +41,7 @@ async def test_run():
{"content": "The one who eaten a poison apple.", "role": "assistant"},
],
"knowledge": [{"content": "tulin is a scientist."}],
"last_talk": "what's apple?",
"last_talk": "Do you have a poison apple?",
},
"language": "English",
"agent_description": "chatterbox",

View file

@ -12,11 +12,16 @@ from pathlib import Path
import pytest
import requests
from metagpt.config import CONFIG
@pytest.mark.asyncio
async def test_hello():
script_pathname = Path(__file__).parent / "../../../metagpt/tools/hello.py"
process = subprocess.Popen(["python", str(script_pathname)])
workdir = Path(__file__).parent.parent.parent.parent
script_pathname = workdir / "metagpt/tools/hello.py"
env = CONFIG.new_environ()
env["PYTHONPATH"] = str(workdir) + ":" + env.get("PYTHONPATH", "")
process = subprocess.Popen(["python", str(script_pathname)], cwd=workdir, env=env)
await asyncio.sleep(5)
url = "http://localhost:8082/openapi/greeting/dave"

View file

@ -12,11 +12,16 @@ from pathlib import Path
import pytest
import requests
from metagpt.config import CONFIG
@pytest.mark.asyncio
async def test_oas2_svc():
script_pathname = Path(__file__).parent / "../../../metagpt/tools/metagpt_oas3_api_svc.py"
process = subprocess.Popen(["python", str(script_pathname)])
workdir = Path(__file__).parent.parent.parent.parent
script_pathname = workdir / "metagpt/tools/metagpt_oas3_api_svc.py"
env = CONFIG.new_environ()
env["PYTHONPATH"] = str(workdir) + ":" + env.get("PYTHONPATH", "")
process = subprocess.Popen(["python", str(script_pathname)], cwd=str(workdir), env=env)
await asyncio.sleep(5)
url = "http://localhost:8080/openapi/greeting/dave"

View file

@ -9,34 +9,34 @@ from pathlib import Path
import pytest
from metagpt.const import API_QUESTIONS_PATH, SWAGGER_PATH, UT_PY_PATH
from metagpt.config import CONFIG
from metagpt.const import API_QUESTIONS_PATH, UT_PY_PATH
from metagpt.tools.ut_writer import YFT_PROMPT_PREFIX, UTGenerator
class TestUTWriter:
def test_api_to_ut_sample(self):
@pytest.mark.asyncio
async def test_api_to_ut_sample(self):
# Prerequisites
swagger_file = SWAGGER_PATH / "yft_swaggerApi.json"
swagger_file = Path(__file__).parent / "../../data/ut_writer/yft_swaggerApi.json"
assert swagger_file.exists()
assert CONFIG.OPENAI_API_KEY and CONFIG.OPENAI_API_KEY != "YOUR_API_KEY"
assert not CONFIG.OPENAI_API_TYPE
assert CONFIG.OPENAI_API_MODEL
tags = ["测试"] # "智能合同导入", "律师审查", "ai合同审查", "草拟合同&律师在线审查", "合同审批", "履约管理", "签约公司"]
tags = ["测试", "作业"]
# 这里在文件中手动加入了两个测试标签的API
utg = UTGenerator(
swagger_file=swagger_file,
swagger_file=str(swagger_file),
ut_py_path=UT_PY_PATH,
questions_path=API_QUESTIONS_PATH,
template_prefix=YFT_PROMPT_PREFIX,
)
ret = utg.generate_ut(include_tags=tags)
ret = await utg.generate_ut(include_tags=tags)
# 后续加入对文件生成内容与数量的检验
assert ret
pathname = Path(__file__).with_suffix(".tmp")
utg.ask_gpt_and_save(question="question", tag="tag", fname=str(pathname))
assert pathname.exists()
pathname.unlink(missing_ok=True)
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -6,11 +6,13 @@
@File : test_common.py
@Modified by: mashenquan, 2023/11/21. Add unit tests.
"""
import importlib
import os
import platform
from pathlib import Path
from typing import Any, Set
import aiofiles
import pytest
from pydantic import BaseModel
@ -18,7 +20,20 @@ from metagpt.actions import RunCode
from metagpt.const import get_metagpt_root
from metagpt.roles.tutorial_assistant import TutorialAssistant
from metagpt.schema import Message
from metagpt.utils.common import any_to_str, any_to_str_set, check_cmd_exists
from metagpt.utils.common import (
NoMoneyException,
OutputParser,
any_to_str,
any_to_str_set,
check_cmd_exists,
concat_namespace,
import_class_inst,
parse_recipient,
print_members,
read_file_block,
read_json_file,
require_python_version,
)
class TestGetProjectRoot:
@ -96,6 +111,65 @@ class TestGetProjectRoot:
else:
assert result != 0
@pytest.mark.parametrize(("filename", "want"), [("1.md", "File list"), ("2.md", "Language"), ("3.md", "# TODOs")])
@pytest.mark.asyncio
async def test_parse_data_exception(self, filename, want):
pathname = Path(__file__).parent.parent.parent / "data/output_parser" / filename
assert pathname.exists()
async with aiofiles.open(str(pathname), mode="r") as reader:
data = await reader.read()
result = OutputParser.parse_data(data=data)
assert want in result
@pytest.mark.parametrize(
("ver", "want", "err"), [((1, 2, 3, 4), False, True), ((2, 3, 9), True, False), ((3, 10, 18), False, False)]
)
def test_require_python_version(self, ver, want, err):
try:
res = require_python_version(ver)
assert res == want
except ValueError:
assert err
def test_no_money_exception(self):
val = NoMoneyException(3.10)
assert "Amount required:" in str(val)
@pytest.mark.parametrize("module_path", ["tests.metagpt.utils.test_common"])
def test_print_members(self, module_path):
module = importlib.import_module(module_path)
with pytest.raises(Exception) as info:
print_members(module)
assert info is None
@pytest.mark.parametrize(
("words", "want"), [("", ""), ("## Send To: Engineer", "Engineer"), ("Send To: \nNone", "None")]
)
def test_parse_recipient(self, words, want):
res = parse_recipient(words)
assert want == res
def test_concat_namespace(self):
assert concat_namespace("a", "b", "c") == "a:b:c"
assert concat_namespace("a", "b", "c", "e") == "a:b:c:e"
assert concat_namespace("a", "b", "c", "e", "f") == "a:b:c:e:f"
def test_read_json_file(self):
assert read_json_file(str(Path(__file__).parent / "../../data/ut_writer/yft_swaggerApi.json"), encoding="utf-8")
with pytest.raises(FileNotFoundError):
read_json_file("not_exists_file", encoding="utf-8")
with pytest.raises(ValueError):
read_json_file(__file__, encoding="utf-8")
def test_import_class_inst(self):
rc = import_class_inst("RunCode", "metagpt.actions.run_code", name="X")
assert rc.name == "X"
@pytest.mark.asyncio
async def test_read_file_block(self):
assert await read_file_block(filename=__file__, lineno=6, end_lineno=6) == "@File : test_common.py\n"
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -0,0 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/12/27
@Author : mashenquan
@File : test_cost_manager.py
"""
import pytest
from metagpt.utils.cost_manager import CostManager
def test_cost_manager():
cm = CostManager(total_budget=20)
cm.update_cost(prompt_tokens=1000, completion_tokens=100, model="gpt-4-1106-preview")
assert cm.get_total_prompt_tokens() == 1000
assert cm.get_total_completion_tokens() == 100
assert cm.get_total_cost() == 0.013
cm.update_cost(prompt_tokens=100, completion_tokens=10, model="gpt-4-1106-preview")
assert cm.get_total_prompt_tokens() == 1100
assert cm.get_total_completion_tokens() == 110
assert cm.get_total_cost() == 0.0143
cost = cm.get_costs()
assert cost
assert cost.total_cost == cm.get_total_cost()
assert cost.total_prompt_tokens == cm.get_total_prompt_tokens()
assert cost.total_completion_tokens == cm.get_total_completion_tokens()
assert cost.total_budget == 20
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -23,3 +23,13 @@ async def test_write_and_read_file(root_path: Path, filename: str, content: byte
assert root_path / filename == full_file_name
file_data = await File.read(full_file_name)
assert file_data.decode("utf-8") == content
@pytest.mark.asyncio
async def test_read_chunk():
val = await File.read(file_path=__file__, chunk_size=10)
assert val
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -33,20 +33,22 @@ async def test_file_repo():
assert file_repo.workdir == full_path
assert file_repo.workdir.exists()
await file_repo.save("a.txt", "AAA")
await file_repo.save("b.txt", "BBB", ["a.txt"])
await file_repo.save("b.txt", "BBB", [str(full_path / "a.txt"), f"{file_repo_path}/c.txt"])
doc = await file_repo.get("a.txt")
assert "AAA" == doc.content
doc = await file_repo.get("b.txt")
assert "BBB" == doc.content
assert {"a.txt"} == await file_repo.get_dependency("b.txt")
assert {f"{file_repo_path}/a.txt", f"{file_repo_path}/c.txt"} == await file_repo.get_dependency("b.txt")
assert {"a.txt": ChangeType.UNTRACTED, "b.txt": ChangeType.UNTRACTED} == file_repo.changed_files
assert {"a.txt"} == await file_repo.get_changed_dependency("b.txt")
assert {f"{file_repo_path}/a.txt"} == await file_repo.get_changed_dependency("b.txt")
await file_repo.save("d/e.txt", "EEE")
assert ["d/e.txt"] == file_repo.get_change_dir_files("d")
assert set(file_repo.all_files) == {"a.txt", "b.txt", "d/e.txt"}
await file_repo.delete("d/e.txt")
await file_repo.delete("d/e.txt") # delete twice
assert set(file_repo.all_files) == {"a.txt", "b.txt"}
await file_repo.delete("b.txt")
assert set(file_repo.all_files) == {"a.txt"}
git_repo.delete_repository()

View file

@ -61,6 +61,11 @@ async def test_git():
assert repo.status
exist_dir = repo.workdir / "git4"
exist_dir.mkdir(parents=True, exist_ok=True)
repo.rename_root("git4")
assert repo.workdir.name == "git4"
repo.delete_repository()
assert not local_path.exists()
@ -80,6 +85,9 @@ async def test_git1():
all_files = repo1.get_files(relative_path=".", filter_ignored=True)
assert "__pycache__/a.pyc" not in all_files
res = repo1.filter_gitignore(filenames=["snake_game/snake_game/__pycache__", "snake_game/snake_game/game.py"])
assert res == ["snake_game/snake_game/game.py"]
repo1.delete_repository()
assert not local_path.exists()
@ -99,5 +107,20 @@ async def test_dependency_file():
assert not dependancy_file.exists
@pytest.mark.asyncio
async def test_git_open():
local_path = Path(__file__).parent / "git3"
local_path.mkdir(exist_ok=True, parents=True)
assert not GitRepository.is_git_dir(local_path)
repo = GitRepository()
repo.open(local_path, auto_init=False)
assert not repo.is_valid
assert not repo.status
assert not repo.workdir
shutil.rmtree(path=str(local_path), ignore_errors=True)
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -0,0 +1,39 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/12/27
@Author : mashenquan
@File : test_mermaid.py
"""
import pytest
from metagpt.config import CONFIG
from metagpt.utils.common import check_cmd_exists
from metagpt.utils.mermaid import MMC1, MMC2, mermaid_to_file
@pytest.mark.asyncio
@pytest.mark.parametrize("engine", ["nodejs", "playwright", "pyppeteer", "ink"])
async def test_mermaid(engine):
# Prerequisites
# npm install -g @mermaid-js/mermaid-cli
assert check_cmd_exists("npm") == 0
assert CONFIG.PYPPETEER_EXECUTABLE_PATH
CONFIG.mermaid_engine = engine
save_to = CONFIG.git_repo.workdir / f"{CONFIG.mermaid_engine}/1"
await mermaid_to_file(MMC1, save_to)
for ext in [".pdf", ".svg", ".png"]:
assert save_to.with_suffix(ext).exists()
save_to.with_suffix(ext).unlink(missing_ok=True)
save_to = CONFIG.git_repo.workdir / f"{CONFIG.mermaid_engine}/2"
await mermaid_to_file(MMC2, save_to)
for ext in [".pdf", ".svg", ".png"]:
assert save_to.with_suffix(ext).exists()
save_to.with_suffix(ext).unlink(missing_ok=True)
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -0,0 +1,32 @@
#!/usr/bin/env python3
# _*_ coding: utf-8 _*_
"""
@Time : 2023/12/27
@Author : mashenquan
@File : test_redis.py
"""
import pytest
from metagpt.config import CONFIG
from metagpt.utils.redis import Redis
@pytest.mark.asyncio
async def test_redis():
# Prerequisites
assert CONFIG.REDIS_HOST and CONFIG.REDIS_HOST != "YOUR_REDIS_HOST"
assert CONFIG.REDIS_PORT and CONFIG.REDIS_PORT != "YOUR_REDIS_PORT"
# assert CONFIG.REDIS_USER
assert CONFIG.REDIS_PASSWORD is not None and CONFIG.REDIS_PASSWORD != "YOUR_REDIS_PASSWORD"
assert CONFIG.REDIS_DB is not None and CONFIG.REDIS_DB != "YOUR_REDIS_DB_INDEX, str, 0-based"
conn = Redis()
assert not conn.is_valid
await conn.set("test", "test", timeout_sec=0)
assert await conn.get("test") == b"test"
await conn.close()
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -0,0 +1,54 @@
#!/usr/bin/env python3
# _*_ coding: utf-8 _*_
"""
@Time : 2023/12/27
@Author : mashenquan
@File : test_s3.py
"""
import uuid
from pathlib import Path
import aiofiles
import pytest
from metagpt.config import CONFIG
from metagpt.utils.s3 import S3
@pytest.mark.asyncio
async def test_s3():
# Prerequisites
assert CONFIG.S3_ACCESS_KEY and CONFIG.S3_ACCESS_KEY != "YOUR_S3_ACCESS_KEY"
assert CONFIG.S3_SECRET_KEY and CONFIG.S3_SECRET_KEY != "YOUR_S3_SECRET_KEY"
assert CONFIG.S3_ENDPOINT_URL and CONFIG.S3_ENDPOINT_URL != "YOUR_S3_ENDPOINT_URL"
# assert CONFIG.S3_SECURE: true # true/false
assert CONFIG.S3_BUCKET and CONFIG.S3_BUCKET != "YOUR_S3_BUCKET"
conn = S3()
assert conn.is_valid
object_name = "unittest.bak"
await conn.upload_file(bucket=CONFIG.S3_BUCKET, local_path=__file__, object_name=object_name)
pathname = (Path(__file__).parent / uuid.uuid4().hex).with_suffix(".bak")
pathname.unlink(missing_ok=True)
await conn.download_file(bucket=CONFIG.S3_BUCKET, object_name=object_name, local_path=str(pathname))
assert pathname.exists()
url = await conn.get_object_url(bucket=CONFIG.S3_BUCKET, object_name=object_name)
assert url
bin_data = await conn.get_object(bucket=CONFIG.S3_BUCKET, object_name=object_name)
assert bin_data
async with aiofiles.open(__file__, mode="r", encoding="utf-8") as reader:
data = await reader.read()
res = await conn.cache(data, ".bak", "script")
assert "http" in res
@pytest.mark.asyncio
async def test_s3_no_error():
conn = S3()
conn.auth_config["aws_secret_access_key"] = ""
res = await conn.cache("ABC", ".bak", "script")
assert not res
if __name__ == "__main__":
pytest.main([__file__, "-s"])